Skip to content

Commit c47e3f1

Browse files
Enable Eager Workflow Start (#430)
1 parent 75e528b commit c47e3f1

File tree

5 files changed

+77
-40
lines changed

5 files changed

+77
-40
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 34 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/client.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ async def start_workflow(
287287
start_signal_args: Sequence[Any] = [],
288288
rpc_metadata: Mapping[str, str] = {},
289289
rpc_timeout: Optional[timedelta] = None,
290+
request_eager_start: bool = False,
290291
) -> WorkflowHandle[SelfType, ReturnType]:
291292
...
292293

@@ -317,6 +318,7 @@ async def start_workflow(
317318
start_signal_args: Sequence[Any] = [],
318319
rpc_metadata: Mapping[str, str] = {},
319320
rpc_timeout: Optional[timedelta] = None,
321+
request_eager_start: bool = False,
320322
) -> WorkflowHandle[SelfType, ReturnType]:
321323
...
322324

@@ -349,6 +351,7 @@ async def start_workflow(
349351
start_signal_args: Sequence[Any] = [],
350352
rpc_metadata: Mapping[str, str] = {},
351353
rpc_timeout: Optional[timedelta] = None,
354+
request_eager_start: bool = False,
352355
) -> WorkflowHandle[SelfType, ReturnType]:
353356
...
354357

@@ -381,6 +384,7 @@ async def start_workflow(
381384
start_signal_args: Sequence[Any] = [],
382385
rpc_metadata: Mapping[str, str] = {},
383386
rpc_timeout: Optional[timedelta] = None,
387+
request_eager_start: bool = False,
384388
) -> WorkflowHandle[Any, Any]:
385389
...
386390

@@ -411,6 +415,7 @@ async def start_workflow(
411415
start_signal_args: Sequence[Any] = [],
412416
rpc_metadata: Mapping[str, str] = {},
413417
rpc_timeout: Optional[timedelta] = None,
418+
request_eager_start: bool = False,
414419
stack_level: int = 2,
415420
) -> WorkflowHandle[Any, Any]:
416421
"""Start a workflow and return its handle.
@@ -445,6 +450,10 @@ async def start_workflow(
445450
rpc_metadata: Headers used on the RPC call. Keys here override
446451
client-level RPC metadata keys.
447452
rpc_timeout: Optional RPC deadline to set for the RPC call.
453+
request_eager_start: Potentially reduce the latency to start this workflow by
454+
encouraging the server to start it on a local worker running with
455+
this same client.
456+
This is currently experimental.
448457
449458
Returns:
450459
A workflow handle to the started workflow.
@@ -492,6 +501,7 @@ async def start_workflow(
492501
ret_type=result_type,
493502
rpc_metadata=rpc_metadata,
494503
rpc_timeout=rpc_timeout,
504+
request_eager_start=request_eager_start,
495505
)
496506
)
497507

@@ -521,6 +531,7 @@ async def execute_workflow(
521531
start_signal_args: Sequence[Any] = [],
522532
rpc_metadata: Mapping[str, str] = {},
523533
rpc_timeout: Optional[timedelta] = None,
534+
request_eager_start: bool = False,
524535
) -> ReturnType:
525536
...
526537

@@ -551,6 +562,7 @@ async def execute_workflow(
551562
start_signal_args: Sequence[Any] = [],
552563
rpc_metadata: Mapping[str, str] = {},
553564
rpc_timeout: Optional[timedelta] = None,
565+
request_eager_start: bool = False,
554566
) -> ReturnType:
555567
...
556568

@@ -583,6 +595,7 @@ async def execute_workflow(
583595
start_signal_args: Sequence[Any] = [],
584596
rpc_metadata: Mapping[str, str] = {},
585597
rpc_timeout: Optional[timedelta] = None,
598+
request_eager_start: bool = False,
586599
) -> ReturnType:
587600
...
588601

@@ -615,6 +628,7 @@ async def execute_workflow(
615628
start_signal_args: Sequence[Any] = [],
616629
rpc_metadata: Mapping[str, str] = {},
617630
rpc_timeout: Optional[timedelta] = None,
631+
request_eager_start: bool = False,
618632
) -> Any:
619633
...
620634

@@ -645,6 +659,7 @@ async def execute_workflow(
645659
start_signal_args: Sequence[Any] = [],
646660
rpc_metadata: Mapping[str, str] = {},
647661
rpc_timeout: Optional[timedelta] = None,
662+
request_eager_start: bool = False,
648663
) -> Any:
649664
"""Start a workflow and wait for completion.
650665
@@ -674,6 +689,7 @@ async def execute_workflow(
674689
start_signal_args=start_signal_args,
675690
rpc_metadata=rpc_metadata,
676691
rpc_timeout=rpc_timeout,
692+
request_eager_start=request_eager_start,
677693
stack_level=3,
678694
)
679695
).result()
@@ -1082,6 +1098,7 @@ def __init__(
10821098
self._result_run_id = result_run_id
10831099
self._first_execution_run_id = first_execution_run_id
10841100
self._result_type = result_type
1101+
self.__temporal_eagerly_started = False
10851102

10861103
@property
10871104
def id(self) -> str:
@@ -4282,6 +4299,7 @@ class StartWorkflowInput:
42824299
ret_type: Optional[Type]
42834300
rpc_metadata: Mapping[str, str]
42844301
rpc_timeout: Optional[timedelta]
4302+
request_eager_start: bool
42854303

42864304

42874305
@dataclass
@@ -4751,6 +4769,8 @@ async def start_workflow(
47514769
)
47524770
else:
47534771
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
4772+
req.request_eager_execution = input.request_eager_start
4773+
47544774
req.namespace = self._client.namespace
47554775
req.workflow_id = input.id
47564776
req.workflow_type.name = input.workflow
@@ -4794,6 +4814,7 @@ async def start_workflow(
47944814
temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse,
47954815
]
47964816
first_execution_run_id = None
4817+
eagerly_started = False
47974818
try:
47984819
if isinstance(
47994820
req,
@@ -4813,6 +4834,7 @@ async def start_workflow(
48134834
timeout=input.rpc_timeout,
48144835
)
48154836
first_execution_run_id = resp.run_id
4837+
eagerly_started = resp.HasField("eager_workflow_task")
48164838
except RPCError as err:
48174839
# If the status is ALREADY_EXISTS and the details can be extracted
48184840
# as already started, use a different exception
@@ -4826,13 +4848,15 @@ async def start_workflow(
48264848
)
48274849
else:
48284850
raise
4829-
return WorkflowHandle(
4851+
handle: WorkflowHandle[Any, Any] = WorkflowHandle(
48304852
self._client,
48314853
req.workflow_id,
48324854
result_run_id=resp.run_id,
48334855
first_execution_run_id=first_execution_run_id,
48344856
result_type=input.ret_type,
48354857
)
4858+
setattr(handle, "__temporal_eagerly_started", eagerly_started)
4859+
return handle
48364860

48374861
async def cancel_workflow(self, input: CancelWorkflowInput) -> None:
48384862
await self._client.workflow_service.request_cancel_workflow_execution(

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
8181
"system.forceSearchAttributesCacheRefreshOnRead=true",
8282
"--dynamic-config-value",
8383
f"limit.historyCount.suggestContinueAsNew={CONTINUE_AS_NEW_SUGGEST_HISTORY_COUNT}",
84+
"--dynamic-config-value",
85+
"system.enableEagerWorkflowStart=true",
8486
]
8587
)
8688
elif env_type == "time-skipping":

tests/worker/test_workflow.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,21 @@ async def test_workflow_hello(client: Client):
114114
assert result == "Hello, Temporal!"
115115

116116

117+
async def test_workflow_hello_eager(client: Client):
118+
async with new_worker(client, HelloWorkflow) as worker:
119+
handle = await client.start_workflow(
120+
HelloWorkflow.run,
121+
"Temporal",
122+
id=f"workflow-{uuid.uuid4()}",
123+
task_queue=worker.task_queue,
124+
request_eager_start=True,
125+
task_timeout=timedelta(hours=1), # hang if retry needed
126+
)
127+
assert handle.__temporal_eagerly_started
128+
result = await handle.result()
129+
assert result == "Hello, Temporal!"
130+
131+
117132
@activity.defn
118133
async def multi_param_activity(param1: int, param2: str) -> str:
119134
return f"param1: {param1}, param2: {param2}"

0 commit comments

Comments
 (0)