Skip to content

Commit fdf8b65

Browse files
authored
Fail activity worker on broken executor (#253)
Fixes #245
1 parent ddbb92f commit fdf8b65

File tree

4 files changed

+94
-3
lines changed

4 files changed

+94
-3
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,9 @@ Note, all calls from an activity to functions in the `temporalio.activity` packa
938938
activities must `copy_context()` and then `.run()` manually to ensure `temporalio.activity` calls like `heartbeat` still
939939
function in the new threads.
940940

941+
If any activity ever throws a `concurrent.futures.BrokenExecutor`, the failure is considered unrecoverable and the worker
942+
will fail and shut down.
943+
941944
###### Synchronous Multithreaded Activities
942945

943946
If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities

temporalio/worker/_activity.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,18 @@
1616
from contextlib import contextmanager
1717
from dataclasses import dataclass
1818
from datetime import datetime, timedelta, timezone
19-
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Tuple, Type, Union
19+
from typing import (
20+
Any,
21+
Callable,
22+
Dict,
23+
Iterator,
24+
NoReturn,
25+
Optional,
26+
Sequence,
27+
Tuple,
28+
Type,
29+
Union,
30+
)
2031

2132
import google.protobuf.duration_pb2
2233
import google.protobuf.timestamp_pb2
@@ -64,6 +75,7 @@ def __init__(
6475
self._running_activities: Dict[bytes, _RunningActivity] = {}
6576
self._data_converter = data_converter
6677
self._interceptors = interceptors
78+
self._fail_worker_exception_queue: asyncio.Queue[Exception] = asyncio.Queue()
6779
# Lazily created on first activity
6880
self._worker_shutdown_event: Optional[
6981
temporalio.activity._CompositeEvent
@@ -111,11 +123,26 @@ def __init__(
111123
self._activities[defn.name] = defn
112124

113125
async def run(self) -> None:
126+
# Create a task that fails when we get a failure on the queue
127+
async def raise_from_queue() -> NoReturn:
128+
raise await self._fail_worker_exception_queue.get()
129+
130+
exception_task = asyncio.create_task(raise_from_queue())
131+
114132
# Continually poll for activity work
115133
while True:
116134
try:
117135
# Poll for a task
118-
task = await self._bridge_worker().poll_activity_task()
136+
poll_task = asyncio.create_task(
137+
self._bridge_worker().poll_activity_task()
138+
)
139+
await asyncio.wait([poll_task, exception_task], return_when=asyncio.FIRST_COMPLETED) # type: ignore
140+
# If exception for failing the worker happened, raise it.
141+
# Otherwise, the poll succeeded.
142+
if exception_task.done():
143+
poll_task.cancel()
144+
await exception_task
145+
task = await poll_task
119146

120147
if task.HasField("start"):
121148
# Cancelled event and sync field will be updated inside
@@ -131,8 +158,10 @@ async def run(self) -> None:
131158
else:
132159
raise RuntimeError(f"Unrecognized activity task: {task}")
133160
except temporalio.bridge.worker.PollShutdownError:
161+
exception_task.cancel()
134162
return
135163
except Exception as err:
164+
exception_task.cancel()
136165
raise RuntimeError("Activity worker failed") from err
137166

138167
async def shutdown(self, after_graceful_timeout: timedelta) -> None:
@@ -465,6 +494,10 @@ async def _run_activity(
465494
await self._data_converter.encode_failure(
466495
err, completion.result.failed.failure
467496
)
497+
498+
# For broken executors, we have to fail the entire worker
499+
if isinstance(err, concurrent.futures.BrokenExecutor):
500+
self._fail_worker_exception_queue.put_nowait(err)
468501
except Exception as inner_err:
469502
temporalio.activity.logger.exception(
470503
f"Exception handling failed, original error: {err}"

temporalio/worker/_worker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ def __init__(
9393
activity_executor: Concurrent executor to use for non-async
9494
activities. This is required if any activities are non-async. If
9595
this is a :py:class:`concurrent.futures.ProcessPoolExecutor`,
96-
all non-async activities must be picklable.
96+
all non-async activities must be picklable. Note, a broken
97+
executor failure from this executor will cause the worker to
98+
fail and shut down.
9799
workflow_task_executor: Thread pool executor for workflow tasks. If
98100
this is not present, a new
99101
:py:class:`concurrent.futures.ThreadPoolExecutor` will be

tests/worker/test_activity.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import logging
44
import logging.handlers
55
import multiprocessing
6+
import os
67
import queue
8+
import signal
79
import threading
810
import time
911
import uuid
12+
from concurrent.futures.process import BrokenProcessPool
1013
from dataclasses import dataclass
1114
from datetime import datetime, timedelta, timezone
1215
from typing import Any, Callable, List, NoReturn, Optional, Sequence
@@ -919,6 +922,56 @@ async def test_sync_activity_process_worker_shutdown_graceful(
919922
assert "Worker graceful shutdown" == await handle.result()
920923

921924

925+
@activity.defn
926+
def kill_my_process() -> str:
927+
os.kill(os.getpid(), getattr(signal, "SIGKILL", -9))
928+
return "does not get here"
929+
930+
931+
async def test_sync_activity_process_executor_crash(
932+
client: Client, worker: ExternalWorker
933+
):
934+
act_task_queue = str(uuid.uuid4())
935+
with concurrent.futures.ProcessPoolExecutor() as executor:
936+
act_worker = Worker(
937+
client,
938+
task_queue=act_task_queue,
939+
activities=[kill_my_process],
940+
activity_executor=executor,
941+
graceful_shutdown_timeout=timedelta(seconds=2),
942+
shared_state_manager=_default_shared_state_manager,
943+
)
944+
act_worker_task = asyncio.create_task(act_worker.run())
945+
946+
# Confirm workflow failure with broken pool
947+
with pytest.raises(WorkflowFailureError) as workflow_err:
948+
await client.execute_workflow(
949+
"kitchen_sink",
950+
KSWorkflowParams(
951+
actions=[
952+
KSAction(
953+
execute_activity=KSExecuteActivityAction(
954+
name="kill_my_process",
955+
task_queue=act_task_queue,
956+
heartbeat_timeout_ms=30000,
957+
)
958+
)
959+
]
960+
),
961+
id=str(uuid.uuid4()),
962+
task_queue=worker.task_queue,
963+
)
964+
assert isinstance(workflow_err.value.cause, ActivityError)
965+
assert isinstance(workflow_err.value.cause.cause, ApplicationError)
966+
assert workflow_err.value.cause.cause.type == "BrokenProcessPool"
967+
968+
# Also confirm that activity worker fails unrecoverably
969+
with pytest.raises(RuntimeError) as worker_err:
970+
await asyncio.wait_for(act_worker_task, 10)
971+
assert str(worker_err.value) == "Activity worker failed"
972+
assert isinstance(worker_err.value.__cause__, BrokenProcessPool)
973+
974+
922975
class AsyncActivityWrapper:
923976
def __init__(self) -> None:
924977
self._info: Optional[activity.Info] = None

0 commit comments

Comments
 (0)