Skip to content

Commit 07da1b9

Browse files
authored
Test Framework (#121)
Fixes #81
1 parent 4db14cc commit 07da1b9

File tree

91 files changed

+5441
-3938
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+5441
-3938
lines changed

README.md

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,122 @@ While running in a workflow, in addition to features documented elsewhere, the f
465465
* `await handle.signal()` can be called on the handle to signal the external workflow
466466
* `await handle.cancel()` can be called on the handle to send a cancel to the external workflow
467467

468+
#### Testing
469+
470+
Workflow testing can be done in an integration-test fashion against a real server, however it is hard to simulate
471+
timeouts and other long time-based code. Using the time-skipping workflow test environment can help there.
472+
473+
The time-skipping `temporalio.testing.WorkflowEnvironment` can be created via the static async `start_time_skipping()`.
474+
This internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist,
475+
then starts the test server which has special APIs for skipping time.
476+
477+
##### Automatic Time Skipping
478+
479+
Anytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To
480+
manually advance time before waiting on the result of a workflow, the `WorkflowEnvironment.sleep` method can be used.
481+
482+
Here's a simple example of a workflow that sleeps for 24 hours:
483+
484+
```python
485+
import asyncio
486+
from temporalio import workflow
487+
488+
@workflow.defn
489+
class WaitADayWorkflow:
490+
@workflow.run
491+
async def run(self) -> str:
492+
await asyncio.sleep(24 * 60 * 60)
493+
return "all done"
494+
```
495+
496+
An integration test of this workflow would be way too slow. However the time-skipping server automatically skips to the
497+
next event when we wait on the result. Here's a test for that workflow:
498+
499+
```python
500+
from temporalio.testing import WorkflowEnvironment
501+
from temporalio.worker import Worker
502+
503+
async def test_wait_a_day_workflow():
504+
async with await WorkflowEnvironment.start_time_skipping() as env:
505+
async with Worker(env.client, task_queue="tq1", workflows=[WaitADayWorkflow]):
506+
assert "all done" == await env.client.execute_workflow(WaitADayWorkflow.run, id="wf1", task_queue="tq1")
507+
```
508+
509+
That test will run almost instantly. This is because by calling `execute_workflow` on our client, we have asked the
510+
environment to automatically skip time as much as it can (basically until the end of the workflow or until an activity
511+
is run).
512+
513+
To disable automatic time-skipping while waiting for a workflow result, run code inside a
514+
`with env.auto_time_skipping_disabled():` block.
515+
516+
##### Manual Time Skipping
517+
518+
Until a workflow is waited on, all time skipping in the time-skipping environment is done manually via
519+
`WorkflowEnvironment.sleep`.
520+
521+
Here's workflow that waits for a signal or times out:
522+
523+
```python
524+
import asyncio
525+
from temporalio import workflow
526+
527+
@workflow.defn
528+
class SignalWorkflow:
529+
def __init__(self) -> None:
530+
self.signal_received = False
531+
532+
@workflow.run
533+
async def run(self) -> str:
534+
# Wait for signal or timeout in 45 seconds
535+
try:
536+
await workflow.wait_condition(lambda: self.signal_received, timeout=45)
537+
return "got signal"
538+
except asyncio.TimeoutError:
539+
return "got timeout"
540+
541+
@workflow.signal
542+
def some_signal(self) -> None:
543+
self.signal_received = True
544+
```
545+
546+
To test a normal signal, you might:
547+
548+
```python
549+
from temporalio.testing import WorkflowEnvironment
550+
from temporalio.worker import Worker
551+
552+
async def test_signal_workflow():
553+
async with await WorkflowEnvironment.start_time_skipping() as env:
554+
async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
555+
# Start workflow, send signal, check result
556+
handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
557+
await handle.signal(SignalWorkflow.some_signal)
558+
assert "got signal" == await handle.result()
559+
```
560+
561+
But how would you test the timeout part? Like so:
562+
563+
```python
564+
from temporalio.testing import WorkflowEnvironment
565+
from temporalio.worker import Worker
566+
567+
async def test_signal_workflow_timeout():
568+
async with await WorkflowEnvironment.start_time_skipping() as env:
569+
async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
570+
# Start workflow, advance time past timeout, check result
571+
handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
572+
await env.sleep(50)
573+
assert "got timeout" == await handle.result()
574+
```
575+
576+
Also, the current time of the workflow environment can be obtained via the async `WorkflowEnvironment.get_current_time`
577+
method.
578+
579+
##### Mocking Activities
580+
581+
Activities are just functions decorated with `@activity.defn`. Simply write different ones and pass those to the worker
582+
to have different activities called during the test.
583+
468584
### Activities
469585

470586
#### Definition
@@ -517,6 +633,11 @@ Cancellation for synchronous activities is done in the background and the activi
517633
react appropriately. An activity must heartbeat to receive cancellation and there are other ways to be notified about
518634
cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).
519635

636+
Note, all calls from an activity to functions in the `temporalio.activity` package are powered by
637+
[contextvars](https://docs.python.org/3/library/contextvars.html). Therefore, new threads starting _inside_ of
638+
activities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still
639+
function in the new threads.
640+
520641
###### Synchronous Multithreaded Activities
521642

522643
If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
@@ -580,6 +701,18 @@ cancellation of all outstanding activities.
580701
The `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least
581702
respect cancellation, the shutdown may never complete.
582703

704+
#### Testing
705+
706+
Unit testing an activity or any code that could run in an activity is done via the
707+
`temporalio.testing.ActivityEnvironment` class. Simply instantiate this and any callable + params passed to `run` will
708+
be invoked inside the activity context. The following are attributes/methods on the environment that can be used to
709+
affect calls activity code might make to functions on the `temporalio.activity` package.
710+
711+
* `info` property can be set to customize what is returned from `activity.info()`
712+
* `on_heartbeat` property can be set to handle `activity.heartbeat()` calls
713+
* `cancel()` can be invoked to simulate a cancellation of the activity
714+
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
715+
583716
### Workflow Replay
584717

585718
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

poetry.lock

Lines changed: 48 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pydoctor = { git = "https://github.com/cretz/pydoctor.git", branch = "overloads"
4949
pytest = "^7.1.2"
5050
pytest-asyncio = "^0.18.3"
5151
pytest-timeout = "^2.1.0"
52-
setuptools = "^64.0.1"
52+
setuptools = "^65.0.0"
5353
setuptools-rust = "^1.3.0"
5454
toml = "^0.10.2"
5555
twine = "^4.0.1"
@@ -149,6 +149,8 @@ intersphinx = [
149149
privacy = [
150150
"PRIVATE:temporalio.bridge",
151151
"PRIVATE:temporalio.types",
152+
"HIDDEN:temporalio.testing.activity",
153+
"HIDDEN:temporalio.testing.workflow",
152154
"HIDDEN:temporalio.worker.activity",
153155
"HIDDEN:temporalio.worker.interceptor",
154156
"HIDDEN:temporalio.worker.worker",

temporalio/activity.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
overload,
3131
)
3232

33-
import temporalio.api.common.v1
3433
import temporalio.common
3534
import temporalio.exceptions
3635

@@ -130,8 +129,12 @@ def current() -> _Context:
130129
return context
131130

132131
@staticmethod
133-
def set(context: _Context) -> None:
134-
_current_context.set(context)
132+
def set(context: _Context) -> contextvars.Token:
133+
return _current_context.set(context)
134+
135+
@staticmethod
136+
def reset(token: contextvars.Token) -> None:
137+
_current_context.reset(token)
135138

136139
@property
137140
def logger_details(self) -> Mapping[str, Any]:

0 commit comments

Comments
 (0)