
Commit c79cc61

Workflow replayer support (#204)
Fixes #187
1 parent 17cb2b5 commit c79cc61

File tree

21 files changed: +976 -154 lines changed

README.md

Lines changed: 43 additions & 1 deletion
@@ -62,6 +62,7 @@ until the SDK is marked stable.
 - [Activity Worker Shutdown](#activity-worker-shutdown)
 - [Activity Concurrency and Executors](#activity-concurrency-and-executors)
 - [Activity Testing](#activity-testing)
+- [Ractors](#ractors)
 - [Platform Support](#platform-support)
 - [Development](#development)
 - [Build](#build)
@@ -853,7 +854,48 @@ the mock activity class to make it appear as the real name.
 
 #### Workflow Replay
 
-TODO: Workflow replayer not yet implemented
+Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
+assuming the `history_json` parameter below is given a JSON string of history exported from the CLI or web UI, the
+following function will replay it:
+
+```ruby
+def replay_from_json(history_json)
+  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
+  replayer.replay_workflow(Temporalio::WorkflowHistory.from_history_json(history_json))
+end
+```
+
+If there is a non-determinism, this will raise an exception by default.
+
+Workflow history can be loaded from more than just JSON. It can be fetched individually from a workflow handle, or even
+in a list. For example, the following code will check that all workflow histories for a certain workflow type (i.e.
+workflow class) are safe with the current workflow code.
+
+```ruby
+def check_past_histories(client)
+  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
+  results = replayer.replay_workflows(client.list_workflows("WorkflowType = 'MyWorkflow'").map do |desc|
+    client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history
+  end)
+  results.each { |res| raise res.replay_failure if res.replay_failure }
+end
+```
+
+But this only raises at the end because by default `replay_workflows` does not raise on failure like `replay_workflow`
+does. The `raise_on_replay_failure: true` parameter could be set, or the replay worker can be used to process each one
+like so:
+
+```ruby
+def check_past_histories(client)
+  Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) do |worker|
+    client.list_workflows("WorkflowType = 'MyWorkflow'").each do |desc|
+      worker.replay_workflow(client.workflow_handle(desc.id, run_id: desc.run_id).fetch_history)
+    end
+  end
+end
+```
+
+See the `WorkflowReplayer` API documentation for more details.
 
 ### Activities
 
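Editorial note, not part of this commit: the README text above mentions that `replay_workflow` raises on failure by default and that `replay_workflows` can be given `raise_on_replay_failure: true` instead of collecting results, but only shows the block form. The following is a minimal sketch of both styles. It assumes `histories` is an Enumerable of `Temporalio::WorkflowHistory` values fetched elsewhere, that replay failures surface as `StandardError` subclasses, and that `raise_on_replay_failure:` is a keyword argument of `replay_workflows` as the text suggests.

```ruby
# Illustrative sketch only; not part of this commit. Assumes `histories` is an
# Enumerable of Temporalio::WorkflowHistory values obtained elsewhere (e.g. via
# fetch_history) and that MyWorkflow is the workflow class under test.

# One at a time: replay_workflow raises on non-determinism by default, so wrap
# it only if a log line is wanted before re-raising.
def replay_each(histories)
  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
  histories.each do |history|
    begin
      replayer.replay_workflow(history)
    rescue StandardError => e
      warn "Replay failed: #{e.message}"
      raise
    end
  end
end

# As a batch: fail fast on the first bad history instead of collecting results.
# Assumes raise_on_replay_failure: is accepted here, per the README text above.
def replay_all(histories)
  replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
  replayer.replay_workflows(histories, raise_on_replay_failure: true)
end
```

For large result sets, the block form shown in the README (passing a block to `WorkflowReplayer.new`) fetches and replays one history at a time rather than materializing them all up front.
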
temporalio/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

temporalio/ext/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["
 temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
 temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
 tokio = "1.26"
+tokio-stream = "0.1"
 tokio-util = "0.7"
 tonic = "0.12"
 tracing = "0.1"

temporalio/ext/src/worker.rs

Lines changed: 120 additions & 61 deletions
@@ -8,7 +8,7 @@ use std::{
 use crate::{
     client::Client,
     enter_sync, error, id, new_error,
-    runtime::{AsyncCommand, RuntimeHandle},
+    runtime::{AsyncCommand, Runtime, RuntimeHandle},
     util::{AsyncCallback, Struct},
     ROOT_MOD,
 };
@@ -20,15 +20,19 @@ use magnus::{
 };
 use prost::Message;
 use temporal_sdk_core::{
+    replay::{HistoryForReplay, ReplayWorkerInput},
     ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
-    SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfigBuilder,
+    SlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder, WorkerConfig, WorkerConfigBuilder,
 };
 use temporal_sdk_core_api::{
     errors::{PollError, WorkflowErrorType},
     worker::SlotKind,
 };
 use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
 use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
+use temporal_sdk_core_protos::temporal::api::history::v1::History;
+use tokio::sync::mpsc::{channel, Sender};
+use tokio_stream::wrappers::ReceiverStream;
 
 pub fn init(ruby: &Ruby) -> Result<(), Error> {
     let class = ruby
@@ -55,6 +59,11 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
     )?;
     class.define_method("replace_client", method!(Worker::replace_client, 1))?;
     class.define_method("initiate_shutdown", method!(Worker::initiate_shutdown, 0))?;
+
+    let inner_class = class.define_class("WorkflowReplayer", class::object())?;
+    inner_class.define_singleton_method("new", function!(WorkflowReplayer::new, 2))?;
+    inner_class.define_method("push_history", method!(WorkflowReplayer::push_history, 2))?;
+
     Ok(())
 }
 
@@ -89,70 +98,13 @@ impl Worker {
         let activity = options.member::<bool>(id!("activity"))?;
         let workflow = options.member::<bool>(id!("workflow"))?;
 
-        // Build config
-        let config = WorkerConfigBuilder::default()
-            .namespace(options.member::<String>(id!("namespace"))?)
-            .task_queue(options.member::<String>(id!("task_queue"))?)
-            .worker_build_id(options.member::<String>(id!("build_id"))?)
-            .client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
-            .max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
-            .max_concurrent_wft_polls(
-                options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
-            )
-            .nonsticky_to_sticky_poll_ratio(
-                options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
-            )
-            .max_concurrent_at_polls(
-                options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
-            )
-            .no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
-            .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
-                options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
-            ))
-            .max_heartbeat_throttle_interval(Duration::from_secs_f64(
-                options.member(id!("max_heartbeat_throttle_interval"))?,
-            ))
-            .default_heartbeat_throttle_interval(Duration::from_secs_f64(
-                options.member(id!("default_heartbeat_throttle_interval"))?,
-            ))
-            .max_worker_activities_per_second(
-                options.member::<Option<f64>>(id!("max_worker_activities_per_second"))?,
-            )
-            .max_task_queue_activities_per_second(
-                options.member::<Option<f64>>(id!("max_task_queue_activities_per_second"))?,
-            )
-            .graceful_shutdown_period(Duration::from_secs_f64(
-                options.member(id!("graceful_shutdown_period"))?,
-            ))
-            .use_worker_versioning(options.member::<bool>(id!("use_worker_versioning"))?)
-            .tuner(Arc::new(build_tuner(
-                options
-                    .child(id!("tuner"))?
-                    .ok_or_else(|| error!("Missing tuner"))?,
-            )?))
-            .workflow_failure_errors(
-                if options.member::<bool>(id!("nondeterminism_as_workflow_fail"))? {
-                    HashSet::from([WorkflowErrorType::Nondeterminism])
-                } else {
-                    HashSet::new()
-                },
-            )
-            .workflow_types_to_failure_errors(
-                options
-                    .member::<Vec<String>>(id!("nondeterminism_as_workflow_fail_for_types"))?
-                    .into_iter()
-                    .map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
-                    .collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
-            )
-            .build()
-            .map_err(|err| error!("Invalid worker options: {}", err))?;
-
         let worker = temporal_sdk_core::init_worker(
             &client.runtime_handle.core,
-            config,
+            build_config(options)?,
             client.core.clone().into_inner(),
         )
         .map_err(|err| error!("Failed creating worker: {}", err))?;
+
         Ok(Worker {
             core: RefCell::new(Some(Arc::new(worker))),
             runtime_handle: client.runtime_handle.clone(),
@@ -435,6 +387,113 @@
     }
 }
 
+#[derive(DataTypeFunctions, TypedData)]
+#[magnus(
+    class = "Temporalio::Internal::Bridge::Worker::WorkflowReplayer",
+    free_immediately
+)]
+pub struct WorkflowReplayer {
+    tx: Sender<HistoryForReplay>,
+    runtime_handle: RuntimeHandle,
+}
+
+impl WorkflowReplayer {
+    pub fn new(runtime: &Runtime, options: Struct) -> Result<(Self, Worker), Error> {
+        enter_sync!(runtime.handle.clone());
+
+        let (tx, rx) = channel(1);
+
+        let core_worker = temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new(
+            build_config(options)?,
+            ReceiverStream::new(rx),
+        ))
+        .map_err(|err| error!("Failed creating worker: {}", err))?;
+
+        Ok((
+            WorkflowReplayer {
+                tx,
+                runtime_handle: runtime.handle.clone(),
+            },
+            Worker {
+                core: RefCell::new(Some(Arc::new(core_worker))),
+                runtime_handle: runtime.handle.clone(),
+                activity: false,
+                workflow: true,
+            },
+        ))
+    }
+
+    pub fn push_history(&self, workflow_id: String, proto: RString) -> Result<(), Error> {
+        let history = History::decode(unsafe { proto.as_slice() })
+            .map_err(|err| error!("Invalid proto: {}", err))?;
+        let tx = self.tx.clone();
+        self.runtime_handle.core.tokio_handle().spawn(async move {
+            // Intentionally ignoring error here
+            let _ = tx.send(HistoryForReplay::new(history, workflow_id)).await;
+        });
+        Ok(())
+    }
+}
+
+fn build_config(options: Struct) -> Result<WorkerConfig, Error> {
+    WorkerConfigBuilder::default()
+        .namespace(options.member::<String>(id!("namespace"))?)
+        .task_queue(options.member::<String>(id!("task_queue"))?)
+        .worker_build_id(options.member::<String>(id!("build_id"))?)
+        .client_identity_override(options.member::<Option<String>>(id!("identity_override"))?)
+        .max_cached_workflows(options.member::<usize>(id!("max_cached_workflows"))?)
+        .max_concurrent_wft_polls(
+            options.member::<usize>(id!("max_concurrent_workflow_task_polls"))?,
+        )
+        .nonsticky_to_sticky_poll_ratio(
+            options.member::<f32>(id!("nonsticky_to_sticky_poll_ratio"))?,
+        )
+        .max_concurrent_at_polls(
+            options.member::<usize>(id!("max_concurrent_activity_task_polls"))?,
+        )
+        .no_remote_activities(options.member::<bool>(id!("no_remote_activities"))?)
+        .sticky_queue_schedule_to_start_timeout(Duration::from_secs_f64(
+            options.member(id!("sticky_queue_schedule_to_start_timeout"))?,
+        ))
+        .max_heartbeat_throttle_interval(Duration::from_secs_f64(
+            options.member(id!("max_heartbeat_throttle_interval"))?,
+        ))
+        .default_heartbeat_throttle_interval(Duration::from_secs_f64(
+            options.member(id!("default_heartbeat_throttle_interval"))?,
+        ))
+        .max_worker_activities_per_second(
+            options.member::<Option<f64>>(id!("max_worker_activities_per_second"))?,
+        )
+        .max_task_queue_activities_per_second(
+            options.member::<Option<f64>>(id!("max_task_queue_activities_per_second"))?,
+        )
+        .graceful_shutdown_period(Duration::from_secs_f64(
+            options.member(id!("graceful_shutdown_period"))?,
+        ))
+        .use_worker_versioning(options.member::<bool>(id!("use_worker_versioning"))?)
+        .tuner(Arc::new(build_tuner(
+            options
+                .child(id!("tuner"))?
+                .ok_or_else(|| error!("Missing tuner"))?,
+        )?))
+        .workflow_failure_errors(
+            if options.member::<bool>(id!("nondeterminism_as_workflow_fail"))? {
+                HashSet::from([WorkflowErrorType::Nondeterminism])
+            } else {
+                HashSet::new()
+            },
+        )
+        .workflow_types_to_failure_errors(
+            options
+                .member::<Vec<String>>(id!("nondeterminism_as_workflow_fail_for_types"))?
+                .into_iter()
+                .map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
+                .collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
+        )
+        .build()
+        .map_err(|err| error!("Invalid worker options: {}", err))
+}
+
 fn build_tuner(options: Struct) -> Result<TunerHolder, Error> {
     let (workflow_slot_options, resource_slot_options) = build_tuner_slot_options(
         options
