Skip to content

Commit 28f627b

Browse files
committed
Fix bug where an abandoned child that was cancelled externally caused an error (#524)
when its cancellation event was processed after the child had already been cancelled from inside the workflow.
1 parent df8f0b7 commit 28f627b

File tree

6 files changed

+152
-20
lines changed

6 files changed

+152
-20
lines changed

core/src/core_tests/child_workflows.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,12 @@ async fn parent_cancels_child_wf(ctx: WfContext) -> WorkflowResult<()> {
8888
.await
8989
.into_started()
9090
.expect("Child should get started");
91-
let cancel_fut = start_res.cancel(&ctx);
92-
let resfut = start_res.result();
93-
let (cancel_res, res) = join!(cancel_fut, resfut);
94-
cancel_res.expect("cancel result is ok");
95-
let stat = res.status.expect("child wf result is ok");
91+
start_res.cancel(&ctx);
92+
let stat = start_res
93+
.result()
94+
.await
95+
.status
96+
.expect("child wf result is ok");
9697
assert_matches!(stat, child_workflow_result::Status::Cancelled(_));
9798
Ok(().into())
9899
}

core/src/worker/workflow/machines/child_workflow_state_machine.rs

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ fsm! {
7474

7575
// Ignore any spurious cancellations after resolution
7676
Cancelled --(Cancel) --> Cancelled;
77+
Cancelled --(ChildWorkflowExecutionCancelled,
78+
on_child_workflow_execution_cancelled) --> Cancelled;
7779
Failed --(Cancel) --> Failed;
7880
StartFailed --(Cancel) --> StartFailed;
7981
TimedOut --(Cancel) --> TimedOut;
@@ -111,7 +113,30 @@ pub(super) struct ChildWorkflowInitiatedData {
111113
}
112114

113115
#[derive(Default, Clone)]
114-
pub(super) struct Cancelled {}
116+
pub(super) struct Cancelled {
117+
seen_cancelled_event: bool,
118+
}
119+
120+
impl Cancelled {
121+
pub(super) fn on_child_workflow_execution_cancelled(
122+
self,
123+
) -> ChildWorkflowMachineTransition<Cancelled> {
124+
if self.seen_cancelled_event {
125+
ChildWorkflowMachineTransition::Err(WFMachinesError::Fatal(
126+
"Child workflow has already seen a ChildWorkflowExecutionCanceledEvent, and now \
127+
another is being applied! This is a bug, please report."
128+
.to_string(),
129+
))
130+
} else {
131+
ChildWorkflowMachineTransition::ok(
132+
[],
133+
Cancelled {
134+
seen_cancelled_event: true,
135+
},
136+
)
137+
}
138+
}
139+
}
115140

116141
#[derive(Default, Clone)]
117142
pub(super) struct Completed {}
@@ -263,7 +288,12 @@ impl Started {
263288
)
264289
}
265290
fn on_child_workflow_execution_cancelled(self) -> ChildWorkflowMachineTransition<Cancelled> {
266-
ChildWorkflowMachineTransition::ok(vec![ChildWorkflowCommand::Cancel], Cancelled::default())
291+
ChildWorkflowMachineTransition::ok(
292+
vec![ChildWorkflowCommand::Cancel],
293+
Cancelled {
294+
seen_cancelled_event: true,
295+
},
296+
)
267297
}
268298
fn on_child_workflow_execution_terminated(
269299
self,
@@ -870,7 +900,9 @@ mod test {
870900
#[test]
871901
fn cancels_ignored_terminal() {
872902
for state in [
873-
ChildWorkflowMachineState::Cancelled(Cancelled {}),
903+
ChildWorkflowMachineState::Cancelled(Cancelled {
904+
seen_cancelled_event: false,
905+
}),
874906
Failed {}.into(),
875907
StartFailed {}.into(),
876908
TimedOut {}.into(),

sdk/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,11 +549,13 @@ pub enum TimerResult {
549549
}
550550

551551
/// Successful result of sending a signal to an external workflow
552+
#[derive(Debug)]
552553
pub struct SignalExternalOk;
553554
/// Result of awaiting on sending a signal to an external workflow
554555
pub type SignalExternalWfResult = Result<SignalExternalOk, Failure>;
555556

556557
/// Successful result of sending a cancel request to an external workflow
558+
#[derive(Debug)]
557559
pub struct CancelExternalOk;
558560
/// Result of awaiting on sending a cancel request to an external workflow
559561
pub type CancelExternalWfResult = Result<CancelExternalOk, Failure>;
@@ -656,6 +658,20 @@ pub enum CancellableID {
656658
},
657659
}
658660

661+
impl CancellableID {
662+
/// Returns the type-specific sequence number used for this command
663+
pub fn seq_num(&self) -> u32 {
664+
match self {
665+
CancellableID::Timer(seq) => *seq,
666+
CancellableID::Activity(seq) => *seq,
667+
CancellableID::LocalActivity(seq) => *seq,
668+
CancellableID::ChildWorkflow(seq) => *seq,
669+
CancellableID::SignalExternalWorkflow(seq) => *seq,
670+
CancellableID::ExternalWorkflow { seqnum, .. } => *seqnum,
671+
}
672+
}
673+
}
674+
659675
#[derive(derive_more::From)]
660676
#[allow(clippy::large_enum_variant)]
661677
enum RustWfCmd {

sdk/src/workflow_context.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use temporal_sdk_core_protos::{
3434
workflow_commands::{
3535
request_cancel_external_workflow_execution as cancel_we,
3636
signal_external_workflow_execution as sig_we, workflow_command,
37-
ModifyWorkflowProperties, RequestCancelExternalWorkflowExecution, SetPatchMarker,
37+
CancelChildWorkflowExecution, ModifyWorkflowProperties,
38+
RequestCancelExternalWorkflowExecution, SetPatchMarker,
3839
SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes,
3940
},
4041
},
@@ -679,13 +680,13 @@ impl StartedChildWorkflow {
679680
}
680681

681682
/// Cancel the child workflow
682-
pub fn cancel(&self, cx: &WfContext) -> impl Future<Output = CancelExternalWfResult> {
683-
let target = NamespacedWorkflowExecution {
684-
namespace: cx.namespace().to_string(),
685-
workflow_id: self.common.workflow_id.clone(),
686-
..Default::default()
687-
};
688-
cx.cancel_external(target)
683+
pub fn cancel(&self, cx: &WfContext) {
684+
cx.send(RustWfCmd::NewNonblockingCmd(
685+
CancelChildWorkflowExecution {
686+
child_workflow_seq: self.common.result_future.cancellable_id.seq_num(),
687+
}
688+
.into(),
689+
));
689690
}
690691

691692
/// Signal the child workflow

sdk/src/workflow_context/options.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use temporal_sdk_core_protos::{
99
StartChildWorkflowExecution,
1010
},
1111
},
12-
temporal::api::common::v1::{Payload, RetryPolicy},
12+
temporal::api::{
13+
common::v1::{Payload, RetryPolicy},
14+
enums::v1::ParentClosePolicy,
15+
},
1316
};
1417

1518
// TODO: Before release, probably best to avoid using proto types entirely here. They're awkward.
@@ -180,6 +183,8 @@ pub struct ChildWorkflowOptions {
180183
pub cancel_type: ChildWorkflowCancellationType,
181184
/// Common options
182185
pub options: WorkflowOptions,
186+
/// Policy applied to this child workflow when its parent workflow closes
187+
pub parent_close_policy: ParentClosePolicy,
183188
}
184189

185190
impl IntoWorkflowCommand for ChildWorkflowOptions {
@@ -203,6 +208,7 @@ impl IntoWorkflowCommand for ChildWorkflowOptions {
203208
workflow_task_timeout: self.options.task_timeout.and_then(|d| d.try_into().ok()),
204209
search_attributes: self.options.search_attributes.unwrap_or_default(),
205210
cron_schedule: self.options.cron_schedule.unwrap_or_default(),
211+
parent_close_policy: self.parent_close_policy as i32,
206212
..Default::default()
207213
}
208214
}

tests/integ_tests/workflow_tests/child_workflows.rs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
use anyhow::anyhow;
2-
use temporal_client::WorkflowOptions;
3-
use temporal_sdk::{ChildWorkflowOptions, WfContext, WorkflowResult};
4-
use temporal_sdk_core_protos::coresdk::child_workflow::{child_workflow_result, Success};
2+
use std::time::Duration;
3+
use temporal_client::{WorkflowClientTrait, WorkflowOptions};
4+
use temporal_sdk::{ChildWorkflowOptions, WfContext, WfExitValue, WorkflowResult};
5+
use temporal_sdk_core_protos::{
6+
coresdk::child_workflow::{child_workflow_result, ChildWorkflowCancellationType, Success},
7+
temporal::api::enums::v1::ParentClosePolicy,
8+
};
59
use temporal_sdk_core_test_utils::CoreWfStarter;
10+
use tokio::sync::Barrier;
611

712
static PARENT_WF_TYPE: &str = "parent_wf";
813
static CHILD_WF_TYPE: &str = "child_wf";
@@ -49,3 +54,74 @@ async fn child_workflow_happy_path() {
4954
.unwrap();
5055
worker.run_until_done().await.unwrap();
5156
}
57+
58+
#[tokio::test]
59+
async fn abandoned_child_bug_repro() {
60+
let mut starter = CoreWfStarter::new("child-workflow-abandon-bug");
61+
starter.no_remote_activities();
62+
let mut worker = starter.worker().await;
63+
let barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2)));
64+
65+
worker.register_wf(
66+
PARENT_WF_TYPE.to_string(),
67+
move |mut ctx: WfContext| async move {
68+
let child = ctx.child_workflow(ChildWorkflowOptions {
69+
workflow_id: "abandoned-child".to_owned(),
70+
workflow_type: CHILD_WF_TYPE.to_owned(),
71+
parent_close_policy: ParentClosePolicy::Abandon,
72+
cancel_type: ChildWorkflowCancellationType::Abandon,
73+
..Default::default()
74+
});
75+
76+
let started = child
77+
.start(&ctx)
78+
.await
79+
.into_started()
80+
.expect("Child chould start OK");
81+
barr.wait().await;
82+
// Wait until cancellation of this (parent) workflow has been requested
83+
ctx.cancelled().await;
84+
// Cancel the child immediately
85+
started.cancel(&ctx);
86+
// Need to do something else, so we'll see the ChildWorkflowExecutionCanceled event
87+
ctx.timer(Duration::from_secs(1)).await;
88+
started.result().await;
89+
Ok(().into())
90+
},
91+
);
92+
worker.register_wf(CHILD_WF_TYPE.to_string(), |mut ctx: WfContext| async move {
93+
ctx.cancelled().await;
94+
Ok(WfExitValue::Cancelled)
95+
});
96+
97+
worker
98+
.submit_wf(
99+
"parent-abandoner".to_string(),
100+
PARENT_WF_TYPE.to_owned(),
101+
vec![],
102+
WorkflowOptions::default(),
103+
)
104+
.await
105+
.unwrap();
106+
let client = starter.get_client().await;
107+
let canceller = async {
108+
barr.wait().await;
109+
client
110+
.cancel_workflow_execution(
111+
"parent-abandoner".to_string(),
112+
None,
113+
"die".to_string(),
114+
None,
115+
)
116+
.await
117+
.unwrap();
118+
client
119+
.cancel_workflow_execution("abandoned-child".to_string(), None, "die".to_string(), None)
120+
.await
121+
.unwrap();
122+
};
123+
let runner = async move {
124+
worker.run_until_done().await.unwrap();
125+
};
126+
tokio::join!(canceller, runner);
127+
}

0 commit comments

Comments
 (0)