Skip to content

Commit 7540596

Browse files
committed
Move start_workflow to WorkflowRunOperationContext
1 parent 160cd9a commit 7540596

10 files changed

+168
-153
lines changed

temporalio/nexus/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ._operation_context import info as info
1919
from ._operation_handlers import cancel_operation as cancel_operation
2020
from ._token import WorkflowHandle as WorkflowHandle
21-
from ._workflow import start_workflow as start_workflow
21+
from ._workflow import WorkflowRunOperationContext as WorkflowRunOperationContext
2222

2323

2424
class LoggerAdapter(logging.LoggerAdapter):

temporalio/nexus/_decorators.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,17 @@
2525
get_callable_name,
2626
get_workflow_run_start_method_input_and_output_type_annotations,
2727
)
28+
from temporalio.nexus._workflow import WorkflowRunOperationContext
2829

2930

3031
@overload
3132
def workflow_run_operation(
3233
start: Callable[
33-
[ServiceHandlerT, StartOperationContext, InputT],
34+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
3435
Awaitable[WorkflowHandle[OutputT]],
3536
],
3637
) -> Callable[
37-
[ServiceHandlerT, StartOperationContext, InputT],
38+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
3839
Awaitable[WorkflowHandle[OutputT]],
3940
]: ...
4041

@@ -46,12 +47,12 @@ def workflow_run_operation(
4647
) -> Callable[
4748
[
4849
Callable[
49-
[ServiceHandlerT, StartOperationContext, InputT],
50+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
5051
Awaitable[WorkflowHandle[OutputT]],
5152
]
5253
],
5354
Callable[
54-
[ServiceHandlerT, StartOperationContext, InputT],
55+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
5556
Awaitable[WorkflowHandle[OutputT]],
5657
],
5758
]: ...
@@ -60,26 +61,26 @@ def workflow_run_operation(
6061
def workflow_run_operation(
6162
start: Optional[
6263
Callable[
63-
[ServiceHandlerT, StartOperationContext, InputT],
64+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
6465
Awaitable[WorkflowHandle[OutputT]],
6566
]
6667
] = None,
6768
*,
6869
name: Optional[str] = None,
6970
) -> Union[
7071
Callable[
71-
[ServiceHandlerT, StartOperationContext, InputT],
72+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
7273
Awaitable[WorkflowHandle[OutputT]],
7374
],
7475
Callable[
7576
[
7677
Callable[
77-
[ServiceHandlerT, StartOperationContext, InputT],
78+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
7879
Awaitable[WorkflowHandle[OutputT]],
7980
]
8081
],
8182
Callable[
82-
[ServiceHandlerT, StartOperationContext, InputT],
83+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
8384
Awaitable[WorkflowHandle[OutputT]],
8485
],
8586
],
@@ -90,11 +91,11 @@ def workflow_run_operation(
9091

9192
def decorator(
9293
start: Callable[
93-
[ServiceHandlerT, StartOperationContext, InputT],
94+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
9495
Awaitable[WorkflowHandle[OutputT]],
9596
],
9697
) -> Callable[
97-
[ServiceHandlerT, StartOperationContext, InputT],
98+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
9899
Awaitable[WorkflowHandle[OutputT]],
99100
]:
100101
(
@@ -108,7 +109,7 @@ def operation_handler_factory(
108109
async def _start(
109110
ctx: StartOperationContext, input: InputT
110111
) -> WorkflowHandle[OutputT]:
111-
return await start(self, ctx, input)
112+
return await start(self, WorkflowRunOperationContext(ctx), input)
112113

113114
_start.__doc__ = start.__doc__
114115
return WorkflowRunOperationHandler(_start, input_type, output_type)

temporalio/nexus/_operation_handlers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ class WorkflowRunOperationHandler(OperationHandler[InputT, OutputT]):
4141
4242
Use this class to create an operation handler that starts a workflow by passing your
4343
``start`` method to the constructor. Your ``start`` method must use
44-
:py:func:`temporalio.nexus.start_workflow` to start the workflow.
44+
:py:func:`temporalio.nexus.WorkflowRunOperationContext.start_workflow` to start the
45+
workflow.
4546
"""
4647

4748
def __init__(

temporalio/nexus/_util.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,22 @@
1313
Union,
1414
)
1515

16-
from nexusrpc.handler import StartOperationContext
1716
from nexusrpc.types import (
1817
InputT,
1918
OutputT,
2019
ServiceHandlerT,
2120
)
2221

22+
from temporalio.nexus._workflow import WorkflowRunOperationContext
23+
2324
from ._token import (
2425
WorkflowHandle as WorkflowHandle,
2526
)
2627

2728

2829
def get_workflow_run_start_method_input_and_output_type_annotations(
2930
start: Callable[
30-
[ServiceHandlerT, StartOperationContext, InputT],
31+
[ServiceHandlerT, WorkflowRunOperationContext, InputT],
3132
Awaitable[WorkflowHandle[OutputT]],
3233
],
3334
) -> tuple[

temporalio/nexus/_workflow.py

Lines changed: 116 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
from dataclasses import dataclass
34
from datetime import timedelta
45
from typing import (
56
Any,
@@ -9,6 +10,8 @@
910
Union,
1011
)
1112

13+
from nexusrpc.handler import StartOperationContext
14+
1215
import temporalio.api.common.v1
1316
import temporalio.api.enums.v1
1417
import temporalio.common
@@ -22,116 +25,118 @@
2225
)
2326

2427

25-
# Overload for single-param workflow
26-
# TODO(nexus-prerelease): bring over other overloads
27-
async def start_workflow(
28-
workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
29-
arg: ParamType,
30-
*,
31-
id: str,
32-
task_queue: Optional[str] = None,
33-
execution_timeout: Optional[timedelta] = None,
34-
run_timeout: Optional[timedelta] = None,
35-
task_timeout: Optional[timedelta] = None,
36-
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
37-
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
38-
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
39-
cron_schedule: str = "",
40-
memo: Optional[Mapping[str, Any]] = None,
41-
search_attributes: Optional[
42-
Union[
43-
temporalio.common.TypedSearchAttributes,
44-
temporalio.common.SearchAttributes,
45-
]
46-
] = None,
47-
static_summary: Optional[str] = None,
48-
static_details: Optional[str] = None,
49-
start_delay: Optional[timedelta] = None,
50-
start_signal: Optional[str] = None,
51-
start_signal_args: Sequence[Any] = [],
52-
rpc_metadata: Mapping[str, str] = {},
53-
rpc_timeout: Optional[timedelta] = None,
54-
request_eager_start: bool = False,
55-
priority: temporalio.common.Priority = temporalio.common.Priority.default,
56-
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
57-
) -> WorkflowHandle[ReturnType]:
58-
"""Start a workflow that will deliver the result of the Nexus operation.
59-
60-
The workflow will be started in the same namespace as the Nexus worker, using
61-
the same client as the worker. If task queue is not specified, the worker's task
62-
queue will be used.
63-
64-
See :py:meth:`temporalio.client.Client.start_workflow` for all arguments.
65-
66-
The return value is :py:class:`temporalio.nexus.WorkflowOperationToken`.
67-
Use :py:meth:`temporalio.nexus.WorkflowOperationToken.to_workflow_handle`
68-
to get a :py:class:`temporalio.client.WorkflowHandle` for interacting with the
69-
workflow.
70-
71-
The workflow will be started as usual, with the following modifications:
72-
73-
- On workflow completion, Temporal server will deliver the workflow result to
74-
the Nexus operation caller, using the callback from the Nexus operation start
75-
request.
76-
77-
- The request ID from the Nexus operation start request will be used as the
78-
request ID for the start workflow request.
79-
80-
- Inbound links to the caller that were submitted in the Nexus start operation
81-
request will be attached to the started workflow and, outbound links to the
82-
started workflow will be added to the Nexus start operation response. If the
83-
Nexus caller is itself a workflow, this means that the workflow in the caller
84-
namespace web UI will contain links to the started workflow, and vice versa.
85-
"""
86-
tctx = _TemporalNexusOperationContext.get()
87-
start_operation_context = tctx._temporal_start_operation_context
88-
if not start_operation_context:
89-
raise RuntimeError(
90-
"temporalio.nexus.start_workflow() must be called from "
91-
"within a Nexus start operation context"
28+
@dataclass
29+
class WorkflowRunOperationContext:
30+
start_operation_context: StartOperationContext
31+
32+
# Overload for single-param workflow
33+
# TODO(nexus-prerelease): bring over other overloads
34+
async def start_workflow(
35+
self,
36+
workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
37+
arg: ParamType,
38+
*,
39+
id: str,
40+
task_queue: Optional[str] = None,
41+
execution_timeout: Optional[timedelta] = None,
42+
run_timeout: Optional[timedelta] = None,
43+
task_timeout: Optional[timedelta] = None,
44+
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
45+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
46+
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
47+
cron_schedule: str = "",
48+
memo: Optional[Mapping[str, Any]] = None,
49+
search_attributes: Optional[
50+
Union[
51+
temporalio.common.TypedSearchAttributes,
52+
temporalio.common.SearchAttributes,
53+
]
54+
] = None,
55+
static_summary: Optional[str] = None,
56+
static_details: Optional[str] = None,
57+
start_delay: Optional[timedelta] = None,
58+
start_signal: Optional[str] = None,
59+
start_signal_args: Sequence[Any] = [],
60+
rpc_metadata: Mapping[str, str] = {},
61+
rpc_timeout: Optional[timedelta] = None,
62+
request_eager_start: bool = False,
63+
priority: temporalio.common.Priority = temporalio.common.Priority.default,
64+
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
65+
) -> WorkflowHandle[ReturnType]:
66+
"""Start a workflow that will deliver the result of the Nexus operation.
67+
68+
The workflow will be started in the same namespace as the Nexus worker, using
69+
the same client as the worker. If task queue is not specified, the worker's task
70+
queue will be used.
71+
72+
See :py:meth:`temporalio.client.Client.start_workflow` for all arguments.
73+
74+
The return value is :py:class:`temporalio.nexus.WorkflowHandle`.
75+
76+
The workflow will be started as usual, with the following modifications:
77+
78+
- On workflow completion, Temporal server will deliver the workflow result to
79+
the Nexus operation caller, using the callback from the Nexus operation start
80+
request.
81+
82+
- The request ID from the Nexus operation start request will be used as the
83+
request ID for the start workflow request.
84+
85+
- Inbound links to the caller that were submitted in the Nexus start operation
86+
request will be attached to the started workflow and, outbound links to the
87+
started workflow will be added to the Nexus start operation response. If the
88+
Nexus caller is itself a workflow, this means that the workflow in the caller
89+
namespace web UI will contain links to the started workflow, and vice versa.
90+
"""
91+
tctx = _TemporalNexusOperationContext.get()
92+
start_operation_context = tctx._temporal_start_operation_context
93+
if not start_operation_context:
94+
raise RuntimeError(
95+
"WorkflowRunOperationContext.start_workflow() must be called from "
96+
"within a Nexus start operation context"
97+
)
98+
99+
# TODO(nexus-preview): When sdk-python supports on_conflict_options, Typescript does this:
100+
# if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') {
101+
# internalOptions.onConflictOptions = {
102+
# attachLinks: true,
103+
# attachCompletionCallbacks: true,
104+
# attachRequestId: true,
105+
# };
106+
# }
107+
108+
# We must pass nexus_completion_callbacks, workflow_event_links, and request_id,
109+
# but these are deliberately not exposed in overloads, hence the type-check
110+
# violation.
111+
wf_handle = await tctx.client.start_workflow( # type: ignore
112+
workflow=workflow,
113+
arg=arg,
114+
id=id,
115+
task_queue=task_queue or tctx.info().task_queue,
116+
execution_timeout=execution_timeout,
117+
run_timeout=run_timeout,
118+
task_timeout=task_timeout,
119+
id_reuse_policy=id_reuse_policy,
120+
id_conflict_policy=id_conflict_policy,
121+
retry_policy=retry_policy,
122+
cron_schedule=cron_schedule,
123+
memo=memo,
124+
search_attributes=search_attributes,
125+
static_summary=static_summary,
126+
static_details=static_details,
127+
start_delay=start_delay,
128+
start_signal=start_signal,
129+
start_signal_args=start_signal_args,
130+
rpc_metadata=rpc_metadata,
131+
rpc_timeout=rpc_timeout,
132+
request_eager_start=request_eager_start,
133+
priority=priority,
134+
versioning_override=versioning_override,
135+
nexus_completion_callbacks=start_operation_context.get_completion_callbacks(),
136+
workflow_event_links=start_operation_context.get_workflow_event_links(),
137+
request_id=start_operation_context.nexus_operation_context.request_id,
92138
)
93139

94-
# TODO(nexus-preview): When sdk-python supports on_conflict_options, Typescript does this:
95-
# if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') {
96-
# internalOptions.onConflictOptions = {
97-
# attachLinks: true,
98-
# attachCompletionCallbacks: true,
99-
# attachRequestId: true,
100-
# };
101-
# }
102-
103-
# We must pass nexus_completion_callbacks, workflow_event_links, and request_id,
104-
# but these are deliberately not exposed in overloads, hence the type-check
105-
# violation.
106-
wf_handle = await tctx.client.start_workflow( # type: ignore
107-
workflow=workflow,
108-
arg=arg,
109-
id=id,
110-
task_queue=task_queue or tctx.info().task_queue,
111-
execution_timeout=execution_timeout,
112-
run_timeout=run_timeout,
113-
task_timeout=task_timeout,
114-
id_reuse_policy=id_reuse_policy,
115-
id_conflict_policy=id_conflict_policy,
116-
retry_policy=retry_policy,
117-
cron_schedule=cron_schedule,
118-
memo=memo,
119-
search_attributes=search_attributes,
120-
static_summary=static_summary,
121-
static_details=static_details,
122-
start_delay=start_delay,
123-
start_signal=start_signal,
124-
start_signal_args=start_signal_args,
125-
rpc_metadata=rpc_metadata,
126-
rpc_timeout=rpc_timeout,
127-
request_eager_start=request_eager_start,
128-
priority=priority,
129-
versioning_override=versioning_override,
130-
nexus_completion_callbacks=start_operation_context.get_completion_callbacks(),
131-
workflow_event_links=start_operation_context.get_workflow_event_links(),
132-
request_id=start_operation_context.nexus_operation_context.request_id,
133-
)
134-
135-
start_operation_context.add_outbound_links(wf_handle)
136-
137-
return WorkflowHandle[ReturnType]._unsafe_from_client_workflow_handle(wf_handle)
140+
start_operation_context.add_outbound_links(wf_handle)
141+
142+
return WorkflowHandle[ReturnType]._unsafe_from_client_workflow_handle(wf_handle)

0 commit comments

Comments
 (0)