Skip to content

Commit 8889838

Browse files
committed
Move WorkflowRunOperationContext to _operation_context module
1 parent 7540596 commit 8889838

File tree

7 files changed

+134
-148
lines changed

7 files changed

+134
-148
lines changed

temporalio/nexus/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
from ._decorators import workflow_run_operation as workflow_run_operation
1010
from ._operation_context import Info as Info
11+
from ._operation_context import (
12+
WorkflowRunOperationContext as WorkflowRunOperationContext,
13+
)
1114
from ._operation_context import (
1215
_temporal_operation_context as _temporal_operation_context,
1316
)
@@ -18,7 +21,6 @@
1821
from ._operation_context import info as info
1922
from ._operation_handlers import cancel_operation as cancel_operation
2023
from ._token import WorkflowHandle as WorkflowHandle
21-
from ._workflow import WorkflowRunOperationContext as WorkflowRunOperationContext
2224

2325

2426
class LoggerAdapter(logging.LoggerAdapter):

temporalio/nexus/_decorators.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from nexusrpc.types import InputT, OutputT, ServiceHandlerT
1717

18+
from temporalio.nexus._operation_context import WorkflowRunOperationContext
1819
from temporalio.nexus._operation_handlers import (
1920
WorkflowRunOperationHandler,
2021
)
@@ -25,7 +26,6 @@
2526
get_callable_name,
2627
get_workflow_run_start_method_input_and_output_type_annotations,
2728
)
28-
from temporalio.nexus._workflow import WorkflowRunOperationContext
2929

3030

3131
@overload

temporalio/nexus/_operation_context.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
import urllib.parse
66
from contextvars import ContextVar
77
from dataclasses import dataclass
8+
from datetime import timedelta
89
from typing import (
910
Any,
1011
Callable,
12+
Mapping,
1113
Optional,
14+
Sequence,
1215
Union,
1316
)
1417

@@ -19,6 +22,13 @@
1922
import temporalio.api.enums.v1
2023
import temporalio.client
2124
import temporalio.common
25+
from temporalio.nexus._token import WorkflowHandle
26+
from temporalio.types import (
27+
MethodAsyncSingleParam,
28+
ParamType,
29+
ReturnType,
30+
SelfType,
31+
)
2232

2333
logger = logging.getLogger(__name__)
2434

@@ -92,6 +102,123 @@ def _temporal_cancel_operation_context(
92102
return _TemporalCancelOperationContext(ctx)
93103

94104

105+
@dataclass
106+
class WorkflowRunOperationContext:
107+
start_operation_context: StartOperationContext
108+
109+
# Overload for single-param workflow
110+
# TODO(nexus-prerelease): bring over other overloads
111+
async def start_workflow(
112+
self,
113+
workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
114+
arg: ParamType,
115+
*,
116+
id: str,
117+
task_queue: Optional[str] = None,
118+
execution_timeout: Optional[timedelta] = None,
119+
run_timeout: Optional[timedelta] = None,
120+
task_timeout: Optional[timedelta] = None,
121+
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
122+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
123+
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
124+
cron_schedule: str = "",
125+
memo: Optional[Mapping[str, Any]] = None,
126+
search_attributes: Optional[
127+
Union[
128+
temporalio.common.TypedSearchAttributes,
129+
temporalio.common.SearchAttributes,
130+
]
131+
] = None,
132+
static_summary: Optional[str] = None,
133+
static_details: Optional[str] = None,
134+
start_delay: Optional[timedelta] = None,
135+
start_signal: Optional[str] = None,
136+
start_signal_args: Sequence[Any] = [],
137+
rpc_metadata: Mapping[str, str] = {},
138+
rpc_timeout: Optional[timedelta] = None,
139+
request_eager_start: bool = False,
140+
priority: temporalio.common.Priority = temporalio.common.Priority.default,
141+
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
142+
) -> WorkflowHandle[ReturnType]:
143+
"""Start a workflow that will deliver the result of the Nexus operation.
144+
145+
The workflow will be started in the same namespace as the Nexus worker, using
146+
the same client as the worker. If task queue is not specified, the worker's task
147+
queue will be used.
148+
149+
See :py:meth:`temporalio.client.Client.start_workflow` for all arguments.
150+
151+
The return value is :py:class:`temporalio.nexus.WorkflowHandle`.
152+
153+
The workflow will be started as usual, with the following modifications:
154+
155+
- On workflow completion, Temporal server will deliver the workflow result to
156+
the Nexus operation caller, using the callback from the Nexus operation start
157+
request.
158+
159+
- The request ID from the Nexus operation start request will be used as the
160+
request ID for the start workflow request.
161+
162+
- Inbound links to the caller that were submitted in the Nexus start operation
163+
request will be attached to the started workflow and, outbound links to the
164+
started workflow will be added to the Nexus start operation response. If the
165+
Nexus caller is itself a workflow, this means that the workflow in the caller
166+
namespace web UI will contain links to the started workflow, and vice versa.
167+
"""
168+
tctx = _TemporalNexusOperationContext.get()
169+
start_operation_context = tctx._temporal_start_operation_context
170+
if not start_operation_context:
171+
raise RuntimeError(
172+
"WorkflowRunOperationContext.start_workflow() must be called from "
173+
"within a Nexus start operation context"
174+
)
175+
176+
# TODO(nexus-preview): When sdk-python supports on_conflict_options, Typescript does this:
177+
# if (workflowOptions.workflowIdConflictPolicy === 'USE_EXISTING') {
178+
# internalOptions.onConflictOptions = {
179+
# attachLinks: true,
180+
# attachCompletionCallbacks: true,
181+
# attachRequestId: true,
182+
# };
183+
# }
184+
185+
# We must pass nexus_completion_callbacks, workflow_event_links, and request_id,
186+
# but these are deliberately not exposed in overloads, hence the type-check
187+
# violation.
188+
wf_handle = await tctx.client.start_workflow( # type: ignore
189+
workflow=workflow,
190+
arg=arg,
191+
id=id,
192+
task_queue=task_queue or tctx.info().task_queue,
193+
execution_timeout=execution_timeout,
194+
run_timeout=run_timeout,
195+
task_timeout=task_timeout,
196+
id_reuse_policy=id_reuse_policy,
197+
id_conflict_policy=id_conflict_policy,
198+
retry_policy=retry_policy,
199+
cron_schedule=cron_schedule,
200+
memo=memo,
201+
search_attributes=search_attributes,
202+
static_summary=static_summary,
203+
static_details=static_details,
204+
start_delay=start_delay,
205+
start_signal=start_signal,
206+
start_signal_args=start_signal_args,
207+
rpc_metadata=rpc_metadata,
208+
rpc_timeout=rpc_timeout,
209+
request_eager_start=request_eager_start,
210+
priority=priority,
211+
versioning_override=versioning_override,
212+
nexus_completion_callbacks=start_operation_context.get_completion_callbacks(),
213+
workflow_event_links=start_operation_context.get_workflow_event_links(),
214+
request_id=start_operation_context.nexus_operation_context.request_id,
215+
)
216+
217+
start_operation_context.add_outbound_links(wf_handle)
218+
219+
return WorkflowHandle[ReturnType]._unsafe_from_client_workflow_handle(wf_handle)
220+
221+
95222
@dataclass
96223
class _TemporalStartOperationContext:
97224
nexus_operation_context: StartOperationContext

temporalio/nexus/_util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
ServiceHandlerT,
2020
)
2121

22-
from temporalio.nexus._workflow import WorkflowRunOperationContext
22+
from temporalio.nexus._operation_context import WorkflowRunOperationContext
2323

2424
from ._token import (
2525
WorkflowHandle as WorkflowHandle,

temporalio/nexus/_workflow.py

Lines changed: 0 additions & 142 deletions
This file was deleted.

tests/nexus/test_workflow_caller.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@
3838
)
3939
from temporalio.common import WorkflowIDConflictPolicy
4040
from temporalio.exceptions import CancelledError, NexusHandlerError, NexusOperationError
41-
from temporalio.nexus import workflow_run_operation
42-
from temporalio.nexus._workflow import WorkflowRunOperationContext
41+
from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation
4342
from temporalio.service import RPCError, RPCStatusCode
4443
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
4544
from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name

tests/nexus/test_workflow_run_operation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from nexusrpc.handler._decorators import operation_handler
1414

1515
from temporalio import workflow
16+
from temporalio.nexus import WorkflowRunOperationContext
1617
from temporalio.nexus._operation_handlers import WorkflowRunOperationHandler
17-
from temporalio.nexus._workflow import WorkflowRunOperationContext
1818
from temporalio.testing import WorkflowEnvironment
1919
from temporalio.worker import Worker
2020
from tests.helpers.nexus import (

0 commit comments

Comments
 (0)