Skip to content

Commit f1ae39a

Browse files
authored
Nexus cancellation types (#927)
* Adding different Nexus cancellation type behaviors * Add additional comment info, remove debug logs * Formatting * A bit more PR cleanup * Remove sleeps from test * Comment * Add replay validation
1 parent 4d82754 commit f1ae39a

File tree

5 files changed

+395
-19
lines changed

5 files changed

+395
-19
lines changed

core/src/worker/workflow/machines/nexus_operation_state_machine.rs

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use crate::worker::workflow::{
77
};
88
use itertools::Itertools;
99
use rustfsm::{MachineError, StateMachine, TransitionResult, fsm};
10+
use temporal_sdk_core_protos::coresdk::nexus::NexusOperationCancellationType;
11+
use temporal_sdk_core_protos::temporal::api::command::v1::command;
1012
use temporal_sdk_core_protos::{
1113
coresdk::{
1214
nexus::{NexusOperationResult, nexus_operation_result},
@@ -16,7 +18,7 @@ use temporal_sdk_core_protos::{
1618
workflow_commands::ScheduleNexusOperation,
1719
},
1820
temporal::api::{
19-
command::v1::{RequestCancelNexusOperationCommandAttributes, command},
21+
command::v1::RequestCancelNexusOperationCommandAttributes,
2022
common::v1::Payload,
2123
enums::v1::{CommandType, EventType},
2224
failure::v1::{self as failure, Failure, failure::FailureInfo},
@@ -54,6 +56,7 @@ fsm! {
5456
--(NexusOperationStarted(NexusOperationStartedEventAttributes), on_started)--> Started;
5557

5658
Started --(Cancel, shared on_issue_cancel)--> Started;
59+
Started --(Cancel, shared on_issue_cancel)--> Cancelled;
5760
Started --(CommandRequestCancelNexusOperation)--> Started;
5861
Started --(NexusOperationCancelRequested)--> Started;
5962
Started
@@ -65,10 +68,17 @@ fsm! {
6568
Started
6669
--(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut;
6770

71+
Cancelled --(Cancel)--> Cancelled;
72+
Cancelled --(CommandRequestCancelNexusOperation)--> Cancelled;
73+
Cancelled --(NexusOperationCancelRequested)--> Cancelled;
74+
Cancelled --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), shared on_completed)--> Cancelled;
75+
Cancelled --(NexusOperationFailed(NexusOperationFailedEventAttributes), shared on_failed)--> Cancelled;
76+
Cancelled --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), shared on_timed_out)--> Cancelled;
77+
Cancelled --(NexusOperationCanceled(NexusOperationCanceledEventAttributes))--> Cancelled;
78+
6879
// Ignore cancels in all terminal states
6980
Completed --(Cancel)--> Completed;
7081
Failed --(Cancel)--> Failed;
71-
Cancelled --(Cancel)--> Cancelled;
7282
TimedOut --(Cancel)--> TimedOut;
7383
}
7484

@@ -102,6 +112,7 @@ pub(super) struct SharedState {
102112

103113
cancelled_before_sent: bool,
104114
cancel_sent: bool,
115+
cancel_type: NexusOperationCancellationType,
105116
}
106117

107118
impl NexusOperationMachine {
@@ -116,6 +127,7 @@ impl NexusOperationMachine {
116127
operation: attribs.operation.clone(),
117128
cancelled_before_sent: false,
118129
cancel_sent: false,
130+
cancel_type: attribs.cancellation_type(),
119131
},
120132
);
121133
NewMachineWithCommand {
@@ -249,12 +261,20 @@ impl Started {
249261
pub(crate) fn on_issue_cancel(
250262
&self,
251263
ss: &mut SharedState,
252-
) -> NexusOperationMachineTransition<Started> {
264+
) -> NexusOperationMachineTransition<StartedOrCancelled> {
253265
if !ss.cancel_sent {
254266
ss.cancel_sent = true;
255-
NexusOperationMachineTransition::commands([NexusOperationCommand::IssueCancel])
267+
let dest = if matches!(
268+
ss.cancel_type,
269+
NexusOperationCancellationType::Abandon | NexusOperationCancellationType::TryCancel
270+
) {
271+
StartedOrCancelled::Cancelled(Default::default())
272+
} else {
273+
StartedOrCancelled::Started(Default::default())
274+
};
275+
TransitionResult::ok([NexusOperationCommand::IssueCancel], dest)
256276
} else {
257-
NexusOperationMachineTransition::default()
277+
TransitionResult::ok([], StartedOrCancelled::Started(Default::default()))
258278
}
259279
}
260280

@@ -315,6 +335,49 @@ pub(super) struct TimedOut;
315335
#[derive(Default, Clone)]
316336
pub(super) struct Cancelled;
317337

338+
fn completion_of_not_abandoned_err() -> WFMachinesError {
339+
WFMachinesError::Nondeterminism(
340+
"Nexus operation which don't have the ABANDON cancellation type cannot complete after \
341+
being cancelled."
342+
.to_string(),
343+
)
344+
}
345+
346+
impl Cancelled {
347+
pub(super) fn on_completed(
348+
self,
349+
ss: &mut SharedState,
350+
_: NexusOperationCompletedEventAttributes,
351+
) -> NexusOperationMachineTransition<Cancelled> {
352+
if ss.cancel_type == NexusOperationCancellationType::Abandon {
353+
return NexusOperationMachineTransition::Err(completion_of_not_abandoned_err());
354+
}
355+
NexusOperationMachineTransition::ok([], self)
356+
}
357+
358+
pub(super) fn on_failed(
359+
self,
360+
ss: &mut SharedState,
361+
_: NexusOperationFailedEventAttributes,
362+
) -> NexusOperationMachineTransition<Cancelled> {
363+
if ss.cancel_type == NexusOperationCancellationType::Abandon {
364+
return NexusOperationMachineTransition::Err(completion_of_not_abandoned_err());
365+
}
366+
NexusOperationMachineTransition::ok([], self)
367+
}
368+
369+
pub(super) fn on_timed_out(
370+
self,
371+
ss: &mut SharedState,
372+
_: NexusOperationTimedOutEventAttributes,
373+
) -> NexusOperationMachineTransition<Cancelled> {
374+
if ss.cancel_type == NexusOperationCancellationType::Abandon {
375+
return NexusOperationMachineTransition::Err(completion_of_not_abandoned_err());
376+
}
377+
NexusOperationMachineTransition::ok([], self)
378+
}
379+
}
380+
318381
impl TryFrom<HistEventData> for NexusOperationMachineEvents {
319382
type Error = WFMachinesError;
320383

@@ -497,14 +560,38 @@ impl WFMachinesAdapter for NexusOperationMachine {
497560
]
498561
}
499562
NexusOperationCommand::IssueCancel => {
500-
vec![MachineResponse::IssueNewCommand(
501-
command::Attributes::RequestCancelNexusOperationCommandAttributes(
502-
RequestCancelNexusOperationCommandAttributes {
503-
scheduled_event_id: self.shared_state.scheduled_event_id,
504-
},
563+
let mut resps = vec![];
564+
if self.shared_state.cancel_type != NexusOperationCancellationType::Abandon {
565+
resps.push(MachineResponse::IssueNewCommand(
566+
command::Attributes::RequestCancelNexusOperationCommandAttributes(
567+
RequestCancelNexusOperationCommandAttributes {
568+
scheduled_event_id: self.shared_state.scheduled_event_id,
569+
},
570+
)
571+
.into(),
572+
))
573+
}
574+
// Immediately resolve abandon/trycancel modes
575+
if matches!(
576+
self.shared_state.cancel_type,
577+
NexusOperationCancellationType::Abandon
578+
| NexusOperationCancellationType::TryCancel
579+
) {
580+
resps.push(
581+
ResolveNexusOperation {
582+
seq: self.shared_state.lang_seq_num,
583+
result: Some(NexusOperationResult {
584+
status: Some(nexus_operation_result::Status::Cancelled(
585+
self.cancelled_failure(
586+
"Nexus operation cancelled after starting".to_owned(),
587+
),
588+
)),
589+
}),
590+
}
591+
.into(),
505592
)
506-
.into(),
507-
)]
593+
}
594+
resps
508595
}
509596
})
510597
}

sdk-core-protos/protos/local/temporal/sdk/core/nexus/nexus.proto

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,19 @@ enum NexusTaskCancelReason {
6868
TIMED_OUT = 0;
6969
// The worker is shutting down
7070
WORKER_SHUTDOWN = 1;
71-
}
71+
}
72+
73+
// Controls at which point to report back to lang when a nexus operation is cancelled
74+
enum NexusOperationCancellationType {
75+
// Do not request cancellation of the nexus operation if already scheduled
76+
ABANDON = 0;
77+
78+
// Initiate a cancellation request for the Nexus operation and immediately report cancellation
79+
// to the caller. Note that it doesn't guarantee that cancellation is delivered to the operation if calling workflow exits before the delivery is done.
80+
// If you want to ensure that cancellation is delivered to the operation, use WAIT_CANCELLATION_REQUESTED.
81+
TRY_CANCEL = 1;
82+
// Request cancellation of the operation and wait for confirmation that the request was received.
83+
WAIT_CANCELLATION_REQUESTED = 2;
84+
// Wait for operation cancellation completion. Default.
85+
WAIT_CANCELLATION_COMPLETED = 3;
86+
}

sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import "temporal/api/enums/v1/workflow.proto";
1616
import "temporal/api/failure/v1/message.proto";
1717
import "temporal/api/sdk/v1/user_metadata.proto";
1818
import "temporal/sdk/core/child_workflow/child_workflow.proto";
19+
import "temporal/sdk/core/nexus/nexus.proto";
1920
import "temporal/sdk/core/common/common.proto";
2021

2122
message WorkflowCommand {
@@ -375,6 +376,8 @@ message ScheduleNexusOperation {
375376
// activities and child workflows, these are transmitted to Nexus operations that may be
376377
// external and are not traditional payloads.
377378
map<string, string> nexus_header = 7;
379+
// Defines behaviour of the underlying nexus operation when operation cancellation has been requested.
380+
nexus.NexusOperationCancellationType cancellation_type = 8;
378381
}
379382

380383
// Request cancellation of a nexus operation started via `ScheduleNexusOperation`

sdk/src/workflow_context/options.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{collections::HashMap, time::Duration};
22

33
use temporal_client::{Priority, WorkflowOptions};
4+
use temporal_sdk_core_protos::coresdk::nexus::NexusOperationCancellationType;
45
use temporal_sdk_core_protos::{
56
coresdk::{
67
child_workflow::ChildWorkflowCancellationType,
@@ -397,6 +398,8 @@ pub struct NexusOperationOptions {
397398
/// activities and child workflows, these are transmitted to Nexus operations that may be
398399
/// external and are not traditional payloads.
399400
pub nexus_header: HashMap<String, String>,
401+
/// Cancellation type for the operation
402+
pub cancellation_type: Option<NexusOperationCancellationType>,
400403
}
401404

402405
impl IntoWorkflowCommand for NexusOperationOptions {
@@ -414,6 +417,10 @@ impl IntoWorkflowCommand for NexusOperationOptions {
414417
.schedule_to_close_timeout
415418
.and_then(|t| t.try_into().ok()),
416419
nexus_header: self.nexus_header,
420+
cancellation_type: self
421+
.cancellation_type
422+
.unwrap_or(NexusOperationCancellationType::WaitCancellationCompleted)
423+
.into(),
417424
}
418425
.into(),
419426
),

0 commit comments

Comments
 (0)