Skip to content

Commit 6b9f554

Browse files
authored
Propagate fatal worker errors (#188)
1 parent 2546871 commit 6b9f554

File tree

5 files changed

+358
-43
lines changed

5 files changed

+358
-43
lines changed

temporalio/worker/_activity.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,8 @@ async def run(self) -> None:
130130
raise RuntimeError(f"Unrecognized activity task: {task}")
131131
except temporalio.bridge.worker.PollShutdownError:
132132
return
133-
except Exception:
134-
# Should never happen
135-
logger.exception(f"Activity runner failed")
133+
except Exception as err:
134+
raise RuntimeError("Activity worker failed") from err
136135

137136
async def shutdown(self, after_graceful_timeout: timedelta) -> None:
138137
# Set event that we're shutting down (updates all activity tasks)

temporalio/worker/_worker.py

Lines changed: 168 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import logging
99
import sys
1010
from datetime import timedelta
11-
from typing import Any, Callable, List, Optional, Sequence, Type, cast
11+
from typing import Any, Awaitable, Callable, List, Optional, Sequence, Type, cast
1212

1313
from typing_extensions import TypedDict
1414

@@ -38,7 +38,9 @@ class Worker:
3838
"""Worker to process workflows and/or activities.
3939
4040
Once created, workers can be run and shut down explicitly via :py:meth:`run`
41-
and :py:meth:`shutdown`, or they can be used in an ``async with`` clause.
41+
and :py:meth:`shutdown`. Alternatively workers can be used in an
42+
``async with`` clause. See :py:meth:`__aenter__` and :py:meth:`__aexit__`
43+
for important details about fatal errors.
4244
"""
4345

4446
def __init__(
@@ -71,6 +73,7 @@ def __init__(
7173
graceful_shutdown_timeout: timedelta = timedelta(),
7274
shared_state_manager: Optional[SharedStateManager] = None,
7375
debug_mode: bool = False,
76+
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
7477
) -> None:
7578
"""Create a worker to process workflows and/or activities.
7679
@@ -163,6 +166,9 @@ def __init__(
163166
sandboxing in order to make using a debugger easier. If false
164167
but the environment variable ``TEMPORAL_DEBUG`` is truthy, this
165168
will be set to true.
169+
on_fatal_error: An async function that can handle a failure before
170+
the worker shutdown commences. This cannot stop the shutdown and
171+
any exception raised is logged and ignored.
166172
"""
167173
if not activities and not workflows:
168174
raise ValueError("At least one activity or workflow must be specified")
@@ -222,8 +228,14 @@ def __init__(
222228
graceful_shutdown_timeout=graceful_shutdown_timeout,
223229
shared_state_manager=shared_state_manager,
224230
debug_mode=debug_mode,
231+
on_fatal_error=on_fatal_error,
225232
)
226-
self._task: Optional[asyncio.Task] = None
233+
self._started = False
234+
self._shutdown_event = asyncio.Event()
235+
self._shutdown_complete_event = asyncio.Event()
236+
self._async_context_inner_task: Optional[asyncio.Task] = None
237+
self._async_context_run_task: Optional[asyncio.Task] = None
238+
self._async_context_run_exception: Optional[BaseException] = None
227239

228240
# Create activity and workflow worker
229241
self._activity_worker: Optional[_ActivityWorker] = None
@@ -314,59 +326,174 @@ def task_queue(self) -> str:
314326
"""Task queue this worker is on."""
315327
return self._config["task_queue"]
316328

317-
async def __aenter__(self) -> Worker:
318-
"""Start the worker and return self for use by ``async with``.
329+
@property
330+
def is_running(self) -> bool:
331+
"""Whether the worker is running.
319332
320-
Returns:
321-
Self.
333+
This is only ``True`` if the worker has been started and not yet
334+
shut down.
322335
"""
323-
self._start()
324-
return self
336+
return self._started and not self.is_shutdown
325337

326-
async def __aexit__(self, *args) -> None:
327-
"""Same as :py:meth:`shutdown` for use by ``async with``."""
328-
await self.shutdown()
338+
@property
339+
def is_shutdown(self) -> bool:
340+
"""Whether the worker has run and shut down.
329341
330-
async def run(self) -> None:
331-
"""Run the worker and wait on it to be shutdown."""
332-
await self._start()
342+
This is only ``True`` if the worker was once started and then shut down.
343+
This is not necessarily ``True`` after :py:meth:`shutdown` is first
344+
called because the shutdown process can take a bit.
345+
"""
346+
return self._shutdown_complete_event.is_set()
333347

334-
def _start(self) -> asyncio.Task:
335-
if self._task:
336-
raise RuntimeError("Already started")
337-
worker_tasks: List[asyncio.Task] = []
338-
if self._activity_worker:
339-
worker_tasks.append(asyncio.create_task(self._activity_worker.run()))
340-
if self._workflow_worker:
341-
worker_tasks.append(asyncio.create_task(self._workflow_worker.run()))
342-
self._task = asyncio.create_task(asyncio.wait(worker_tasks))
343-
return self._task
348+
async def run(self) -> None:
349+
"""Run the worker and wait on it to be shut down.
344350
345-
async def shutdown(self) -> None:
346-
"""Shutdown the worker and wait until all activities have completed.
351+
This will not return until shutdown is complete. This means that
352+
activities have all completed after being told to cancel after the
353+
graceful timeout period.
347354
348-
This will initiate a shutdown and optionally wait for a grace period
349-
before sending cancels to all activities.
355+
This method will raise if there is a worker fatal error. While
356+
:py:meth:`shutdown` does not need to be invoked in this case, it is
357+
harmless to do so. Otherwise, to shut down this worker, invoke
358+
:py:meth:`shutdown`.
350359
351-
This worker should not be used in any way once this is called.
360+
Technically this worker can be shut down by issuing a cancel to this
361+
async function assuming that it is currently running. A cancel could
362+
also cancel the shutdown process. Therefore users are encouraged to use
363+
explicit shutdown instead.
352364
"""
353-
if not self._task:
354-
raise RuntimeError("Never started")
365+
if self._started:
366+
raise RuntimeError("Already started")
367+
self._started = True
368+
369+
# Create a task that raises when a shutdown is requested
370+
async def raise_on_shutdown():
371+
try:
372+
await self._shutdown_event.wait()
373+
raise _ShutdownRequested()
374+
except asyncio.CancelledError:
375+
pass
376+
377+
tasks: List[asyncio.Task] = [asyncio.create_task(raise_on_shutdown())]
378+
# Create tasks for workers
379+
if self._activity_worker:
380+
tasks.append(asyncio.create_task(self._activity_worker.run()))
381+
if self._workflow_worker:
382+
tasks.append(asyncio.create_task(self._workflow_worker.run()))
383+
384+
# Wait for either worker or shutdown requested
385+
wait_task = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
386+
try:
387+
await asyncio.shield(wait_task)
388+
389+
# If any of the worker tasks failed, we want to re-raise that as
390+
# the exception
391+
exception = next((t.exception() for t in tasks[1:] if t.done()), None)
392+
if exception:
393+
logger.error("Worker failed, shutting down", exc_info=exception)
394+
if self._config["on_fatal_error"]:
395+
try:
396+
await self._config["on_fatal_error"](exception)
397+
except:
398+
logger.warning("Fatal error handler failed")
399+
400+
except asyncio.CancelledError as user_cancel_err:
401+
# Represents user literally calling cancel
402+
logger.info("Worker cancelled, shutting down")
403+
exception = user_cancel_err
404+
405+
# Cancel the shutdown task (safe if already done)
406+
tasks[0].cancel()
355407
graceful_timeout = self._config["graceful_shutdown_timeout"]
356408
logger.info(
357-
f"Beginning worker shutdown, will wait {graceful_timeout} before cancelling workflows/activities"
409+
f"Beginning worker shutdown, will wait {graceful_timeout} before cancelling activities"
358410
)
359411
# Start shutdown of the bridge
360412
bridge_shutdown_task = asyncio.create_task(self._bridge_worker.shutdown())
361-
# Wait for the poller loops to stop
362-
await self._task
413+
414+
# Wait for all tasks to complete (i.e. for poller loops to stop)
415+
await asyncio.wait(tasks)
416+
# Sometimes both workers throw an exception and since we only take the
417+
# first, Python may complain with "Task exception was never retrieved"
418+
# if we don't get the others. Therefore we call cancel on each task
419+
# which suppresses this.
420+
for task in tasks:
421+
task.cancel()
422+
363423
# Shutdown the activity worker (there is no workflow worker shutdown)
364424
if self._activity_worker:
365425
await self._activity_worker.shutdown(graceful_timeout)
366426
# Wait for the bridge to report everything is completed
367427
await bridge_shutdown_task
368428
# Do final shutdown
369-
await self._bridge_worker.finalize_shutdown()
429+
try:
430+
await self._bridge_worker.finalize_shutdown()
431+
except:
432+
# Ignore errors here that can arise in some tests where the bridge
433+
# worker still has a reference
434+
pass
435+
436+
# Mark as shutdown complete and re-raise exception if present
437+
self._shutdown_complete_event.set()
438+
if exception:
439+
raise exception
440+
441+
async def shutdown(self) -> None:
442+
"""Initiate a worker shutdown and wait until complete.
443+
444+
This can be called before the worker has even started and is safe for
445+
repeated invocations. It simply sets a marker informing the worker to
446+
shut down as it runs.
447+
448+
This will not return until the worker has completed shutting down.
449+
"""
450+
self._shutdown_event.set()
451+
await self._shutdown_complete_event.wait()
452+
453+
async def __aenter__(self) -> Worker:
454+
"""Start the worker and return self for use by ``async with``.
455+
456+
This is a wrapper around :py:meth:`run`. Please review that method.
457+
458+
This takes a similar approach to :py:func:`asyncio.timeout` in that it
459+
will cancel the current task if there is a fatal worker error and raise
460+
that error out of the context manager. However, if the inner async code
461+
swallows/wraps the :py:class:`asyncio.CancelledError`, the exiting
462+
portion of the context manager will not raise the fatal worker error.
463+
"""
464+
if self._async_context_inner_task:
465+
raise RuntimeError("Already started")
466+
self._async_context_inner_task = asyncio.current_task()
467+
if not self._async_context_inner_task:
468+
raise RuntimeError("Can only use async with inside a task")
469+
470+
# Start a task that runs and if there's an error, cancels the current
471+
# task and re-raises the error
472+
async def run():
473+
try:
474+
await self.run()
475+
except BaseException as err:
476+
self._async_context_run_exception = err
477+
self._async_context_inner_task.cancel()
478+
479+
self._async_context_run_task = asyncio.create_task(run())
480+
return self
481+
482+
async def __aexit__(self, exc_type: Optional[Type[BaseException]], *args) -> None:
483+
"""Same as :py:meth:`shutdown` for use by ``async with``.
484+
485+
Note, this will raise the worker fatal error if one occurred and the
486+
inner task cancellation was not inadvertently swallowed/wrapped.
487+
"""
488+
# Wait for shutdown then run complete
489+
if not self._async_context_run_task:
490+
raise RuntimeError("Never started")
491+
await self.shutdown()
492+
# Cancel our run task
493+
self._async_context_run_task.cancel()
494+
# Only re-raise our exception if present and exc_type is cancel
495+
if exc_type is asyncio.CancelledError and self._async_context_run_exception:
496+
raise self._async_context_run_exception
370497

371498

372499
class WorkerConfig(TypedDict, total=False):
@@ -399,6 +526,7 @@ class WorkerConfig(TypedDict, total=False):
399526
graceful_shutdown_timeout: timedelta
400527
shared_state_manager: Optional[SharedStateManager]
401528
debug_mode: bool
529+
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
402530

403531

404532
_default_build_id: Optional[str] = None
@@ -478,3 +606,7 @@ def _get_module_code(mod_name: str) -> Optional[bytes]:
478606
except Exception:
479607
pass
480608
return None
609+
610+
611+
class _ShutdownRequested(RuntimeError):
612+
pass

temporalio/worker/_workflow.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,8 @@ async def run(self) -> None:
129129
setattr(task, "__temporal_task_tag", task_tag)
130130
except temporalio.bridge.worker.PollShutdownError:
131131
pass
132-
except Exception:
133-
# Should never happen
134-
logger.exception(f"Workflow runner failed")
132+
except Exception as err:
133+
raise RuntimeError("Workflow worker failed") from err
135134
finally:
136135
# Collect all tasks and wait for them to complete
137136
our_tasks = [

tests/helpers/worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,12 @@ def __init__(self, env: WorkflowEnvironment) -> None:
230230
self.worker = Worker(
231231
env.client, task_queue=str(uuid.uuid4()), workflows=[KitchenSinkWorkflow]
232232
)
233-
self.worker._start()
233+
self.run_task = asyncio.create_task(self.worker.run())
234234

235235
@property
236236
def task_queue(self) -> str:
237237
return self.worker.task_queue
238238

239239
async def close(self):
240240
await self.worker.shutdown()
241+
await self.run_task

0 commit comments

Comments
 (0)