Skip to content

Commit b75a567

Browse files
authored
Allow converter failures to fail workflow and other minor things (#329)
1 parent 6a3662c commit b75a567

File tree

7 files changed

+115
-41
lines changed

7 files changed

+115
-41
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ jobs:
2525
runsOn: buildjet-4vcpu-ubuntu-2204-arm
2626
runs-on: ${{ matrix.runsOn || matrix.os }}
2727
steps:
28-
- name: Print build information
29-
run: "echo head_ref: ${{ github.head_ref }}, ref: ${{ github.ref }}, os: ${{ matrix.os }}, python: ${{ matrix.python }}"
3028
- uses: actions/checkout@v2
3129
with:
3230
submodules: recursive

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,8 @@ This notably doesn't include any `date`, `time`, or `datetime` objects as they m
306306
Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be
307307
easily added without breaking compatibility.
308308

309-
Classes with generics may not have the generics properly resolved. The current implementation, similar to Pydantic, does
310-
not have generic type resolution. Users should use concrete types.
309+
Classes with generics may not have the generics properly resolved. The current implementation does not have generic
310+
type resolution. Users should use concrete types.
311311

312312
##### Custom Type Data Conversion
313313

poetry.lock

Lines changed: 11 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ protoc-wheel-0 = "^21.1"
5151
psutil = "^5.9.3"
5252
pydantic = "^1.9.1"
5353
pydocstyle = "^6.1.1"
54-
# TODO(cretz): Update when https://github.com/twisted/pydoctor/pull/595 released
55-
# pydoctor = "^22.5.1"
56-
pydoctor = { git = "https://github.com/cretz/pydoctor.git", branch = "overloads" }
54+
pydoctor = "^23.4.1"
5755
pytest = "^7.1.2"
5856
pytest-asyncio = "^0.18.3"
5957
pytest-timeout = "^2.1.0"

temporalio/exceptions.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Common Temporal exceptions."""
22

3+
import asyncio
34
from enum import IntEnum
45
from typing import Any, Optional, Sequence, Tuple
56

@@ -322,3 +323,32 @@ def started_event_id(self) -> int:
322323
def retry_state(self) -> Optional[RetryState]:
323324
"""Retry state for this error."""
324325
return self._retry_state
326+
327+
328+
def is_cancelled_exception(exception: BaseException) -> bool:
329+
"""Check whether the given exception is considered a cancellation exception
330+
according to Temporal.
331+
332+
This is often used in a conditional of a catch clause to check whether a
333+
cancel occurred inside of a workflow. This can occur from
334+
:py:class:`asyncio.CancelledError` or :py:class:`CancelledError` or either
335+
:py:class:`ActivityError` or :py:class:`ChildWorkflowError` if either of
336+
those latter two have a :py:class:`CancelledError` cause.
337+
338+
Args:
339+
exception: Exception to check.
340+
341+
Returns:
342+
True if a cancelled exception, false if not.
343+
"""
344+
return (
345+
isinstance(exception, asyncio.CancelledError)
346+
or isinstance(exception, CancelledError)
347+
or (
348+
(
349+
isinstance(exception, ActivityError)
350+
or isinstance(exception, ChildWorkflowError)
351+
)
352+
and isinstance(exception.cause, CancelledError)
353+
)
354+
)

temporalio/worker/_workflow_instance.py

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def activate(
259259
self._time_ns = act.timestamp.ToNanoseconds()
260260
self._is_replaying = act.is_replaying
261261

262+
activation_err: Optional[Exception] = None
262263
try:
263264
# Split into job sets with patches, then signals, then non-queries, then
264265
# queries
@@ -287,7 +288,17 @@ def activate(
287288
# be checked in patch jobs (first index) or query jobs (last
288289
# index).
289290
self._run_once(check_conditions=index == 1 or index == 2)
291+
except temporalio.exceptions.FailureError as err:
292+
# We want failure errors during activation, like those that can
293+
# happen during payload conversion, to fail the workflow not the
294+
# task
295+
try:
296+
self._set_workflow_failure(err)
297+
except Exception as inner_err:
298+
activation_err = inner_err
290299
except Exception as err:
300+
activation_err = err
301+
if activation_err:
291302
logger.warning(
292303
f"Failed activation on workflow {self._info.workflow_type} with ID {self._info.workflow_id} and run ID {self._info.run_id}",
293304
exc_info=True,
@@ -296,7 +307,7 @@ def activate(
296307
self._current_completion.failed.failure.SetInParent()
297308
try:
298309
self._failure_converter.to_failure(
299-
err,
310+
activation_err,
300311
self._payload_converter,
301312
self._current_completion.failed.failure,
302313
)
@@ -1134,6 +1145,9 @@ def _convert_payloads(
11341145
payloads,
11351146
type_hints=types,
11361147
)
1148+
except temporalio.exceptions.FailureError:
1149+
# Don't wrap payload conversion errors that would fail the workflow
1150+
raise
11371151
except Exception as err:
11381152
raise RuntimeError("Failed decoding arguments") from err
11391153

@@ -1227,33 +1241,26 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
12271241
# cancel later on will show the workflow as cancelled. But this is
12281242
# a Temporal limitation in that cancellation is a state not an
12291243
# event.
1230-
if self._cancel_requested and (
1231-
isinstance(err, temporalio.exceptions.CancelledError)
1232-
or (
1233-
(
1234-
isinstance(err, temporalio.exceptions.ActivityError)
1235-
or isinstance(err, temporalio.exceptions.ChildWorkflowError)
1236-
)
1237-
and isinstance(err.cause, temporalio.exceptions.CancelledError)
1238-
)
1244+
if self._cancel_requested and temporalio.exceptions.is_cancelled_exception(
1245+
err
12391246
):
12401247
self._add_command().cancel_workflow_execution.SetInParent()
12411248
elif isinstance(err, temporalio.exceptions.FailureError):
12421249
# All other failure errors fail the workflow
1243-
failure = self._add_command().fail_workflow_execution.failure
1244-
failure.SetInParent()
1245-
try:
1246-
self._failure_converter.to_failure(
1247-
err, self._payload_converter, failure
1248-
)
1249-
except Exception as inner_err:
1250-
raise ValueError(
1251-
"Failed converting workflow exception"
1252-
) from inner_err
1250+
self._set_workflow_failure(err)
12531251
else:
12541252
# All other exceptions fail the task
12551253
self._current_activation_error = err
12561254

1255+
def _set_workflow_failure(self, err: temporalio.exceptions.FailureError) -> None:
1256+
# All other failure errors fail the workflow
1257+
failure = self._add_command().fail_workflow_execution.failure
1258+
failure.SetInParent()
1259+
try:
1260+
self._failure_converter.to_failure(err, self._payload_converter, failure)
1261+
except Exception as inner_err:
1262+
raise ValueError("Failed converting workflow exception") from inner_err
1263+
12571264
async def _signal_external_workflow(
12581265
self,
12591266
# Should not have seq set

tests/worker/test_workflow.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
Optional,
2121
Sequence,
2222
Tuple,
23+
Type,
2324
cast,
2425
)
2526

@@ -51,6 +52,7 @@
5152
from temporalio.converter import (
5253
DataConverter,
5354
DefaultFailureConverterWithEncodedAttributes,
55+
DefaultPayloadConverter,
5456
PayloadCodec,
5557
PayloadConverter,
5658
)
@@ -2719,3 +2721,45 @@ async def test_workflow_optional_param(client: Client):
27192721
task_queue=worker.task_queue,
27202722
)
27212723
assert result3 == OptionalParam(some_string="foo")
2724+
2725+
2726+
class ExceptionRaisingPayloadConverter(DefaultPayloadConverter):
2727+
bad_str = "bad-payload-str"
2728+
2729+
def from_payloads(
2730+
self, payloads: Sequence[Payload], type_hints: Optional[List] = None
2731+
) -> List[Any]:
2732+
# Check if any payloads contain the bad data
2733+
for payload in payloads:
2734+
if ExceptionRaisingPayloadConverter.bad_str.encode() in payload.data:
2735+
raise ApplicationError("Intentional converter failure")
2736+
return super().from_payloads(payloads, type_hints)
2737+
2738+
2739+
@workflow.defn
2740+
class ExceptionRaisingConverterWorkflow:
2741+
@workflow.run
2742+
async def run(self, some_param: str) -> str:
2743+
return some_param
2744+
2745+
2746+
async def test_exception_raising_converter_param(client: Client):
2747+
# Clone the client but change the data converter to use our converter
2748+
config = client.config()
2749+
config["data_converter"] = dataclasses.replace(
2750+
config["data_converter"],
2751+
payload_converter_class=ExceptionRaisingPayloadConverter,
2752+
)
2753+
client = Client(**config)
2754+
2755+
# Run workflow and confirm error
2756+
async with new_worker(client, ExceptionRaisingConverterWorkflow) as worker:
2757+
with pytest.raises(WorkflowFailureError) as err:
2758+
await client.execute_workflow(
2759+
ExceptionRaisingConverterWorkflow.run,
2760+
ExceptionRaisingPayloadConverter.bad_str,
2761+
id=f"workflow-{uuid.uuid4()}",
2762+
task_queue=worker.task_queue,
2763+
)
2764+
assert isinstance(err.value.cause, ApplicationError)
2765+
assert "Intentional converter failure" in str(err.value.cause)

0 commit comments

Comments
 (0)