Commit ded3747

Encourage threaded activities, warn when max_workers too low, and other small changes (#387)
1 parent d03f356 commit ded3747

6 files changed: +127 -62 lines changed


README.md

Lines changed: 48 additions & 27 deletions
@@ -10,7 +10,9 @@ execute asynchronous, long-running business logic in a scalable and resilient wa
1010
"Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language.
1111

1212
Also see:
13-
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our [Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including information around Temporal core concepts.
13+
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our
14+
[Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including
15+
information around Temporal core concepts.
1416
* [Python Code Samples](https://github.com/temporalio/samples-python)
1517
* [API Documentation](https://python.temporal.io) - Complete Temporal Python SDK Package reference.
1618

@@ -84,10 +86,10 @@ informal introduction to the features and their implementation.
8486
- [Activities](#activities)
8587
- [Definition](#definition-1)
8688
- [Types of Activities](#types-of-activities)
87-
- [Asynchronous Activities](#asynchronous-activities)
8889
- [Synchronous Activities](#synchronous-activities)
8990
- [Synchronous Multithreaded Activities](#synchronous-multithreaded-activities)
9091
- [Synchronous Multiprocess/Other Activities](#synchronous-multiprocessother-activities)
92+
- [Asynchronous Activities](#asynchronous-activities)
9193
- [Activity Context](#activity-context)
9294
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
9395
- [Worker Shutdown](#worker-shutdown)
@@ -111,7 +113,9 @@ informal introduction to the features and their implementation.
111113

112114
# Quick Start
113115

114-
We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is not intended as one of the ways to use Temporal, but in reality it is very simplified and decidedly not "the only way" to use Temporal. For more information, check out the docs references in "Next Steps" below the quick start.
116+
We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is intended as
117+
one way to use Temporal, but in reality it is very simplified and decidedly not "the only way" to use Temporal.
118+
For more information, check out the docs references in "Next Steps" below the quick start.
115119

116120
## Installation
117121

@@ -136,7 +140,7 @@ Create the following in `activities.py`:
136140
from temporalio import activity
137141

138142
@activity.defn
139-
async def say_hello(name: str) -> str:
143+
def say_hello(name: str) -> str:
140144
    return f"Hello, {name}!"
141145
```
142146

@@ -163,6 +167,7 @@ Create the following in `run_worker.py`:
163167

164168
```python
165169
import asyncio
170+
import concurrent.futures
166171
from temporalio.client import Client
167172
from temporalio.worker import Worker
168173

@@ -175,8 +180,15 @@ async def main():
175180
    client = await Client.connect("localhost:7233")
176181

177182
    # Run the worker
178-
    worker = Worker(client, task_queue="my-task-queue", workflows=[SayHello], activities=[say_hello])
179-
    await worker.run()
183+
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
184+
        worker = Worker(
185+
            client,
186+
            task_queue="my-task-queue",
187+
            workflows=[SayHello],
188+
            activities=[say_hello],
189+
            activity_executor=activity_executor,
190+
        )
191+
        await worker.run()
180192

181193
if __name__ == "__main__":
182194
    asyncio.run(main())
@@ -235,8 +247,9 @@ give you much more information about how Temporal works with Python:
235247

236248
# Usage
237249

238-
From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around Temporal concepts.
239-
*This section is not intended as a how-to guide* -- For more how-to oriented information, check out the links in the [Next Steps](#next-steps) section above.
250+
From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around
251+
Temporal concepts. *This section is not intended as a how-to guide* -- For more how-to oriented information, check out
252+
the links in the [Next Steps](#next-steps) section above.
240253

241254
### Client
242255

@@ -269,6 +282,7 @@ Some things to note about the above code:
269282
does the same thing
270283
* Clients can have many more options not shown here (e.g. data converters and interceptors)
271284
* A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language)
285+
* Clients do not work across forks
272286

273287
Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same
274288
connection. For instance, given the `client` above, this is how to have a client in another namespace:
@@ -516,7 +530,7 @@ class GreetingInfo:
516530
    name: str = "<unknown>"
517531

518532
@activity.defn
519-
async def create_greeting_activity(info: GreetingInfo) -> str:
533+
def create_greeting_activity(info: GreetingInfo) -> str:
520534
    return f"{info.salutation}, {info.name}!"
521535
```
522536

@@ -1044,13 +1058,14 @@ Activities are decorated with `@activity.defn` like so:
10441058
from temporalio import activity
10451059

10461060
@activity.defn
1047-
async def say_hello_activity(name: str) -> str:
1061+
def say_hello_activity(name: str) -> str:
10481062
    return f"Hello, {name}!"
10491063
```
10501064

10511065
Some things to note about activity definitions:
10521066

1053-
* The `say_hello_activity` is `async` which is the recommended activity type (see "Types of Activities" below)
1067+
* The `say_hello_activity` is synchronous, which is the recommended activity type (see "Types of Activities" below), but
1068+
it can be `async`
10541069
* A custom name for the activity can be set with a decorator argument, e.g. `@activity.defn(name="my activity")`
10551070
* Long running activities should regularly heartbeat and handle cancellation
10561071
* Activities can only have positional arguments. Best practice is to only take a single argument that is an
@@ -1066,19 +1081,8 @@ Some things to note about activity definitions:
10661081

10671082
#### Types of Activities
10681083

1069-
There are 3 types of activity callables accepted and described below: asynchronous, synchronous multithreaded, and
1070-
synchronous multiprocess/other. Only positional parameters are allowed in activity callables.
1071-
1072-
##### Asynchronous Activities
1073-
1074-
Asynchronous activities, i.e. functions using `async def`, are the recommended activity type. When using asynchronous
1075-
activities no special worker parameters are needed.
1076-
1077-
Cancellation for asynchronous activities is done via
1078-
[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that
1079-
`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must
1080-
heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
1081-
"Heartbeating and Cancellation" later).
1084+
There are 3 types of activity callables accepted and described below: synchronous multithreaded, synchronous
1085+
multiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables.
10821086

10831087
##### Synchronous Activities
10841088

@@ -1102,8 +1106,9 @@ will fail and shutdown.
11021106
###### Synchronous Multithreaded Activities
11031107

11041108
If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
1105-
are considered multithreaded activities. Besides `activity_executor`, no other worker parameters are required for
1106-
synchronous multithreaded activities.
1109+
are considered multithreaded activities. If the executor's `max_workers` is lower than the worker's
1110+
`max_concurrent_activities` setting, a warning will be issued. Besides `activity_executor`, no other worker parameters
1111+
are required for synchronous multithreaded activities.
11071112

11081113
By default, cancellation of a synchronous multithreaded activity is done via a `temporalio.exceptions.CancelledError`
11091114
thrown into the activity thread. Activities that do not wish to have cancellation thrown can set
@@ -1118,6 +1123,8 @@ there is a return statement within, it will throw the cancellation if there was
11181123

11191124
If `activity_executor` is set to an instance of `concurrent.futures.Executor` that is _not_
11201125
`concurrent.futures.ThreadPoolExecutor`, then the synchronous activities are considered multiprocess/other activities.
1126+
Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise
1127+
on cancellation.
11211128

11221129
These require special primitives for heartbeating and cancellation. The `shared_state_manager` worker parameter must be
11231130
set to an instance of `temporalio.worker.SharedStateManager`. The most common implementation can be created by passing a
@@ -1127,6 +1134,20 @@ set to an instance of `temporalio.worker.SharedStateManager`. The most common im
11271134
Also, all of these activity functions must be
11281135
["picklable"](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled).
11291136

1137+
##### Asynchronous Activities
1138+
1139+
Asynchronous activities are functions defined with `async def`. Asynchronous activities are often much more performant
1140+
than synchronous ones. When using asynchronous activities no special worker parameters are needed.
1141+
1142+
**⚠️ WARNING: Do not block the thread in `async def` Python functions. This can stop the processing of the rest of the
1143+
Temporal worker.**
1144+
1145+
Cancellation for asynchronous activities is done via
1146+
[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that
1147+
`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must
1148+
heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
1149+
"Heartbeating and Cancellation" later).
1150+
11301151
#### Activity Context
11311152

11321153
During activity execution, an implicit activity context is set as a
@@ -1155,7 +1176,7 @@ occurs. Synchronous activities cannot call any of the `async` functions.
11551176
In order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at
11561177
invocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all
11571178
but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation
1158-
for asynchronous and synchronous activities.
1179+
for synchronous and asynchronous activities.
11591180

11601181
In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
11611182
for retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and
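
The warning this commit adds to the README's "Asynchronous Activities" section can be illustrated without the SDK. The following is a minimal stdlib-only sketch, not Temporal code: `blocking_work`, `count_ticks`, and `run_demo` are hypothetical names, and `time.sleep` stands in for any blocking call. It counts how often a background task gets to run while the same work is either executed directly in the coroutine or handed to a thread.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    """Stand-in for a blocking call such as a synchronous HTTP request."""
    time.sleep(seconds)
    return "done"


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the event loop lets this task run."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def run_demo(offload: bool) -> int:
    """Return the ticker's count while 0.2s of blocking work is performed."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # yield once so the ticker can start
    if offload:
        # Runs in the loop's default thread pool; the loop stays responsive.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_work, 0.2)
    else:
        # Runs on the event loop thread; no other task can make progress.
        blocking_work(0.2)
    stop.set()
    return await ticker


if __name__ == "__main__":
    print("ticks while blocking:", asyncio.run(run_demo(offload=False)))
    print("ticks while offloaded:", asyncio.run(run_demo(offload=True)))
```

In the blocking case the ticker is starved for the whole 0.2 seconds, which is the same effect a blocking `async def` activity would have on everything else sharing the event loop. This is one reason the commit recommends plain `def` activities run on a thread pool.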

temporalio/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ class Client:
8080
than where it was created, make sure the event loop where it was created is
8181
captured, and then call :py:func:`asyncio.run_coroutine_threadsafe` with the
8282
client call and that event loop.
83+
84+
Clients do not work across forks since runtimes do not work across forks.
8385
"""
8486

8587
@staticmethod

temporalio/runtime.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class Runtime:
2525
Users are encouraged to use :py:meth:`default`. It can be set with
2626
:py:meth:`set_default`. Every time a new runtime is created, a new internal
2727
thread pool is created.
28+
29+
Runtimes do not work across forks.
2830
"""
2931

3032
@staticmethod

temporalio/worker/_worker.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import hashlib
88
import logging
99
import sys
10+
import warnings
1011
from datetime import timedelta
1112
from typing import Any, Awaitable, Callable, List, Optional, Sequence, Type, cast
1213

@@ -93,11 +94,14 @@ def __init__(
9394
workflows: Set of workflow classes decorated with
9495
:py:func:`@workflow.defn<temporalio.workflow.defn>`.
9596
activity_executor: Concurrent executor to use for non-async
96-
activities. This is required if any activities are non-async. If
97-
this is a :py:class:`concurrent.futures.ProcessPoolExecutor`,
98-
all non-async activities must be picklable. Note, a broken
99-
executor failure from this executor will cause the worker to
100-
fail and shutdown.
97+
activities. This is required if any activities are non-async.
98+
:py:class:`concurrent.futures.ThreadPoolExecutor` is
99+
recommended. If this is a
100+
:py:class:`concurrent.futures.ProcessPoolExecutor`, all
101+
non-async activities must be picklable. ``max_workers`` on the
102+
executor should be at least ``max_concurrent_activities`` or a
103+
warning is issued. Note, a broken-executor failure from this
104+
executor will cause the worker to fail and shut down.
101105
workflow_task_executor: Thread pool executor for workflow tasks. If
102106
this is not present, a new
103107
:py:class:`concurrent.futures.ThreadPoolExecutor` will be
@@ -262,6 +266,17 @@ def __init__(
262266
        self._activity_worker: Optional[_ActivityWorker] = None
263267
        runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default()
264268
        if activities:
269+
            # Issue warning here if executor max_workers is lower than max
270+
            # concurrent activities. We do this here instead of in
271+
            # _ActivityWorker so the stack level is predictable.
272+
            max_workers = getattr(activity_executor, "_max_workers", None)
273+
            if isinstance(max_workers, int) and max_workers < max_concurrent_activities:
274+
                warnings.warn(
275+
                    f"Worker max_concurrent_activities is {max_concurrent_activities} "
276+
                    + f"but activity_executor's max_workers is only {max_workers}",
277+
                    stacklevel=2,
278+
                )
279+
265280
            self._activity_worker = _ActivityWorker(
266281
                bridge_worker=lambda: self._bridge_worker,
267282
                task_queue=task_queue,
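
The check added in this hunk can be tried outside the SDK. The sketch below restates it as a standalone function under a few assumptions: `check_executor_capacity` and `collect_warnings` are hypothetical helper names that do not exist in `temporalio`, and `_max_workers` is a private attribute of the stdlib pool executors, which is presumably why the commit reads it with `getattr` and an `isinstance` guard rather than relying on it.

```python
import concurrent.futures
import warnings
from typing import List, Optional


def check_executor_capacity(
    executor: Optional[concurrent.futures.Executor],
    max_concurrent_activities: int,
) -> None:
    """Warn when the executor has fewer workers than allowed activities."""
    # Executors without an integer _max_workers (or no executor at all)
    # are skipped silently, mirroring the guarded lookup in the diff.
    max_workers = getattr(executor, "_max_workers", None)
    if isinstance(max_workers, int) and max_workers < max_concurrent_activities:
        warnings.warn(
            f"Worker max_concurrent_activities is {max_concurrent_activities} "
            f"but activity_executor's max_workers is only {max_workers}",
            stacklevel=2,  # attribute the warning to the caller
        )


def collect_warnings(max_workers: int, max_concurrent_activities: int) -> List[str]:
    """Run the check against a thread pool and return the warning messages."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            check_executor_capacity(pool, max_concurrent_activities)
    return [str(w.message) for w in caught]


if __name__ == "__main__":
    print(collect_warnings(max_workers=100, max_concurrent_activities=100))
    print(collect_warnings(max_workers=4, max_concurrent_activities=100))
```

A pool sized at or above `max_concurrent_activities` produces no warning, while an undersized pool produces one. The `stacklevel=2` argument makes the warning point at the code that called the check, which matches the diff's comment about keeping the stack level predictable.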
