Skip to content

Commit 0bb94f8

Browse files
authored
Ability for certain task failure types to fail workflow (#516)
Fixes #446
1 parent ecd703d commit 0bb94f8

File tree

13 files changed

+500
-68
lines changed

13 files changed

+500
-68
lines changed

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,8 @@ While running in a workflow, in addition to features documented elsewhere, the f
722722

723723
#### Exceptions
724724

725-
* Workflows can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow retrying).
725+
* Workflows/updates can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow
726+
in a retrying state).
726727
* Exceptions that are instances of `temporalio.exceptions.FailureError` will fail the workflow with that exception
727728
* For failing the workflow explicitly with a user exception, use `temporalio.exceptions.ApplicationError`. This can
728729
be marked non-retryable or include details as needed.
@@ -732,6 +733,13 @@ While running in a workflow, in addition to features documented elsewhere, the f
732733
fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an
733734
`ApplicationError` as mentioned above.
734735

736+
This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a
737+
`Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance
738+
of any type in either list, it will fail the workflow instead of the task. This means a value of `[Exception]` will
739+
cause every exception to fail the workflow instead of the task. Also, as a special case, if
740+
`temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-deterministic exceptions will fail the
741+
workflow. WARNING: These settings are experimental.
742+
735743
#### External Workflows
736744

737745
* `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow

temporalio/bridge/src/worker.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ use prost::Message;
22
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
33
use pyo3::prelude::*;
44
use pyo3::types::{PyBytes, PyTuple};
5+
use std::collections::HashMap;
6+
use std::collections::HashSet;
57
use std::sync::Arc;
68
use std::time::Duration;
79
use temporal_sdk_core::api::errors::{PollActivityError, PollWfError};
810
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
11+
use temporal_sdk_core_api::errors::WorkflowErrorType;
912
use temporal_sdk_core_api::Worker;
1013
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
1114
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
@@ -45,6 +48,8 @@ pub struct WorkerConfig {
4548
max_task_queue_activities_per_second: Option<f64>,
4649
graceful_shutdown_period_millis: u64,
4750
use_worker_versioning: bool,
51+
nondeterminism_as_workflow_fail: bool,
52+
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
4853
}
4954

5055
macro_rules! enter_sync {
@@ -234,6 +239,22 @@ impl TryFrom<WorkerConfig> for temporal_sdk_core::WorkerConfig {
234239
// always set it even if 0.
235240
.graceful_shutdown_period(Duration::from_millis(conf.graceful_shutdown_period_millis))
236241
.use_worker_versioning(conf.use_worker_versioning)
242+
.workflow_failure_errors(if conf.nondeterminism_as_workflow_fail {
243+
HashSet::from([WorkflowErrorType::Nondeterminism])
244+
} else {
245+
HashSet::new()
246+
})
247+
.workflow_types_to_failure_errors(
248+
conf.nondeterminism_as_workflow_fail_for_types
249+
.iter()
250+
.map(|s| {
251+
(
252+
s.to_owned(),
253+
HashSet::from([WorkflowErrorType::Nondeterminism]),
254+
)
255+
})
256+
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
257+
)
237258
.build()
238259
.map_err(|err| PyValueError::new_err(format!("Invalid worker config: {}", err)))
239260
}

temporalio/bridge/worker.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,16 @@
66
from __future__ import annotations
77

88
from dataclasses import dataclass
9-
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Sequence, Tuple
9+
from typing import (
10+
TYPE_CHECKING,
11+
Awaitable,
12+
Callable,
13+
List,
14+
Optional,
15+
Sequence,
16+
Set,
17+
Tuple,
18+
)
1019

1120
import google.protobuf.internal.containers
1221
from typing_extensions import TypeAlias
@@ -48,6 +57,8 @@ class WorkerConfig:
4857
max_task_queue_activities_per_second: Optional[float]
4958
graceful_shutdown_period_millis: int
5059
use_worker_versioning: bool
60+
nondeterminism_as_workflow_fail: bool
61+
nondeterminism_as_workflow_fail_for_types: Set[str]
5162

5263

5364
class Worker:

temporalio/worker/_replayer.py

Lines changed: 56 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def __init__(
4343
interceptors: Sequence[Interceptor] = [],
4444
build_id: Optional[str] = None,
4545
identity: Optional[str] = None,
46+
workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
4647
debug_mode: bool = False,
4748
runtime: Optional[temporalio.runtime.Runtime] = None,
4849
disable_safe_workflow_eviction: bool = False,
@@ -66,6 +67,7 @@ def __init__(
6667
interceptors=interceptors,
6768
build_id=build_id,
6869
identity=identity,
70+
workflow_failure_exception_types=workflow_failure_exception_types,
6971
debug_mode=debug_mode,
7072
runtime=runtime,
7173
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
@@ -153,35 +155,6 @@ async def workflow_replay_iterator(
153155
An async iterator that returns replayed workflow results as they are
154156
replayed.
155157
"""
156-
# Create bridge worker
157-
task_queue = f"replay-{self._config['build_id']}"
158-
runtime = self._config["runtime"] or temporalio.runtime.Runtime.default()
159-
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
160-
runtime._core_runtime,
161-
temporalio.bridge.worker.WorkerConfig(
162-
namespace=self._config["namespace"],
163-
task_queue=task_queue,
164-
build_id=self._config["build_id"] or load_default_build_id(),
165-
identity_override=self._config["identity"],
166-
# All values below are ignored but required by Core
167-
max_cached_workflows=2,
168-
max_outstanding_workflow_tasks=2,
169-
max_outstanding_activities=1,
170-
max_outstanding_local_activities=1,
171-
max_concurrent_workflow_task_polls=1,
172-
nonsticky_to_sticky_poll_ratio=1,
173-
max_concurrent_activity_task_polls=1,
174-
no_remote_activities=True,
175-
sticky_queue_schedule_to_start_timeout_millis=1000,
176-
max_heartbeat_throttle_interval_millis=1000,
177-
default_heartbeat_throttle_interval_millis=1000,
178-
max_activities_per_second=None,
179-
max_task_queue_activities_per_second=None,
180-
graceful_shutdown_period_millis=0,
181-
use_worker_versioning=False,
182-
),
183-
)
184-
185158
try:
186159
last_replay_failure: Optional[Exception]
187160
last_replay_complete = asyncio.Event()
@@ -212,29 +185,62 @@ def on_eviction_hook(
212185
last_replay_failure = None
213186
last_replay_complete.set()
214187

215-
# Start the worker
216-
workflow_worker_task = asyncio.create_task(
217-
_WorkflowWorker(
218-
bridge_worker=lambda: bridge_worker,
188+
# Create worker referencing bridge worker
189+
bridge_worker: temporalio.bridge.worker.Worker
190+
task_queue = f"replay-{self._config['build_id']}"
191+
runtime = self._config["runtime"] or temporalio.runtime.Runtime.default()
192+
workflow_worker = _WorkflowWorker(
193+
bridge_worker=lambda: bridge_worker,
194+
namespace=self._config["namespace"],
195+
task_queue=task_queue,
196+
workflows=self._config["workflows"],
197+
workflow_task_executor=self._config["workflow_task_executor"],
198+
workflow_runner=self._config["workflow_runner"],
199+
unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"],
200+
data_converter=self._config["data_converter"],
201+
interceptors=self._config["interceptors"],
202+
workflow_failure_exception_types=self._config[
203+
"workflow_failure_exception_types"
204+
],
205+
debug_mode=self._config["debug_mode"],
206+
metric_meter=runtime.metric_meter,
207+
on_eviction_hook=on_eviction_hook,
208+
disable_eager_activity_execution=False,
209+
disable_safe_eviction=self._config["disable_safe_workflow_eviction"],
210+
)
211+
# Create bridge worker
212+
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
213+
runtime._core_runtime,
214+
temporalio.bridge.worker.WorkerConfig(
219215
namespace=self._config["namespace"],
220216
task_queue=task_queue,
221-
workflows=self._config["workflows"],
222-
workflow_task_executor=self._config["workflow_task_executor"],
223-
workflow_runner=self._config["workflow_runner"],
224-
unsandboxed_workflow_runner=self._config[
225-
"unsandboxed_workflow_runner"
226-
],
227-
data_converter=self._config["data_converter"],
228-
interceptors=self._config["interceptors"],
229-
debug_mode=self._config["debug_mode"],
230-
metric_meter=runtime.metric_meter,
231-
on_eviction_hook=on_eviction_hook,
232-
disable_eager_activity_execution=False,
233-
disable_safe_eviction=self._config[
234-
"disable_safe_workflow_eviction"
235-
],
236-
).run()
217+
build_id=self._config["build_id"] or load_default_build_id(),
218+
identity_override=self._config["identity"],
219+
# Need to tell core whether we want to consider all
220+
# non-determinism exceptions as workflow fail, and whether we do
221+
# per workflow type
222+
nondeterminism_as_workflow_fail=workflow_worker.nondeterminism_as_workflow_fail(),
223+
nondeterminism_as_workflow_fail_for_types=workflow_worker.nondeterminism_as_workflow_fail_for_types(),
224+
# All values below are ignored but required by Core
225+
max_cached_workflows=2,
226+
max_outstanding_workflow_tasks=2,
227+
max_outstanding_activities=1,
228+
max_outstanding_local_activities=1,
229+
max_concurrent_workflow_task_polls=1,
230+
nonsticky_to_sticky_poll_ratio=1,
231+
max_concurrent_activity_task_polls=1,
232+
no_remote_activities=True,
233+
sticky_queue_schedule_to_start_timeout_millis=1000,
234+
max_heartbeat_throttle_interval_millis=1000,
235+
default_heartbeat_throttle_interval_millis=1000,
236+
max_activities_per_second=None,
237+
max_task_queue_activities_per_second=None,
238+
graceful_shutdown_period_millis=0,
239+
use_worker_versioning=False,
240+
),
237241
)
242+
# Start worker
243+
workflow_worker_task = asyncio.create_task(workflow_worker.run())
238244

239245
# Yield iterator
240246
async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]:
@@ -301,6 +307,7 @@ class ReplayerConfig(TypedDict, total=False):
301307
interceptors: Sequence[Interceptor]
302308
build_id: Optional[str]
303309
identity: Optional[str]
310+
workflow_failure_exception_types: Sequence[Type[BaseException]]
304311
debug_mode: bool
305312
runtime: Optional[temporalio.runtime.Runtime]
306313
disable_safe_workflow_eviction: bool

temporalio/worker/_worker.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def __init__(
7373
max_activities_per_second: Optional[float] = None,
7474
max_task_queue_activities_per_second: Optional[float] = None,
7575
graceful_shutdown_timeout: timedelta = timedelta(),
76+
workflow_failure_exception_types: Sequence[Type[BaseException]] = [],
7677
shared_state_manager: Optional[SharedStateManager] = None,
7778
debug_mode: bool = False,
7879
disable_eager_activity_execution: bool = False,
@@ -167,6 +168,13 @@ def __init__(
167168
graceful_shutdown_timeout: Amount of time after shutdown is called
168169
that activities are given to complete before their tasks are
169170
cancelled.
171+
workflow_failure_exception_types: The types of exceptions that, if a
172+
workflow-thrown exception extends, will cause the
173+
workflow/update to fail instead of suspending the workflow via
174+
task failure. These are applied in addition to ones set on the
175+
``workflow.defn`` decorator. If ``Exception`` is set, it
176+
effectively will fail a workflow/update in all user exception
177+
cases. WARNING: This setting is experimental.
170178
shared_state_manager: Used for obtaining cross-process friendly
171179
synchronization primitives. This is required for non-async
172180
activities where the activity_executor is not a
@@ -258,6 +266,7 @@ def __init__(
258266
max_activities_per_second=max_activities_per_second,
259267
max_task_queue_activities_per_second=max_task_queue_activities_per_second,
260268
graceful_shutdown_timeout=graceful_shutdown_timeout,
269+
workflow_failure_exception_types=workflow_failure_exception_types,
261270
shared_state_manager=shared_state_manager,
262271
debug_mode=debug_mode,
263272
disable_eager_activity_execution=disable_eager_activity_execution,
@@ -309,6 +318,7 @@ def __init__(
309318
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
310319
data_converter=client_config["data_converter"],
311320
interceptors=interceptors,
321+
workflow_failure_exception_types=workflow_failure_exception_types,
312322
debug_mode=debug_mode,
313323
disable_eager_activity_execution=disable_eager_activity_execution,
314324
metric_meter=runtime.metric_meter,
@@ -366,6 +376,14 @@ def __init__(
366376
1000 * graceful_shutdown_timeout.total_seconds()
367377
),
368378
use_worker_versioning=use_worker_versioning,
379+
# Need to tell core whether we want to consider all
380+
# non-determinism exceptions as workflow fail, and whether we do
381+
# per workflow type
382+
nondeterminism_as_workflow_fail=self._workflow_worker is not None
383+
and self._workflow_worker.nondeterminism_as_workflow_fail(),
384+
nondeterminism_as_workflow_fail_for_types=self._workflow_worker.nondeterminism_as_workflow_fail_for_types()
385+
if self._workflow_worker
386+
else set(),
369387
),
370388
)
371389

@@ -605,6 +623,7 @@ class WorkerConfig(TypedDict, total=False):
605623
max_activities_per_second: Optional[float]
606624
max_task_queue_activities_per_second: Optional[float]
607625
graceful_shutdown_timeout: timedelta
626+
workflow_failure_exception_types: Sequence[Type[BaseException]]
608627
shared_state_manager: Optional[SharedStateManager]
609628
debug_mode: bool
610629
disable_eager_activity_execution: bool

temporalio/worker/_workflow.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import os
99
from datetime import timezone
10-
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Type
10+
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Set, Type
1111

1212
import temporalio.activity
1313
import temporalio.api.common.v1
@@ -52,6 +52,7 @@ def __init__(
5252
unsandboxed_workflow_runner: WorkflowRunner,
5353
data_converter: temporalio.converter.DataConverter,
5454
interceptors: Sequence[Interceptor],
55+
workflow_failure_exception_types: Sequence[Type[BaseException]],
5556
debug_mode: bool,
5657
disable_eager_activity_execution: bool,
5758
metric_meter: temporalio.common.MetricMeter,
@@ -89,6 +90,7 @@ def __init__(
8990
self._extern_functions.update(
9091
**_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter)
9192
)
93+
self._workflow_failure_exception_types = workflow_failure_exception_types
9294
self._running_workflows: Dict[str, WorkflowInstance] = {}
9395
self._disable_eager_activity_execution = disable_eager_activity_execution
9496
self._on_eviction_hook = on_eviction_hook
@@ -104,6 +106,11 @@ def __init__(
104106
# Keep track of workflows that could not be evicted
105107
self._could_not_evict_count = 0
106108

109+
# Set the worker-level failure exception types into the runner
110+
workflow_runner.set_worker_level_failure_exception_types(
111+
workflow_failure_exception_types
112+
)
113+
107114
# Validate and build workflow dict
108115
self._workflows: Dict[str, temporalio.workflow._Definition] = {}
109116
self._dynamic_workflow: Optional[temporalio.workflow._Definition] = None
@@ -389,8 +396,25 @@ def _create_workflow_instance(
389396
randomness_seed=start.randomness_seed,
390397
extern_functions=self._extern_functions,
391398
disable_eager_activity_execution=self._disable_eager_activity_execution,
399+
worker_level_failure_exception_types=self._workflow_failure_exception_types,
392400
)
393401
if defn.sandboxed:
394402
return self._workflow_runner.create_instance(det)
395403
else:
396404
return self._unsandboxed_workflow_runner.create_instance(det)
405+
406+
def nondeterminism_as_workflow_fail(self) -> bool:
407+
return any(
408+
issubclass(temporalio.workflow.NondeterminismError, typ)
409+
for typ in self._workflow_failure_exception_types
410+
)
411+
412+
def nondeterminism_as_workflow_fail_for_types(self) -> Set[str]:
413+
return set(
414+
k
415+
for k, v in self._workflows.items()
416+
if any(
417+
issubclass(temporalio.workflow.NondeterminismError, typ)
418+
for typ in v.failure_exception_types
419+
)
420+
)

0 commit comments

Comments
 (0)