Skip to content

Commit 5b202c0

Browse files
authored
Replayer (#108)
1 parent c965c47 commit 5b202c0

File tree

12 files changed

+719
-8
lines changed

12 files changed

+719
-8
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,22 @@ cancellation of all outstanding activities.
580580
The `shutdown()` invocation will wait on all activities to complete, so if a long-running activity does not at least
581581
respect cancellation, the shutdown may never complete.
582582

583+
### Workflow Replay
584+
585+
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
586+
assuming `history_json_str` is populated with a JSON string history either exported from the web UI or from `tctl`, the
587+
following function will replay it:
588+
589+
```python
590+
from temporalio.worker import Replayer
591+
592+
async def run_replayer(history_json_str: str):
593+
replayer = Replayer(workflows=[SayHello])
594+
await replayer.replay_workflow(history_json_str)
595+
```
596+
597+
This will throw an error if any non-determinism is detected.
598+
583599
### OpenTelemetry Support
584600

585601
OpenTelemetry support requires the optional `opentelemetry` dependencies which are part of the `opentelemetry` extra.

temporalio/bridge/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use pyo3::prelude::*;
2+
use pyo3::types::PyBytes;
23

34
mod client;
45
mod telemetry;
@@ -22,6 +23,7 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
2223
)?;
2324
m.add_class::<worker::WorkerRef>()?;
2425
m.add_function(wrap_pyfunction!(new_worker, m)?)?;
26+
m.add_function(wrap_pyfunction!(new_replay_worker, m)?)?;
2527
Ok(())
2628
}
2729

@@ -42,3 +44,11 @@ fn new_worker(
4244
) -> PyResult<worker::WorkerRef> {
4345
worker::new_worker(&client, config)
4446
}
47+
48+
#[pyfunction]
49+
fn new_replay_worker(
50+
history_proto: &PyBytes,
51+
config: worker::WorkerConfig,
52+
) -> PyResult<worker::WorkerRef> {
53+
worker::new_replay_worker(&history_proto, config)
54+
}

temporalio/bridge/src/worker.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use temporal_sdk_core::api::errors::{PollActivityError, PollWfError};
99
use temporal_sdk_core_api::Worker;
1010
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
1111
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
12+
use temporal_sdk_core_protos::temporal::api::history::v1::History;
1213

1314
use crate::client;
1415

@@ -50,6 +51,21 @@ pub fn new_worker(client: &client::ClientRef, config: WorkerConfig) -> PyResult<
5051
})
5152
}
5253

54+
pub fn new_replay_worker(history_proto: &PyBytes, config: WorkerConfig) -> PyResult<WorkerRef> {
55+
// This must be run with the Tokio context available
56+
let _guard = pyo3_asyncio::tokio::get_runtime().enter();
57+
let config: temporal_sdk_core::WorkerConfig = config.try_into()?;
58+
let history = History::decode(history_proto.as_bytes())
59+
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {}", err)))?;
60+
Ok(WorkerRef {
61+
worker: Some(Arc::new(
62+
temporal_sdk_core::init_replay_worker(config, &history).map_err(|err| {
63+
PyValueError::new_err(format!("Failed creating replay worker: {}", err))
64+
})?,
65+
)),
66+
})
67+
}
68+
5369
#[pymethods]
5470
impl WorkerRef {
5571
fn poll_workflow_activation<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {

temporalio/bridge/worker.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
"""Worker using SDK Core."""
22

3+
from __future__ import annotations
4+
35
from dataclasses import dataclass
46
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable, List, Mapping, Optional
57

68
import google.protobuf.internal.containers
79
from typing_extensions import TypeAlias
810

911
import temporalio.api.common.v1
12+
import temporalio.api.history.v1
1013
import temporalio.bridge.client
1114
import temporalio.bridge.proto
1215
import temporalio.bridge.proto.activity_task
@@ -42,14 +45,28 @@ class WorkerConfig:
4245
class Worker:
4346
"""SDK Core worker."""
4447

45-
def __init__(
46-
self, client: temporalio.bridge.client.Client, config: WorkerConfig
47-
) -> None:
48-
"""Create SDK core worker with a client and config."""
49-
self._ref = temporalio.bridge.temporal_sdk_bridge.new_worker(
50-
client._ref, config
48+
@staticmethod
49+
def create(client: temporalio.bridge.client.Client, config: WorkerConfig) -> Worker:
50+
"""Create a bridge worker from a bridge client."""
51+
return Worker(
52+
temporalio.bridge.temporal_sdk_bridge.new_worker(client._ref, config)
53+
)
54+
55+
@staticmethod
56+
def for_replay(
57+
history: temporalio.api.history.v1.History, config: WorkerConfig
58+
) -> Worker:
59+
"""Create a bridge replay worker from history."""
60+
return Worker(
61+
temporalio.bridge.temporal_sdk_bridge.new_replay_worker(
62+
history.SerializeToString(), config
63+
)
5164
)
5265

66+
def __init__(self, ref: temporalio.bridge.temporal_sdk_bridge.WorkerRef) -> None:
67+
"""Create SDK core worker from a bridge worker."""
68+
self._ref = ref
69+
5370
async def poll_workflow_activation(
5471
self,
5572
) -> temporalio.bridge.proto.workflow_activation.WorkflowActivation:

temporalio/worker/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
WorkflowInterceptorClassInput,
2020
WorkflowOutboundInterceptor,
2121
)
22+
from .replayer import Replayer, ReplayerConfig
2223
from .worker import Worker, WorkerConfig
2324
from .workflow_instance import (
2425
UnsandboxedWorkflowRunner,
@@ -31,6 +32,8 @@
3132
# Primary types
3233
"Worker",
3334
"WorkerConfig",
35+
"Replayer",
36+
"ReplayerConfig",
3437
# Interceptor base classes
3538
"Interceptor",
3639
"ActivityInboundInterceptor",

0 commit comments

Comments
 (0)