
Commit 929dc81

Failure converter (#185)
Fixes #142 Fixes #131 Fixes #139
1 parent a75256e commit 929dc81

17 files changed: +596 -416 lines changed


README.md

Lines changed: 3 additions & 2 deletions
@@ -174,8 +174,9 @@ other_ns_client = Client(**config)
 
 Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
 `temporalio.converter.DataConverter` can be set via the `data_converter` client parameter. Data converters are a
-combination of payload converters and payload codecs. The former converts Python values to/from serialized bytes, and
-the latter converts bytes to bytes (e.g. for compression or encryption).
+combination of payload converters, payload codecs, and failure converters. Payload converters convert Python values
+to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption). Failure converters
+convert exceptions to/from serialized failures.
 
 The default data converter supports converting multiple types including:
 
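For context on the README wording above, a minimal sketch of wiring a custom converter into a client follows. Only the `data_converter` parameter of `Client.connect` and the class-level `temporalio.converter.DataConverter.default` referenced in this commit come from the diff; the `ZlibCodec` class, the `payload_codec` dataclass field name, and the target host are illustrative assumptions, not the SDK's documented API.

import dataclasses
import zlib
from typing import List, Sequence

import temporalio.api.common.v1
import temporalio.converter
from temporalio.client import Client


class ZlibCodec(temporalio.converter.PayloadCodec):
    """Hypothetical payload codec: compresses each serialized payload with zlib."""

    async def encode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        return [
            temporalio.api.common.v1.Payload(
                metadata={"encoding": b"binary/zlib"},
                data=zlib.compress(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        return [
            temporalio.api.common.v1.Payload.FromString(zlib.decompress(p.data))
            for p in payloads
        ]


async def connect_with_codec() -> Client:
    # Start from the class-level default and swap in the codec (field name assumed).
    converter = dataclasses.replace(
        temporalio.converter.DataConverter.default, payload_codec=ZlibCodec()
    )
    return await Client.connect(
        "localhost:7233", namespace="default", data_converter=converter
    )

The failure converter described in the new README text is the piece that turns raised exceptions into serialized failures; a codec like the one above then only sees the resulting payload bytes.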
temporalio/bridge/worker.py

Lines changed: 16 additions & 26 deletions
@@ -219,22 +219,20 @@ async def decode_activation(
             await _decode_payloads(job.query_workflow.arguments, codec)
         elif job.HasField("resolve_activity"):
             if job.resolve_activity.result.HasField("cancelled"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_activity.result.cancelled.failure, codec
+                await codec.decode_failure(
+                    job.resolve_activity.result.cancelled.failure
                 )
             elif job.resolve_activity.result.HasField("completed"):
                 if job.resolve_activity.result.completed.HasField("result"):
                     await _decode_payload(
                         job.resolve_activity.result.completed.result, codec
                     )
             elif job.resolve_activity.result.HasField("failed"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_activity.result.failed.failure, codec
-                )
+                await codec.decode_failure(job.resolve_activity.result.failed.failure)
         elif job.HasField("resolve_child_workflow_execution"):
             if job.resolve_child_workflow_execution.result.HasField("cancelled"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_child_workflow_execution.result.cancelled.failure, codec
+                await codec.decode_failure(
+                    job.resolve_child_workflow_execution.result.cancelled.failure
                 )
             elif job.resolve_child_workflow_execution.result.HasField(
                 "completed"
@@ -245,32 +243,28 @@ async def decode_activation(
                     job.resolve_child_workflow_execution.result.completed.result, codec
                 )
             elif job.resolve_child_workflow_execution.result.HasField("failed"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_child_workflow_execution.result.failed.failure, codec
+                await codec.decode_failure(
+                    job.resolve_child_workflow_execution.result.failed.failure
                 )
         elif job.HasField("resolve_child_workflow_execution_start"):
             if job.resolve_child_workflow_execution_start.HasField("cancelled"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_child_workflow_execution_start.cancelled.failure, codec
+                await codec.decode_failure(
+                    job.resolve_child_workflow_execution_start.cancelled.failure
                 )
         elif job.HasField("resolve_request_cancel_external_workflow"):
             if job.resolve_request_cancel_external_workflow.HasField("failure"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_request_cancel_external_workflow.failure, codec
+                await codec.decode_failure(
+                    job.resolve_request_cancel_external_workflow.failure
                 )
         elif job.HasField("resolve_signal_external_workflow"):
             if job.resolve_signal_external_workflow.HasField("failure"):
-                await temporalio.exceptions.decode_failure(
-                    job.resolve_signal_external_workflow.failure, codec
-                )
+                await codec.decode_failure(job.resolve_signal_external_workflow.failure)
         elif job.HasField("signal_workflow"):
             await _decode_payloads(job.signal_workflow.input, codec)
         elif job.HasField("start_workflow"):
             await _decode_payloads(job.start_workflow.arguments, codec)
             if job.start_workflow.HasField("continued_failure"):
-                await temporalio.exceptions.decode_failure(
-                    job.start_workflow.continued_failure, codec
-                )
+                await codec.decode_failure(job.start_workflow.continued_failure)
             for val in job.start_workflow.memo.fields.values():
                 # This uses API payload not bridge payload
                 new_payload = (await codec.decode([val]))[0]
@@ -285,7 +279,7 @@ async def encode_completion(
 ) -> None:
     """Recursively encode the given completion with the codec."""
     if comp.HasField("failed"):
-        await temporalio.exceptions.encode_failure(comp.failed.failure, codec)
+        await codec.encode_failure(comp.failed.failure)
     elif comp.HasField("successful"):
         for command in comp.successful.commands:
             if command.HasField("complete_workflow_execution"):
@@ -300,14 +294,10 @@ async def encode_completion(
                 for val in command.continue_as_new_workflow_execution.memo.values():
                     await _encode_payload(val, codec)
             elif command.HasField("fail_workflow_execution"):
-                await temporalio.exceptions.encode_failure(
-                    command.fail_workflow_execution.failure, codec
-                )
+                await codec.encode_failure(command.fail_workflow_execution.failure)
             elif command.HasField("respond_to_query"):
                 if command.respond_to_query.HasField("failed"):
-                    await temporalio.exceptions.encode_failure(
-                        command.respond_to_query.failed, codec
-                    )
+                    await codec.encode_failure(command.respond_to_query.failed)
                 elif command.respond_to_query.HasField(
                     "succeeded"
                 ) and command.respond_to_query.succeeded.HasField("response"):
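The hunks above replace the module-level temporalio.exceptions.encode_failure/decode_failure helpers with methods called on the codec itself. A rough sketch of what that enables for a custom codec follows; it assumes PayloadCodec.encode_failure/decode_failure are overridable async hooks that mutate the Failure proto in place (which is what the bridge calls above imply), and the logging behavior is purely illustrative.

from typing import List, Sequence

import temporalio.api.common.v1
import temporalio.api.failure.v1
import temporalio.converter


class AuditingCodec(temporalio.converter.PayloadCodec):
    """Hypothetical pass-through codec that also logs failure conversions."""

    async def encode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        # Pass payloads through unchanged.
        return list(payloads)

    async def decode(
        self, payloads: Sequence[temporalio.api.common.v1.Payload]
    ) -> List[temporalio.api.common.v1.Payload]:
        return list(payloads)

    async def encode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
        # Assumed hook: delegate to the base behavior, then observe the result.
        await super().encode_failure(failure)
        print(f"encoded failure: {failure.message}")

    async def decode_failure(self, failure: temporalio.api.failure.v1.Failure) -> None:
        await super().decode_failure(failure)
        print(f"decoded failure: {failure.message}")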

temporalio/client.py

Lines changed: 9 additions & 10 deletions
@@ -72,7 +72,7 @@ async def connect(
         target_host: str,
         *,
         namespace: str = "default",
-        data_converter: temporalio.converter.DataConverter = temporalio.converter.default(),
+        data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
         interceptors: Sequence[Interceptor] = [],
         default_workflow_query_reject_condition: Optional[
             temporalio.common.QueryRejectCondition
@@ -139,7 +139,7 @@ def __init__(
         service_client: temporalio.service.ServiceClient,
         *,
         namespace: str = "default",
-        data_converter: temporalio.converter.DataConverter = temporalio.converter.default(),
+        data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
         interceptors: Sequence[Interceptor] = [],
         default_workflow_query_reject_condition: Optional[
             temporalio.common.QueryRejectCondition
@@ -862,8 +862,8 @@ async def result(
                     hist_run_id = fail_attr.new_execution_run_id
                     break
                 raise WorkflowFailureError(
-                    cause=await temporalio.exceptions.decode_failure_to_error(
-                        fail_attr.failure, self._client.data_converter
+                    cause=await self._client.data_converter.decode_failure(
+                        fail_attr.failure
                     ),
                 )
             elif event.HasField("workflow_execution_canceled_event_attributes"):
@@ -1950,15 +1950,16 @@ async def __anext__(self) -> temporalio.api.history.v1.HistoryEvent:
 class WorkflowFailureError(temporalio.exceptions.TemporalError):
     """Error that occurs when a workflow is unsuccessful."""
 
-    def __init__(self, *, cause: temporalio.exceptions.FailureError) -> None:
+    def __init__(self, *, cause: BaseException) -> None:
         """Create workflow failure error."""
         super().__init__("Workflow execution failed")
         self.__cause__ = cause
 
     @property
-    def cause(self) -> temporalio.exceptions.FailureError:
+    def cause(self) -> BaseException:
         """Cause of the workflow failure."""
-        return cast(temporalio.exceptions.FailureError, self.__cause__)
+        assert self.__cause__
+        return self.__cause__
 
 
 class WorkflowContinuedAsNewError(temporalio.exceptions.TemporalError):
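Because cause is now typed as BaseException rather than temporalio.exceptions.FailureError, callers that care about specific error types can narrow it themselves. A small usage sketch; the workflow name, id, and task queue are hypothetical placeholders, and the ApplicationError check is just one way to narrow.

import temporalio.client
import temporalio.exceptions


async def report_result(client: temporalio.client.Client) -> None:
    try:
        await client.execute_workflow(
            "MyWorkflow", id="my-workflow-id", task_queue="my-task-queue"
        )
    except temporalio.client.WorkflowFailureError as err:
        cause = err.cause
        if isinstance(cause, temporalio.exceptions.ApplicationError):
            print(f"Workflow raised an application error: {cause.message}")
        else:
            # With a custom failure converter, this can be any exception type.
            print(f"Workflow failed: {cause!r}")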
@@ -2546,9 +2547,7 @@ async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> None:
 
     async def fail_async_activity(self, input: FailAsyncActivityInput) -> None:
         failure = temporalio.api.failure.v1.Failure()
-        await temporalio.exceptions.encode_exception_to_failure(
-            input.error, self._client.data_converter, failure
-        )
+        await self._client.data_converter.encode_failure(input.error, failure)
         last_heartbeat_details = (
             None
             if not input.last_heartbeat_details
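The fail_async_activity hunk above also shows the new DataConverter.encode_failure entry point directly, and the earlier result hunk shows its decode_failure counterpart. A minimal sketch of the exception-to-proto round trip with the default converter; the call shapes are taken from this diff, while the helper function itself is illustrative.

import temporalio.api.failure.v1
import temporalio.converter


async def failure_round_trip(err: Exception) -> BaseException:
    converter = temporalio.converter.DataConverter.default
    failure = temporalio.api.failure.v1.Failure()
    # Populate the proto from the exception, as fail_async_activity does above.
    await converter.encode_failure(err, failure)
    # Turn it back into an exception, as WorkflowHandle.result does above.
    return await converter.decode_failure(failure)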

temporalio/contrib/opentelemetry.py

Lines changed: 2 additions & 2 deletions
@@ -83,7 +83,7 @@ def __init__(
             default_text_map_propagator
         )
         # TODO(cretz): Should I be using the configured one at the client and activity level?
-        self.payload_converter = temporalio.converter.default().payload_converter
+        self.payload_converter = temporalio.converter.PayloadConverter.default
 
     def intercept_client(
         self, next: temporalio.client.OutboundInterceptor
@@ -309,7 +309,7 @@ def __init__(self, next: temporalio.worker.WorkflowInboundInterceptor) -> None:
             default_text_map_propagator
         )
         # TODO(cretz): Should I be using the configured one for this workflow?
-        self.payload_converter = temporalio.converter.default().payload_converter
+        self.payload_converter = temporalio.converter.PayloadConverter.default
         # This is the context for the overall workflow, lazily created
         self._workflow_context_carrier: Optional[_CarrierDict] = None
 
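Both hunks swap temporalio.converter.default().payload_converter for the class-level PayloadConverter.default. A tiny usage sketch, assuming the converter's standard to_payloads/from_payloads interface; the dict value is illustrative of the kind of header data the tracing interceptor serializes.

import temporalio.converter

converter = temporalio.converter.PayloadConverter.default

# Round-trip a plain Python value through Temporal payloads and back.
payloads = converter.to_payloads([{"trace_id": "abc123"}])
restored = converter.from_payloads(payloads)[0]
assert restored == {"trace_id": "abc123"}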