Skip to content

Commit f673f1d

Browse files
authored
Fix deprecated patch removal when no subequent command (#671)
1 parent a56b28d commit f673f1d

File tree

3 files changed

+64
-14
lines changed

3 files changed

+64
-14
lines changed

client/src/workflow_handle/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ where
9191
}
9292
}
9393

94+
pub fn info(&self) -> &WorkflowExecutionInfo {
95+
&self.info
96+
}
97+
9498
pub async fn get_workflow_result(
9599
&self,
96100
opts: GetWorkflowResultOpts,

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -861,16 +861,14 @@ impl WorkflowMachines {
861861
let event_id = event.event_id;
862862

863863
let consumed_cmd = loop {
864-
if let Some(peek_machine) = self.commands.front() {
865-
let mach = self.machine(peek_machine.machine);
866-
match patch_marker_handling(event, mach, next_event)? {
867-
EventHandlingOutcome::SkipCommand => {
868-
self.commands.pop_front();
869-
continue;
870-
}
871-
eho @ EventHandlingOutcome::SkipEvent { .. } => return Ok(eho),
872-
EventHandlingOutcome::Normal => {}
864+
let maybe_machine = self.commands.front().map(|mk| self.machine(mk.machine));
865+
match patch_marker_handling(event, maybe_machine, next_event)? {
866+
EventHandlingOutcome::SkipCommand => {
867+
self.commands.pop_front();
868+
continue;
873869
}
870+
eho @ EventHandlingOutcome::SkipEvent { .. } => return Ok(eho),
871+
EventHandlingOutcome::Normal => {}
874872
}
875873

876874
let maybe_command = self.commands.pop_front();
@@ -1598,11 +1596,11 @@ enum EventHandlingOutcome {
15981596
/// [WorkflowMachines::handle_command_event]
15991597
fn patch_marker_handling(
16001598
event: &HistoryEvent,
1601-
mach: &Machines,
1599+
mach: Option<&Machines>,
16021600
next_event: Option<&HistoryEvent>,
16031601
) -> Result<EventHandlingOutcome> {
16041602
let patch_machine = match mach {
1605-
Machines::PatchMachine(pm) => Some(pm),
1603+
Some(Machines::PatchMachine(pm)) => Some(pm),
16061604
_ => None,
16071605
};
16081606
let patch_details = event.get_patch_marker_details();
@@ -1633,9 +1631,9 @@ fn patch_marker_handling(
16331631
Ok(EventHandlingOutcome::Normal)
16341632
}
16351633
} else {
1636-
// Version markers can be skipped in the event they are deprecated
1637-
// Is deprecated. We can simply ignore this event, as deprecated change
1638-
// markers are allowed without matching changed calls.
1634+
// Version markers can be skipped in the event they are deprecated. We can simply
1635+
// ignore this event, as deprecated change markers are allowed without matching changed
1636+
// calls.
16391637
if deprecated {
16401638
debug!("Deprecated patch marker tried against non-patch machine, skipping.");
16411639
skip_one_or_two_events(next_event)

tests/integ_tests/workflow_tests/patches.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ use std::{
55
},
66
time::Duration,
77
};
8+
use temporal_client::WorkflowClientTrait;
9+
use tokio::{join, sync::Notify};
10+
use tokio_stream::StreamExt;
811

912
use temporal_sdk::{WfContext, WorkflowResult};
1013
use temporal_sdk_core_test_utils::CoreWfStarter;
@@ -151,3 +154,48 @@ async fn can_remove_deprecated_patch_near_other_patch() {
151154
starter.start_with_worker(wf_name, &mut worker).await;
152155
worker.run_until_done().await.unwrap();
153156
}
157+
158+
#[tokio::test]
159+
async fn deprecated_patch_removal() {
160+
let wf_name = "deprecated_patch_removal";
161+
let mut starter = CoreWfStarter::new(wf_name);
162+
starter.no_remote_activities();
163+
let mut worker = starter.worker().await;
164+
let client = starter.get_client().await;
165+
let wf_id = starter.get_task_queue().to_string();
166+
let did_die = Arc::new(AtomicBool::new(false));
167+
let send_sig = Arc::new(Notify::new());
168+
let send_sig_c = send_sig.clone();
169+
worker.register_wf(wf_name, move |ctx: WfContext| {
170+
let did_die = did_die.clone();
171+
let send_sig_c = send_sig_c.clone();
172+
async move {
173+
if !did_die.load(Ordering::Acquire) {
174+
assert!(ctx.deprecate_patch("getting-deprecated"));
175+
}
176+
send_sig_c.notify_one();
177+
ctx.make_signal_channel("sig").next().await;
178+
179+
ctx.timer(Duration::from_millis(1)).await;
180+
181+
if !did_die.load(Ordering::Acquire) {
182+
did_die.store(true, Ordering::Release);
183+
ctx.force_task_fail(anyhow::anyhow!("i'm ded"));
184+
}
185+
Ok(().into())
186+
}
187+
});
188+
189+
starter.start_with_worker(wf_name, &mut worker).await;
190+
let sig_fut = async {
191+
send_sig.notified().await;
192+
client
193+
.signal_workflow_execution(wf_id, "".to_string(), "sig".to_string(), None, None)
194+
.await
195+
.unwrap()
196+
};
197+
let run_fut = async {
198+
worker.run_until_done().await.unwrap();
199+
};
200+
join!(sig_fut, run_fut);
201+
}

0 commit comments

Comments
 (0)