Skip to content

Commit 87fd193

Browse files
authored
Clarity on client thread safety and sync activity cancel (#189)
Fixes #147
1 parent 6b9f554 commit 87fd193

File tree

4 files changed

+69
-4
lines changed

4 files changed

+69
-4
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -783,8 +783,9 @@ Synchronous activities, i.e. functions that do not have `async def`, can be used
783783
activities.
784784

785785
Cancellation for synchronous activities is done in the background and the activity must choose to listen for it and
786-
react appropriately. An activity must heartbeat to receive cancellation and there are other ways to be notified about
787-
cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).
786+
react appropriately. If after cancellation is obtained an unwrapped `temporalio.exceptions.CancelledError` is raised,
787+
the activity will be marked cancelled. An activity must heartbeat to receive cancellation and there are other ways to be
788+
notified about cancellation (see "Activity Context" and "Heartbeating and Cancellation" later).
788789

789790
Note, all calls from an activity to functions in the `temporalio.activity` package are powered by
790791
[contextvars](https://docs.python.org/3/library/contextvars.html). Therefore, new threads starting _inside_ of

temporalio/client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ class Client:
6565
:py:attr:`service` property provides access to a raw gRPC client. To create
6666
another client, like for a different namespace, :py:func:`Client` may be
6767
directly instantiated with a :py:attr:`service` of another.
68+
69+
Clients are not thread-safe and should only be used in the event loop they
70+
are first connected in. If a client needs to be used from another thread
71+
than where it was created, make sure the event loop where it was created is
72+
captured, and then call :py:func:`asyncio.run_coroutine_threadsafe` with the
73+
client call and that event loop.
6874
"""
6975

7076
@staticmethod

temporalio/worker/_activity.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,14 +401,18 @@ async def _run_activity(
401401
except (
402402
Exception,
403403
asyncio.CancelledError,
404+
temporalio.exceptions.CancelledError,
404405
temporalio.activity._CompleteAsyncError,
405406
) as err:
406407
try:
407408
if isinstance(err, temporalio.activity._CompleteAsyncError):
408409
temporalio.activity.logger.debug("Completing asynchronously")
409410
completion.result.will_complete_async.SetInParent()
410411
elif (
411-
isinstance(err, asyncio.CancelledError)
412+
isinstance(
413+
err,
414+
(asyncio.CancelledError, temporalio.exceptions.CancelledError),
415+
)
412416
and running_activity.cancelled_due_to_heartbeat_error
413417
):
414418
err = running_activity.cancelled_due_to_heartbeat_error
@@ -419,7 +423,10 @@ async def _run_activity(
419423
err, completion.result.failed.failure
420424
)
421425
elif (
422-
isinstance(err, asyncio.CancelledError)
426+
isinstance(
427+
err,
428+
(asyncio.CancelledError, temporalio.exceptions.CancelledError),
429+
)
423430
and running_activity.cancelled_by_request
424431
):
425432
temporalio.activity.logger.debug("Completing as cancelled")

tests/worker/test_activity.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,31 @@ def wait_cancel() -> str:
283283
assert result.result == "Cancelled"
284284

285285

286+
async def test_sync_activity_thread_cancel_uncaught(
287+
client: Client, worker: ExternalWorker
288+
):
289+
@activity.defn
290+
def wait_cancel() -> str:
291+
while not activity.is_cancelled():
292+
time.sleep(1)
293+
activity.heartbeat()
294+
raise CancelledError("Cancelled")
295+
296+
with pytest.raises(WorkflowFailureError) as err:
297+
with concurrent.futures.ThreadPoolExecutor() as executor:
298+
await _execute_workflow_with_activity(
299+
client,
300+
worker,
301+
wait_cancel,
302+
cancel_after_ms=100,
303+
wait_for_cancellation=True,
304+
heartbeat_timeout_ms=3000,
305+
worker_config={"activity_executor": executor},
306+
)
307+
assert isinstance(err.value.cause, ActivityError)
308+
assert isinstance(err.value.cause.cause, CancelledError)
309+
310+
286311
@activity.defn
287312
def picklable_activity_wait_cancel() -> str:
288313
while not activity.is_cancelled():
@@ -305,6 +330,32 @@ async def test_sync_activity_process_cancel(client: Client, worker: ExternalWork
305330
assert result.result == "Cancelled"
306331

307332

333+
@activity.defn
334+
def picklable_activity_raise_cancel() -> str:
335+
while not activity.is_cancelled():
336+
time.sleep(1)
337+
activity.heartbeat()
338+
raise CancelledError("Cancelled")
339+
340+
341+
async def test_sync_activity_process_cancel_uncaught(
342+
client: Client, worker: ExternalWorker
343+
):
344+
with pytest.raises(WorkflowFailureError) as err:
345+
with concurrent.futures.ProcessPoolExecutor() as executor:
346+
result = await _execute_workflow_with_activity(
347+
client,
348+
worker,
349+
picklable_activity_raise_cancel,
350+
cancel_after_ms=100,
351+
wait_for_cancellation=True,
352+
heartbeat_timeout_ms=3000,
353+
worker_config={"activity_executor": executor},
354+
)
355+
assert isinstance(err.value.cause, ActivityError)
356+
assert isinstance(err.value.cause.cause, CancelledError)
357+
358+
308359
async def test_activity_does_not_exist(client: Client, worker: ExternalWorker):
309360
@activity.defn
310361
async def say_hello(name: str) -> str:

0 commit comments

Comments
 (0)