Skip to content

Commit b9df212

Browse files
authored
Log and drop signals whose params can't be deserialized (and other error handling improvements) (#349)
Fixes #333 Fixes #318 Fixes #347
1 parent 60b72c6 commit b9df212

File tree

5 files changed

+117
-45
lines changed

5 files changed

+117
-45
lines changed

poetry.lock

Lines changed: 34 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/common.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,10 @@ def _type_hints_from_func(
260260
if (
261261
index == 0
262262
and value.name == "self"
263-
and value.annotation is inspect.Parameter.empty
263+
and (
264+
value.annotation is inspect.Parameter.empty
265+
or str(value.annotation) == "typing.Self"
266+
)
264267
):
265268
continue
266269
# Stop if non-positional or not a class

temporalio/worker/_activity.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -436,12 +436,7 @@ async def _run_activity(
436436
completion.result.completed.result.CopyFrom(
437437
(await self._data_converter.encode([result]))[0]
438438
)
439-
except (
440-
Exception,
441-
asyncio.CancelledError,
442-
temporalio.exceptions.CancelledError,
443-
temporalio.activity._CompleteAsyncError,
444-
) as err:
439+
except BaseException as err:
445440
try:
446441
if isinstance(err, temporalio.activity._CompleteAsyncError):
447442
temporalio.activity.logger.debug("Completing asynchronously")

temporalio/worker/_workflow_instance.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,9 +1231,19 @@ def _process_signal_job(
12311231
defn: temporalio.workflow._SignalDefinition,
12321232
job: temporalio.bridge.proto.workflow_activation.SignalWorkflow,
12331233
) -> None:
1234-
args = self._process_handler_args(
1235-
job.signal_name, job.input, defn.name, defn.arg_types, defn.dynamic_vararg
1236-
)
1234+
try:
1235+
args = self._process_handler_args(
1236+
job.signal_name,
1237+
job.input,
1238+
defn.name,
1239+
defn.arg_types,
1240+
defn.dynamic_vararg,
1241+
)
1242+
except Exception:
1243+
logger.exception(
1244+
f"Failed deserializing signal input for {job.signal_name}, dropping the signal"
1245+
)
1246+
return
12371247
input = HandleSignalInput(
12381248
signal=job.signal_name, args=args, headers=job.headers
12391249
)

tests/worker/test_workflow.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import queue
88
import sys
99
import threading
10+
import typing
1011
import uuid
11-
import warnings
1212
from abc import ABC, abstractmethod
1313
from dataclasses import dataclass
1414
from datetime import datetime, timedelta, timezone
@@ -493,6 +493,46 @@ async def test_workflow_signal_qnd_query_handlers_old_dynamic_style(client: Clie
493493
)
494494

495495

496+
@dataclass
497+
class BadSignalParam:
498+
some_str: str
499+
500+
501+
@workflow.defn
502+
class BadSignalParamWorkflow:
503+
def __init__(self) -> None:
504+
self._signals: List[BadSignalParam] = []
505+
506+
@workflow.run
507+
async def run(self) -> List[BadSignalParam]:
508+
await workflow.wait_condition(
509+
lambda: bool(self._signals) and self._signals[-1].some_str == "finish"
510+
)
511+
return self._signals
512+
513+
@workflow.signal
514+
async def some_signal(self, param: BadSignalParam) -> None:
515+
self._signals.append(param)
516+
517+
518+
async def test_workflow_bad_signal_param(client: Client):
519+
async with new_worker(client, BadSignalParamWorkflow) as worker:
520+
handle = await client.start_workflow(
521+
BadSignalParamWorkflow.run,
522+
id=f"workflow-{uuid.uuid4()}",
523+
task_queue=worker.task_queue,
524+
)
525+
# Send 4 signals, first and third are bad
526+
await handle.signal("some_signal", "bad")
527+
await handle.signal("some_signal", BadSignalParam(some_str="good"))
528+
await handle.signal("some_signal", 123)
529+
await handle.signal("some_signal", BadSignalParam(some_str="finish"))
530+
assert [
531+
BadSignalParam(some_str="good"),
532+
BadSignalParam(some_str="finish"),
533+
] == await handle.result()
534+
535+
496536
@workflow.defn
497537
class AsyncUtilWorkflow:
498538
def __init__(self) -> None:
@@ -3055,3 +3095,27 @@ async def test_workflow_dynamic(client: Client):
30553095
)
30563096
assert isinstance(result, DynamicWorkflowValue)
30573097
assert result == DynamicWorkflowValue("some-workflow - val1 - val2")
3098+
3099+
3100+
# typing.Self only in 3.11+
3101+
if sys.version_info >= (3, 11):
3102+
3103+
@dataclass
3104+
class AnnotatedWithSelfParam:
3105+
some_str: str
3106+
3107+
@workflow.defn
3108+
class WorkflowAnnotatedWithSelf:
3109+
@workflow.run
3110+
async def run(self: typing.Self, some_arg: AnnotatedWithSelfParam) -> str:
3111+
assert isinstance(some_arg, AnnotatedWithSelfParam)
3112+
return some_arg.some_str
3113+
3114+
async def test_workflow_annotated_with_self(client: Client):
3115+
async with new_worker(client, WorkflowAnnotatedWithSelf) as worker:
3116+
assert "foo" == await client.execute_workflow(
3117+
WorkflowAnnotatedWithSelf.run,
3118+
AnnotatedWithSelfParam(some_str="foo"),
3119+
id=f"wf-{uuid.uuid4()}",
3120+
task_queue=worker.task_queue,
3121+
)

0 commit comments

Comments
 (0)