Skip to content

Commit 4165fc5

Browse files
authored
Nexus: worker, workflow-backed operations, and workflow caller (#813)
1 parent 6fbc4c5 commit 4165fc5

37 files changed

+7802
-865
lines changed

README.md

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ informal introduction to the features and their implementation.
9494
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
9595
- [Worker Shutdown](#worker-shutdown)
9696
- [Testing](#testing-1)
97+
- [Nexus](#nexus)
9798
- [Workflow Replay](#workflow-replay)
9899
- [Observability](#observability)
99100
- [Metrics](#metrics)
@@ -1308,6 +1309,113 @@ affect calls activity code might make to functions on the `temporalio.activity`
13081309
* `cancel()` can be invoked to simulate a cancellation of the activity
13091310
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
13101311

1312+
1313+
### Nexus
1314+
1315+
⚠️ **Nexus support is currently at an experimental release stage. Backwards-incompatible changes are anticipated until a stable release is announced.** ⚠️
1316+
1317+
[Nexus](https://github.com/nexus-rpc/) is a synchronous RPC protocol. Arbitrary duration operations that can respond
1318+
asynchronously are modeled on top of a set of pre-defined synchronous RPCs.
1319+
1320+
Temporal supports calling Nexus operations **from a workflow**. See https://docs.temporal.io/nexus. There is no support
1321+
currently for calling a Nexus operation from non-workflow code.
1322+
1323+
To get started quickly using Nexus with Temporal, see the Python Nexus sample:
1324+
https://github.com/temporalio/samples-python/tree/nexus/hello_nexus.
1325+
1326+
1327+
Two types of Nexus operation are supported, each using a decorator:
1328+
1329+
- `@temporalio.nexus.workflow_run_operation`: a Nexus operation that is backed by a Temporal workflow. The operation
1330+
handler you write will start the handler workflow and then respond with a token indicating that the handler workflow
1331+
is in progress. When the handler workflow completes, Temporal server will automatically deliver the result (success or
1332+
failure) to the caller workflow.
1333+
- `@nexusrpc.handler.sync_operation`: an operation that responds synchronously. It may be `def` or `async def` and it
1334+
may do network I/O, but it must respond within 10 seconds.
1335+
1336+
The following steps are an overview of the [Python Nexus sample](
1337+
https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).
1338+
1339+
1. Create the caller and handler namespaces, and the Nexus endpoint. For example,
1340+
```
1341+
temporal operator namespace create --namespace my-handler-namespace
1342+
temporal operator namespace create --namespace my-caller-namespace
1343+
1344+
temporal operator nexus endpoint create \
1345+
--name my-nexus-endpoint \
1346+
--target-namespace my-handler-namespace \
1347+
--target-task-queue my-handler-task-queue
1348+
```
1349+
1350+
2. Define your service contract. This specifies the names and input/output types of your operations. You will use this
1351+
to refer to the operations when calling them from a workflow.
1352+
```python
1353+
@nexusrpc.service
1354+
class MyNexusService:
1355+
my_sync_operation: nexusrpc.Operation[MyInput, MyOutput]
1356+
my_workflow_run_operation: nexusrpc.Operation[MyInput, MyOutput]
1357+
```
1358+
1359+
3. Implement your operation handlers in a service handler:
1360+
```python
1361+
@service_handler(service=MyNexusService)
1362+
class MyNexusServiceHandler:
1363+
@sync_operation
1364+
async def my_sync_operation(
1365+
self, ctx: StartOperationContext, input: MyInput
1366+
) -> MyOutput:
1367+
return MyOutput(message=f"Hello {input.name} from sync operation!")
1368+
1369+
@workflow_run_operation
1370+
async def my_workflow_run_operation(
1371+
self, ctx: WorkflowRunOperationContext, input: MyInput
1372+
) -> nexus.WorkflowHandle[MyOutput]:
1373+
return await ctx.start_workflow(
1374+
WorkflowStartedByNexusOperation.run,
1375+
input,
1376+
id=str(uuid.uuid4()),
1377+
)
1378+
```
1379+
1380+
4. Register your service handler with a Temporal worker.
1381+
```python
1382+
client = await Client.connect("localhost:7233", namespace="my-handler-namespace")
1383+
worker = Worker(
1384+
client,
1385+
task_queue="my-handler-task-queue",
1386+
workflows=[WorkflowStartedByNexusOperation],
1387+
nexus_service_handlers=[MyNexusServiceHandler()],
1388+
)
1389+
await worker.run()
1390+
```
1391+
1392+
5. Call your Nexus operations from your caller workflow.
1393+
```python
1394+
@workflow.defn
1395+
class CallerWorkflow:
1396+
def __init__(self):
1397+
self.nexus_client = workflow.create_nexus_client(
1398+
service=MyNexusService, endpoint="my-nexus-endpoint"
1399+
)
1400+
1401+
@workflow.run
1402+
async def run(self, name: str) -> tuple[MyOutput, MyOutput]:
1403+
# Start the Nexus operation and wait for the result in one go, using execute_operation.
1404+
wf_result = await self.nexus_client.execute_operation(
1405+
MyNexusService.my_workflow_run_operation,
1406+
MyInput(name),
1407+
)
1408+
# Or alternatively, obtain the operation handle using start_operation,
1409+
# and then use it to get the result:
1410+
sync_operation_handle = await self.nexus_client.start_operation(
1411+
MyNexusService.my_sync_operation,
1412+
MyInput(name),
1413+
)
1414+
sync_result = await sync_operation_handle
1415+
return sync_result, wf_result
1416+
```
1417+
1418+
13111419
### Workflow Replay
13121420
13131421
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

pyproject.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ keywords = [
1111
"workflow",
1212
]
1313
dependencies = [
14+
"nexus-rpc>=1.1.0",
1415
"protobuf>=3.20,<6",
1516
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
1617
"types-protobuf>=3.20",
@@ -44,7 +45,7 @@ dev = [
4445
"psutil>=5.9.3,<6",
4546
"pydocstyle>=6.3.0,<7",
4647
"pydoctor>=24.11.1,<25",
47-
"pyright==1.1.377",
48+
"pyright==1.1.402",
4849
"pytest~=7.4",
4950
"pytest-asyncio>=0.21,<0.22",
5051
"pytest-timeout~=2.2",
@@ -53,6 +54,8 @@ dev = [
5354
"twine>=4.0.1,<5",
5455
"ruff>=0.5.0,<0.6",
5556
"maturin>=1.8.2",
57+
"pytest-cov>=6.1.1",
58+
"httpx>=0.28.1",
5659
"pytest-pretty>=1.3.0",
5760
]
5861

@@ -162,6 +165,7 @@ exclude = [
162165
"tests/worker/workflow_sandbox/testmodules/proto",
163166
"temporalio/bridge/worker.py",
164167
"temporalio/contrib/opentelemetry.py",
168+
"temporalio/contrib/pydantic.py",
165169
"temporalio/converter.py",
166170
"temporalio/testing/_workflow.py",
167171
"temporalio/worker/_activity.py",
@@ -173,6 +177,10 @@ exclude = [
173177
"tests/api/test_grpc_stub.py",
174178
"tests/conftest.py",
175179
"tests/contrib/test_opentelemetry.py",
180+
"tests/contrib/pydantic/models.py",
181+
"tests/contrib/pydantic/models_2.py",
182+
"tests/contrib/pydantic/test_pydantic.py",
183+
"tests/contrib/pydantic/workflows.py",
176184
"tests/test_converter.py",
177185
"tests/test_service.py",
178186
"tests/test_workflow.py",

temporalio/bridge/src/worker.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use temporal_sdk_core_api::worker::{
2020
};
2121
use temporal_sdk_core_api::Worker;
2222
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
23-
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
23+
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion};
2424
use temporal_sdk_core_protos::temporal::api::history::v1::History;
2525
use tokio::sync::mpsc::{channel, Sender};
2626
use tokio_stream::wrappers::ReceiverStream;
@@ -60,6 +60,7 @@ pub struct WorkerConfig {
6060
graceful_shutdown_period_millis: u64,
6161
nondeterminism_as_workflow_fail: bool,
6262
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
63+
nexus_task_poller_behavior: PollerBehavior,
6364
}
6465

6566
#[derive(FromPyObject)]
@@ -565,6 +566,18 @@ impl WorkerRef {
565566
})
566567
}
567568

569+
fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
570+
let worker = self.worker.as_ref().unwrap().clone();
571+
self.runtime.future_into_py(py, async move {
572+
let bytes = match worker.poll_nexus_task().await {
573+
Ok(task) => task.encode_to_vec(),
574+
Err(PollError::ShutDown) => return Err(PollShutdownError::new_err(())),
575+
Err(err) => return Err(PyRuntimeError::new_err(format!("Poll failure: {err}"))),
576+
};
577+
Ok(bytes)
578+
})
579+
}
580+
568581
fn complete_workflow_activation<'p>(
569582
&self,
570583
py: Python<'p>,
@@ -599,6 +612,22 @@ impl WorkerRef {
599612
})
600613
}
601614

615+
fn complete_nexus_task<'p>(&self,
616+
py: Python<'p>,
617+
proto: &Bound<'_, PyBytes>,
618+
) -> PyResult<Bound<'p, PyAny>> {
619+
let worker = self.worker.as_ref().unwrap().clone();
620+
let completion = NexusTaskCompletion::decode(proto.as_bytes())
621+
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {err}")))?;
622+
self.runtime.future_into_py(py, async move {
623+
worker
624+
.complete_nexus_task(completion)
625+
.await
626+
.context("Completion failure")
627+
.map_err(Into::into)
628+
})
629+
}
630+
602631
fn record_activity_heartbeat(&self, proto: &Bound<'_, PyBytes>) -> PyResult<()> {
603632
enter_sync!(self.runtime);
604633
let heartbeat = ActivityHeartbeat::decode(proto.as_bytes())
@@ -696,6 +725,7 @@ fn convert_worker_config(
696725
})
697726
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
698727
)
728+
.nexus_task_poller_behavior(conf.nexus_task_poller_behavior)
699729
.build()
700730
.map_err(|err| PyValueError::new_err(format!("Invalid worker config: {err}")))
701731
}

temporalio/bridge/worker.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import temporalio.bridge.client
2727
import temporalio.bridge.proto
2828
import temporalio.bridge.proto.activity_task
29+
import temporalio.bridge.proto.nexus
2930
import temporalio.bridge.proto.workflow_activation
3031
import temporalio.bridge.proto.workflow_completion
3132
import temporalio.bridge.runtime
@@ -35,7 +36,7 @@
3536
from temporalio.bridge.temporal_sdk_bridge import (
3637
CustomSlotSupplier as BridgeCustomSlotSupplier,
3738
)
38-
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError
39+
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore
3940

4041

4142
@dataclass
@@ -60,6 +61,7 @@ class WorkerConfig:
6061
graceful_shutdown_period_millis: int
6162
nondeterminism_as_workflow_fail: bool
6263
nondeterminism_as_workflow_fail_for_types: Set[str]
64+
nexus_task_poller_behavior: PollerBehavior
6365

6466

6567
@dataclass
@@ -216,6 +218,14 @@ async def poll_activity_task(
216218
await self._ref.poll_activity_task()
217219
)
218220

221+
async def poll_nexus_task(
222+
self,
223+
) -> temporalio.bridge.proto.nexus.NexusTask:
224+
"""Poll for a nexus task."""
225+
return temporalio.bridge.proto.nexus.NexusTask.FromString(
226+
await self._ref.poll_nexus_task()
227+
)
228+
219229
async def complete_workflow_activation(
220230
self,
221231
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
@@ -229,6 +239,12 @@ async def complete_activity_task(
229239
"""Complete an activity task."""
230240
await self._ref.complete_activity_task(comp.SerializeToString())
231241

242+
async def complete_nexus_task(
243+
self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
244+
) -> None:
245+
"""Complete a nexus task."""
246+
await self._ref.complete_nexus_task(comp.SerializeToString())
247+
232248
def record_activity_heartbeat(
233249
self, comp: temporalio.bridge.proto.ActivityHeartbeat
234250
) -> None:

temporalio/client.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,9 +464,17 @@ async def start_workflow(
464464
rpc_metadata: Mapping[str, str] = {},
465465
rpc_timeout: Optional[timedelta] = None,
466466
request_eager_start: bool = False,
467-
stack_level: int = 2,
468467
priority: temporalio.common.Priority = temporalio.common.Priority.default,
469468
versioning_override: Optional[temporalio.common.VersioningOverride] = None,
469+
# The following options should not be considered part of the public API. They
470+
# are deliberately not exposed in overloads, and are not subject to any
471+
# backwards compatibility guarantees.
472+
callbacks: Sequence[Callback] = [],
473+
workflow_event_links: Sequence[
474+
temporalio.api.common.v1.Link.WorkflowEvent
475+
] = [],
476+
request_id: Optional[str] = None,
477+
stack_level: int = 2,
470478
) -> WorkflowHandle[Any, Any]:
471479
"""Start a workflow and return its handle.
472480
@@ -529,7 +537,6 @@ async def start_workflow(
529537
name, result_type_from_type_hint = (
530538
temporalio.workflow._Definition.get_name_and_result_type(workflow)
531539
)
532-
533540
return await self._impl.start_workflow(
534541
StartWorkflowInput(
535542
workflow=name,
@@ -557,6 +564,9 @@ async def start_workflow(
557564
rpc_timeout=rpc_timeout,
558565
request_eager_start=request_eager_start,
559566
priority=priority,
567+
callbacks=callbacks,
568+
workflow_event_links=workflow_event_links,
569+
request_id=request_id,
560570
)
561571
)
562572

@@ -5193,6 +5203,10 @@ class StartWorkflowInput:
51935203
rpc_timeout: Optional[timedelta]
51945204
request_eager_start: bool
51955205
priority: temporalio.common.Priority
5206+
# The following options are experimental and unstable.
5207+
callbacks: Sequence[Callback]
5208+
workflow_event_links: Sequence[temporalio.api.common.v1.Link.WorkflowEvent]
5209+
request_id: Optional[str]
51965210
versioning_override: Optional[temporalio.common.VersioningOverride] = None
51975211

51985212

@@ -5807,8 +5821,30 @@ async def _build_start_workflow_execution_request(
58075821
self, input: StartWorkflowInput
58085822
) -> temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest:
58095823
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
5810-
req.request_eager_execution = input.request_eager_start
58115824
await self._populate_start_workflow_execution_request(req, input)
5825+
# _populate_start_workflow_execution_request is used for both StartWorkflowInput
5826+
# and UpdateWithStartStartWorkflowInput. UpdateWithStartStartWorkflowInput does
5827+
# not have the following two fields so they are handled here.
5828+
req.request_eager_execution = input.request_eager_start
5829+
if input.request_id:
5830+
req.request_id = input.request_id
5831+
5832+
links = [
5833+
temporalio.api.common.v1.Link(workflow_event=link)
5834+
for link in input.workflow_event_links
5835+
]
5836+
req.completion_callbacks.extend(
5837+
temporalio.api.common.v1.Callback(
5838+
nexus=temporalio.api.common.v1.Callback.Nexus(
5839+
url=callback.url,
5840+
header=callback.headers,
5841+
),
5842+
links=links,
5843+
)
5844+
for callback in input.callbacks
5845+
)
5846+
# Links are duplicated on request for compatibility with older server versions.
5847+
req.links.extend(links)
58125848
return req
58135849

58145850
async def _build_signal_with_start_workflow_execution_request(
@@ -7231,6 +7267,25 @@ def api_key(self, value: Optional[str]) -> None:
72317267
self.service_client.update_api_key(value)
72327268

72337269

7270+
@dataclass(frozen=True)
7271+
class NexusCallback:
7272+
"""Nexus callback to attach to events such as workflow completion.
7273+
7274+
.. warning::
7275+
This API is experimental and unstable.
7276+
"""
7277+
7278+
url: str
7279+
"""Callback URL."""
7280+
7281+
headers: Mapping[str, str]
7282+
"""Header to attach to callback request."""
7283+
7284+
7285+
# Intended to become a union of callback types
7286+
Callback = NexusCallback
7287+
7288+
72347289
async def _encode_user_metadata(
72357290
converter: temporalio.converter.DataConverter,
72367291
summary: Optional[Union[str, temporalio.api.common.v1.Payload]],

0 commit comments

Comments
 (0)