Skip to content

Commit df8f0b7

Browse files
committed
Fix replay hanging on empty WFT complete @ end of history (#522)
1 parent 1cbe5c8 commit df8f0b7

File tree

7 files changed

+105
-46
lines changed

7 files changed

+105
-46
lines changed

core/src/replay/mod.rs

Lines changed: 16 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -3,14 +3,16 @@
33
//! users during testing.
44
55
use crate::{
6-
worker::client::{mocks::mock_manual_workflow_client, WorkerClient},
6+
worker::{
7+
client::{mocks::mock_manual_workflow_client, WorkerClient},
8+
PostActivateHookData,
9+
},
710
Worker,
811
};
912
use futures::{FutureExt, Stream, StreamExt};
1013
use once_cell::sync::OnceCell;
1114
use parking_lot::Mutex;
1215
use std::{
13-
collections::HashMap,
1416
pin::Pin,
1517
sync::Arc,
1618
task::{Context, Poll},
@@ -147,25 +149,18 @@ impl Historator {
147149

148150
/// Returns a callback that can be used as the post-activation hook for a worker to indicate
149151
/// we're ready to replay the next history, or whatever else.
150-
pub(crate) fn get_post_activate_hook(&self) -> impl Fn(&Worker, &str, usize) + Send + Sync {
151-
let dat = self.dat.clone();
152+
pub(crate) fn get_post_activate_hook(
153+
&self,
154+
) -> impl Fn(&Worker, PostActivateHookData) + Send + Sync {
152155
let done_tx = self.replay_done_tx.clone();
153-
move |worker, activated_run_id, last_processed_event| {
154-
// We can't hold the lock while evaluating the hook, or we'd deadlock.
155-
let last_event_in_hist = dat
156-
.lock()
157-
.run_id_to_last_event_num
158-
.get(activated_run_id)
159-
.cloned();
160-
if let Some(le) = last_event_in_hist {
161-
if last_processed_event >= le {
162-
worker.request_wf_eviction(
163-
activated_run_id,
164-
"Always evict workflows after replay",
165-
EvictionReason::LangRequested,
166-
);
167-
done_tx.send(activated_run_id.to_string()).unwrap();
168-
}
156+
move |worker, data| {
157+
if !data.replaying {
158+
worker.request_wf_eviction(
159+
data.run_id,
160+
"Always evict workflows after replay",
161+
EvictionReason::LangRequested,
162+
);
163+
done_tx.send(data.run_id.to_string()).unwrap();
169164
}
170165
}
171166
}
@@ -184,19 +179,14 @@ impl Stream for Historator {
184179
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185180
match self.iter.poll_next_unpin(cx) {
186181
Poll::Ready(Some(history)) => {
187-
let run_id = history
182+
history
188183
.hist
189184
.extract_run_id_from_start()
190185
.expect(
191186
"Histories provided for replay must contain run ids in their workflow \
192187
execution started events",
193188
)
194189
.to_string();
195-
let last_event = history.hist.last_event_id();
196-
self.dat
197-
.lock()
198-
.run_id_to_last_event_num
199-
.insert(run_id, last_event as usize);
200190
Poll::Ready(Some(history))
201191
}
202192
Poll::Ready(None) => {
@@ -210,6 +200,5 @@ impl Stream for Historator {
210200

211201
#[derive(Default)]
212202
struct HistoratorDat {
213-
run_id_to_last_event_num: HashMap<String, usize>,
214203
all_dispatched: bool,
215204
}

core/src/worker/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ pub struct Worker {
7878
shutdown_token: CancellationToken,
7979
/// Will be called at the end of each activation completion
8080
#[allow(clippy::type_complexity)] // Sorry clippy, there's no simple way to re-use here.
81-
post_activate_hook: Option<Box<dyn Fn(&Self, &str, usize) + Send + Sync>>,
81+
post_activate_hook: Option<Box<dyn Fn(&Self, PostActivateHookData) + Send + Sync>>,
8282
/// Set when non-local activities are complete and should stop being polled
8383
non_local_activities_complete: Arc<AtomicBool>,
8484
/// Set when local activities are complete and should stop being polled
@@ -538,9 +538,9 @@ impl Worker {
538538
self.workflows
539539
.activation_completed(
540540
completion,
541-
self.post_activate_hook.as_ref().map(|h| {
542-
|run_id: &str, most_recent_event: usize| h(self, run_id, most_recent_event)
543-
}),
541+
self.post_activate_hook
542+
.as_ref()
543+
.map(|h| |data: PostActivateHookData| h(self, data)),
544544
)
545545
.await?;
546546
Ok(())
@@ -559,7 +559,7 @@ impl Worker {
559559
/// Sets a function to be called at the end of each activation completion
560560
pub(crate) fn set_post_activate_hook(
561561
&mut self,
562-
callback: impl Fn(&Self, &str, usize) + Send + Sync + 'static,
562+
callback: impl Fn(&Self, PostActivateHookData) + Send + Sync + 'static,
563563
) {
564564
self.post_activate_hook = Some(Box::new(callback))
565565
}
@@ -588,6 +588,12 @@ impl Worker {
588588
}
589589
}
590590

591+
pub struct PostActivateHookData<'a> {
592+
pub run_id: &'a str,
593+
pub most_recent_event: usize,
594+
pub replaying: bool,
595+
}
596+
591597
fn build_wf_basics(
592598
config: &mut WorkerConfig,
593599
metrics: MetricsContext,

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -420,6 +420,8 @@ impl WorkflowMachines {
420420
// then we don't need to do anything here, and in fact we need to avoid re-applying the
421421
// final WFT.
422422
if self.have_seen_terminal_event {
423+
// Replay clearly counts as done now, since we return here and never do anything else.
424+
self.replaying = false;
423425
return Ok(0);
424426
}
425427

core/src/worker/workflow/managed_run.rs

Lines changed: 16 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -988,7 +988,8 @@ impl ManagedRun {
988988
let should_respond = !(self.wfm.machines.has_pending_jobs()
989989
|| outgoing_cmds.replaying
990990
|| is_query_playback
991-
|| data.activation_was_only_eviction);
991+
|| data.activation_was_only_eviction
992+
|| self.wfm.machines.have_seen_terminal_event);
992993
// If there are pending LA resolutions, and we're responding to a query here,
993994
// we want to make sure to force a new task, as otherwise once we tell lang about
994995
// the LA resolution there wouldn't be any task to reply to with the result of iterating
@@ -1011,10 +1012,7 @@ impl ManagedRun {
10111012
ActivationCompleteOutcome::DoNothing
10121013
};
10131014
FulfillableActivationComplete {
1014-
result: ActivationCompleteResult {
1015-
most_recently_processed_event: self.wfm.machines.last_processed_event as usize,
1016-
outcome,
1017-
},
1015+
result: self.build_activation_complete_result(outcome),
10181016
resp_chan,
10191017
}
10201018
}
@@ -1042,11 +1040,19 @@ impl ManagedRun {
10421040
chan: Option<oneshot::Sender<ActivationCompleteResult>>,
10431041
) {
10441042
if let Some(chan) = chan {
1045-
chan.send(ActivationCompleteResult {
1046-
most_recently_processed_event: self.most_recently_processed_event_number() as usize,
1047-
outcome,
1048-
})
1049-
.expect("Rcv half of activation reply not dropped");
1043+
chan.send(self.build_activation_complete_result(outcome))
1044+
.expect("Rcv half of activation reply not dropped");
1045+
}
1046+
}
1047+
1048+
fn build_activation_complete_result(
1049+
&self,
1050+
outcome: ActivationCompleteOutcome,
1051+
) -> ActivationCompleteResult {
1052+
ActivationCompleteResult {
1053+
outcome,
1054+
most_recently_processed_event: self.most_recently_processed_event_number() as usize,
1055+
replaying: self.wfm.machines.replaying,
10501056
}
10511057
}
10521058

core/src/worker/workflow/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ use crate::{
4343
workflow_stream::{LocalInput, LocalInputs, WFStream},
4444
},
4545
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
46+
PostActivateHookData,
4647
},
4748
MetricsContext,
4849
};
@@ -295,7 +296,7 @@ impl Workflows {
295296
},
296297
// We need to say a type, but the type is irrelevant, so imagine some
297298
// boxed function we'll never call.
298-
Option::<Box<dyn Fn(&str, usize) + Send>>::None,
299+
Option::<Box<dyn Fn(PostActivateHookData) + Send>>::None,
299300
)
300301
.await?;
301302
}
@@ -308,7 +309,7 @@ impl Workflows {
308309
run_id,
309310
status: Some(auto_fail_to_complete_status(machines_err)),
310311
},
311-
Option::<Box<dyn Fn(&str, usize) + Send>>::None,
312+
Option::<Box<dyn Fn(PostActivateHookData) + Send>>::None,
312313
)
313314
.await?;
314315
}
@@ -323,7 +324,7 @@ impl Workflows {
323324
pub(super) async fn activation_completed(
324325
&self,
325326
completion: WorkflowActivationCompletion,
326-
post_activate_hook: Option<impl Fn(&str, usize)>,
327+
post_activate_hook: Option<impl Fn(PostActivateHookData)>,
327328
) -> Result<usize, CompleteWfError> {
328329
let is_empty_completion = completion.is_empty();
329330
let completion = validate_completion(completion)?;
@@ -448,7 +449,11 @@ impl Workflows {
448449
};
449450

450451
if let Some(h) = post_activate_hook {
451-
h(&run_id, completion_outcome.most_recently_processed_event);
452+
h(PostActivateHookData {
453+
run_id: &run_id,
454+
most_recent_event: completion_outcome.most_recently_processed_event,
455+
replaying: completion_outcome.replaying,
456+
});
452457
}
453458

454459
self.post_activation(PostActivationMsg {
@@ -956,6 +961,7 @@ struct GetStateInfoMsg {
956961
#[derive(Debug)]
957962
struct ActivationCompleteResult {
958963
most_recently_processed_event: usize,
964+
replaying: bool,
959965
outcome: ActivationCompleteOutcome,
960966
}
961967
/// What needs to be done after calling [Workflows::activation_completed]

histories/ends_empty_wft_complete.bin

1.18 KB
Binary file not shown.

tests/integ_tests/workflow_tests/replay.rs

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ use temporal_sdk_core_protos::{
1111
workflow_commands::{ScheduleActivity, StartTimer},
1212
workflow_completion::WorkflowActivationCompletion,
1313
},
14+
temporal::api::enums::v1::EventType,
1415
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
1516
};
1617
use temporal_sdk_core_test_utils::{
@@ -130,6 +131,23 @@ async fn replay_using_wf_function() {
130131
worker.run().await.unwrap();
131132
}
132133

134+
#[tokio::test]
135+
async fn replay_ending_wft_complete_with_commands_but_no_scheduled_started() {
136+
let mut t = TestHistoryBuilder::default();
137+
t.add_by_type(EventType::WorkflowExecutionStarted);
138+
t.add_full_wf_task();
139+
140+
for i in 1..=2 {
141+
let timer_started_event_id = t.add_by_type(EventType::TimerStarted);
142+
t.add_timer_fired(timer_started_event_id, i.to_string());
143+
t.add_full_wf_task();
144+
}
145+
let func = timers_wf(3);
146+
let mut worker = replay_sdk_worker([test_hist_to_replay(t)]);
147+
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
148+
worker.run().await.unwrap();
149+
}
150+
133151
async fn replay_abrupt_ending(t: TestHistoryBuilder) {
134152
let func = timers_wf(1);
135153
let mut worker = replay_sdk_worker([test_hist_to_replay(t)]);
@@ -219,6 +237,38 @@ async fn replay_old_patch_format() {
219237
worker.run().await.unwrap();
220238
}
221239

240+
#[tokio::test]
241+
async fn replay_ends_with_empty_wft() {
242+
let core = init_core_replay_preloaded(
243+
"SayHelloWorkflow",
244+
[HistoryForReplay::new(
245+
history_from_proto_binary("histories/ends_empty_wft_complete.bin")
246+
.await
247+
.unwrap(),
248+
"fake".to_owned(),
249+
)],
250+
);
251+
let task = core.poll_workflow_activation().await.unwrap();
252+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
253+
task.run_id,
254+
vec![ScheduleActivity {
255+
seq: 1,
256+
activity_id: "1".to_string(),
257+
activity_type: "say_hello".to_string(),
258+
..Default::default()
259+
}
260+
.into()],
261+
))
262+
.await
263+
.unwrap();
264+
let task = core.poll_workflow_activation().await.unwrap();
265+
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
266+
.await
267+
.unwrap();
268+
let task = core.poll_workflow_activation().await.unwrap();
269+
assert!(task.eviction_reason().is_some());
270+
}
271+
222272
fn timers_wf(num_timers: u32) -> WorkflowFunction {
223273
WorkflowFunction::new(move |ctx: WfContext| async move {
224274
for _ in 1..=num_timers {

0 commit comments

Comments
 (0)