Skip to content

Commit 73a1673

Browse files
authored
Support activity retry delay (#571)
1 parent a5b9661 commit 73a1673

File tree

4 files changed

+70
-1
lines changed

4 files changed

+70
-1
lines changed

temporalio/converter.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
overload,
3535
)
3636

37+
import google.protobuf.duration_pb2
3738
import google.protobuf.json_format
3839
import google.protobuf.message
3940
import google.protobuf.symbol_database
@@ -843,6 +844,10 @@ def _error_to_failure(
843844
failure.application_failure_info.details.CopyFrom(
844845
payload_converter.to_payloads_wrapper(error.details)
845846
)
847+
if error.next_retry_delay:
848+
failure.application_failure_info.next_retry_delay.FromTimedelta(
849+
error.next_retry_delay
850+
)
846851
elif isinstance(error, temporalio.exceptions.TimeoutError):
847852
failure.timeout_failure_info.SetInParent()
848853
failure.timeout_failure_info.timeout_type = (
@@ -928,6 +933,7 @@ def from_failure(
928933
*payload_converter.from_payloads_wrapper(app_info.details),
929934
type=app_info.type or None,
930935
non_retryable=app_info.non_retryable,
936+
next_retry_delay=app_info.next_retry_delay.ToTimedelta(),
931937
)
932938
elif failure.HasField("timeout_failure_info"):
933939
timeout_info = failure.timeout_failure_info

temporalio/exceptions.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Common Temporal exceptions."""
22

33
import asyncio
4+
from datetime import timedelta
45
from enum import IntEnum
56
from typing import Any, Optional, Sequence, Tuple
67

@@ -78,6 +79,7 @@ def __init__(
7879
*details: Any,
7980
type: Optional[str] = None,
8081
non_retryable: bool = False,
82+
next_retry_delay: Optional[timedelta] = None,
8183
) -> None:
8284
"""Initialize an application error."""
8385
super().__init__(
@@ -88,6 +90,7 @@ def __init__(
8890
self._details = details
8991
self._type = type
9092
self._non_retryable = non_retryable
93+
self._next_retry_delay = next_retry_delay
9194

9295
@property
9396
def details(self) -> Sequence[Any]:
@@ -109,6 +112,15 @@ def non_retryable(self) -> bool:
109112
"""
110113
return self._non_retryable
111114

115+
@property
116+
def next_retry_delay(self) -> Optional[timedelta]:
117+
"""Delay before the next activity retry attempt.
118+
119+
User activity code may set this when raising ApplicationError to specify
120+
a delay before the next activity retry.
121+
"""
122+
return self._next_retry_delay
123+
112124

113125
class CancelledError(FailureError):
114126
"""Error raised on workflow/activity cancellation."""

tests/worker/test_activity.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,14 @@ async def test_sync_activity_process_non_picklable_heartbeat_details(
744744
picklable_activity_non_pickable_heartbeat_details,
745745
worker_config={"activity_executor": executor},
746746
)
747-
assert "Can't pickle" in str(assert_activity_application_error(err.value))
747+
msg = str(assert_activity_application_error(err.value))
748+
# TODO: different messages can apparently be produced across runs/platforms
749+
# See e.g. https://github.com/temporalio/sdk-python/actions/runs/10455232879/job/28949714969?pr=571
750+
assert (
751+
"Can't pickle" in msg
752+
or "Can't get local object 'picklable_activity_non_pickable_heartbeat_details.<locals>.<lambda>'"
753+
in msg
754+
)
748755

749756

750757
async def test_activity_error_non_retryable(client: Client, worker: ExternalWorker):

tests/worker/test_workflow.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5861,3 +5861,47 @@ async def test_timer_started_after_workflow_completion(client: Client):
58615861
)
58625862
await handle.signal(TimerStartedAfterWorkflowCompletionWorkflow.my_signal)
58635863
assert await handle.result() == "workflow-result"
5864+
5865+
5866+
@activity.defn
5867+
async def activity_with_retry_delay():
5868+
raise ApplicationError(
5869+
ActivitiesWithRetryDelayWorkflow.error_message,
5870+
next_retry_delay=ActivitiesWithRetryDelayWorkflow.next_retry_delay,
5871+
)
5872+
5873+
5874+
@workflow.defn
5875+
class ActivitiesWithRetryDelayWorkflow:
5876+
error_message = "Deliberately failing with next_retry_delay set"
5877+
next_retry_delay = timedelta(milliseconds=5)
5878+
5879+
@workflow.run
5880+
async def run(self) -> None:
5881+
await workflow.execute_activity(
5882+
activity_with_retry_delay,
5883+
retry_policy=RetryPolicy(maximum_attempts=2),
5884+
schedule_to_close_timeout=timedelta(minutes=5),
5885+
)
5886+
5887+
5888+
async def test_activity_retry_delay(client: Client):
5889+
async with new_worker(
5890+
client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay]
5891+
) as worker:
5892+
try:
5893+
await client.execute_workflow(
5894+
ActivitiesWithRetryDelayWorkflow.run,
5895+
id=str(uuid.uuid4()),
5896+
task_queue=worker.task_queue,
5897+
)
5898+
except WorkflowFailureError as err:
5899+
assert isinstance(err.cause, ActivityError)
5900+
assert isinstance(err.cause.cause, ApplicationError)
5901+
assert (
5902+
str(err.cause.cause) == ActivitiesWithRetryDelayWorkflow.error_message
5903+
)
5904+
assert (
5905+
err.cause.cause.next_retry_delay
5906+
== ActivitiesWithRetryDelayWorkflow.next_retry_delay
5907+
)

0 commit comments

Comments
 (0)