Skip to content

Commit 97814c2

Browse files
authored
Workflow start delay (#406)
Fixes #404
1 parent 4242dfb commit 97814c2

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

temporalio/client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ async def start_workflow(
275275
cron_schedule: str = "",
276276
memo: Optional[Mapping[str, Any]] = None,
277277
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
278+
start_delay: Optional[timedelta] = None,
278279
start_signal: Optional[str] = None,
279280
start_signal_args: Sequence[Any] = [],
280281
rpc_metadata: Mapping[str, str] = {},
@@ -299,6 +300,7 @@ async def start_workflow(
299300
cron_schedule: str = "",
300301
memo: Optional[Mapping[str, Any]] = None,
301302
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
303+
start_delay: Optional[timedelta] = None,
302304
start_signal: Optional[str] = None,
303305
start_signal_args: Sequence[Any] = [],
304306
rpc_metadata: Mapping[str, str] = {},
@@ -325,6 +327,7 @@ async def start_workflow(
325327
cron_schedule: str = "",
326328
memo: Optional[Mapping[str, Any]] = None,
327329
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
330+
start_delay: Optional[timedelta] = None,
328331
start_signal: Optional[str] = None,
329332
start_signal_args: Sequence[Any] = [],
330333
rpc_metadata: Mapping[str, str] = {},
@@ -351,6 +354,7 @@ async def start_workflow(
351354
cron_schedule: str = "",
352355
memo: Optional[Mapping[str, Any]] = None,
353356
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
357+
start_delay: Optional[timedelta] = None,
354358
start_signal: Optional[str] = None,
355359
start_signal_args: Sequence[Any] = [],
356360
rpc_metadata: Mapping[str, str] = {},
@@ -375,6 +379,7 @@ async def start_workflow(
375379
cron_schedule: str = "",
376380
memo: Optional[Mapping[str, Any]] = None,
377381
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
382+
start_delay: Optional[timedelta] = None,
378383
start_signal: Optional[str] = None,
379384
start_signal_args: Sequence[Any] = [],
380385
rpc_metadata: Mapping[str, str] = {},
@@ -400,6 +405,9 @@ async def start_workflow(
400405
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
401406
memo: Memo for the workflow.
402407
search_attributes: Search attributes for the workflow.
408+
start_delay: Amount of time to wait before starting the workflow.
409+
This does not work with ``cron_schedule``. This is currently
410+
experimental.
403411
start_signal: If present, this signal is sent as signal-with-start
404412
instead of traditional workflow start.
405413
start_signal_args: Arguments for start_signal if start_signal
@@ -444,6 +452,7 @@ async def start_workflow(
444452
cron_schedule=cron_schedule,
445453
memo=memo,
446454
search_attributes=search_attributes,
455+
start_delay=start_delay,
447456
headers={},
448457
start_signal=start_signal,
449458
start_signal_args=start_signal_args,
@@ -469,6 +478,7 @@ async def execute_workflow(
469478
cron_schedule: str = "",
470479
memo: Optional[Mapping[str, Any]] = None,
471480
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
481+
start_delay: Optional[timedelta] = None,
472482
start_signal: Optional[str] = None,
473483
start_signal_args: Sequence[Any] = [],
474484
rpc_metadata: Mapping[str, str] = {},
@@ -493,6 +503,7 @@ async def execute_workflow(
493503
cron_schedule: str = "",
494504
memo: Optional[Mapping[str, Any]] = None,
495505
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
506+
start_delay: Optional[timedelta] = None,
496507
start_signal: Optional[str] = None,
497508
start_signal_args: Sequence[Any] = [],
498509
rpc_metadata: Mapping[str, str] = {},
@@ -519,6 +530,7 @@ async def execute_workflow(
519530
cron_schedule: str = "",
520531
memo: Optional[Mapping[str, Any]] = None,
521532
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
533+
start_delay: Optional[timedelta] = None,
522534
start_signal: Optional[str] = None,
523535
start_signal_args: Sequence[Any] = [],
524536
rpc_metadata: Mapping[str, str] = {},
@@ -545,6 +557,7 @@ async def execute_workflow(
545557
cron_schedule: str = "",
546558
memo: Optional[Mapping[str, Any]] = None,
547559
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
560+
start_delay: Optional[timedelta] = None,
548561
start_signal: Optional[str] = None,
549562
start_signal_args: Sequence[Any] = [],
550563
rpc_metadata: Mapping[str, str] = {},
@@ -569,6 +582,7 @@ async def execute_workflow(
569582
cron_schedule: str = "",
570583
memo: Optional[Mapping[str, Any]] = None,
571584
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
585+
start_delay: Optional[timedelta] = None,
572586
start_signal: Optional[str] = None,
573587
start_signal_args: Sequence[Any] = [],
574588
rpc_metadata: Mapping[str, str] = {},
@@ -597,6 +611,7 @@ async def execute_workflow(
597611
cron_schedule=cron_schedule,
598612
memo=memo,
599613
search_attributes=search_attributes,
614+
start_delay=start_delay,
600615
start_signal=start_signal,
601616
start_signal_args=start_signal_args,
602617
rpc_metadata=rpc_metadata,
@@ -3753,6 +3768,7 @@ class StartWorkflowInput:
37533768
cron_schedule: str
37543769
memo: Optional[Mapping[str, Any]]
37553770
search_attributes: Optional[temporalio.common.SearchAttributes]
3771+
start_delay: Optional[timedelta]
37563772
headers: Mapping[str, temporalio.api.common.v1.Payload]
37573773
start_signal: Optional[str]
37583774
start_signal_args: Sequence[Any]
@@ -4233,6 +4249,8 @@ async def start_workflow(
42334249
temporalio.converter.encode_search_attributes(
42344250
input.search_attributes, req.search_attributes
42354251
)
4252+
if input.start_delay is not None:
4253+
req.workflow_start_delay.FromTimedelta(input.start_delay)
42364254
if input.headers is not None:
42374255
temporalio.common._apply_headers(input.headers, req.header.fields)
42384256

tests/test_client.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,53 @@ async def test_start_with_signal(client: Client, worker: ExternalWorker):
140140
assert "some signal arg" == await handle.result()
141141

142142

143+
async def test_start_delay(
144+
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
145+
):
146+
if env.supports_time_skipping:
147+
pytest.skip("Java test server does not support start delay")
148+
start_delay = timedelta(hours=1, minutes=20, seconds=30)
149+
handle = await client.start_workflow(
150+
"kitchen_sink",
151+
KSWorkflowParams(
152+
actions=[KSAction(result=KSResultAction(value="some result"))]
153+
),
154+
id=f"workflow-{uuid.uuid4()}",
155+
task_queue=worker.task_queue,
156+
start_delay=start_delay,
157+
)
158+
# Check that first event has start delay
159+
first_event = [e async for e in handle.fetch_history_events()][0]
160+
assert (
161+
start_delay
162+
== first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta()
163+
)
164+
165+
166+
async def test_signal_with_start_delay(
167+
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
168+
):
169+
if env.supports_time_skipping:
170+
pytest.skip("Java test server does not support start delay")
171+
start_delay = timedelta(hours=1, minutes=20, seconds=30)
172+
handle = await client.start_workflow(
173+
"kitchen_sink",
174+
KSWorkflowParams(
175+
actions=[KSAction(result=KSResultAction(value="some result"))]
176+
),
177+
id=f"workflow-{uuid.uuid4()}",
178+
task_queue=worker.task_queue,
179+
start_delay=start_delay,
180+
start_signal="some-signal",
181+
)
182+
# Check that first event has start delay
183+
first_event = [e async for e in handle.fetch_history_events()][0]
184+
assert (
185+
start_delay
186+
== first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta()
187+
)
188+
189+
143190
async def test_result_follow_continue_as_new(
144191
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
145192
):

0 commit comments

Comments
 (0)