Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 102 additions & 3 deletions core/src/worker/workflow/machines/nexus_operation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use temporal_sdk_core_protos::{
enums::v1::{CommandType, EventType},
failure::v1::{self as failure, Failure, failure::FailureInfo},
history::v1::{
NexusOperationCancelRequestCompletedEventAttributes,
NexusOperationCancelRequestFailedEventAttributes,
NexusOperationCanceledEventAttributes, NexusOperationCompletedEventAttributes,
NexusOperationFailedEventAttributes, NexusOperationStartedEventAttributes,
NexusOperationTimedOutEventAttributes, history_event,
Expand Down Expand Up @@ -63,6 +65,12 @@ fsm! {
--(NexusOperationFailed(NexusOperationFailedEventAttributes), on_failed)--> Failed;
Started
--(NexusOperationCanceled(NexusOperationCanceledEventAttributes), on_canceled)--> Cancelled;


Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Started;
Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Cancelled;
Started --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes), shared on_cancel_request_failed)--> Started;

Started
--(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut;

Expand All @@ -72,12 +80,20 @@ fsm! {
Cancelled --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), shared on_completed)--> Cancelled;
Cancelled --(NexusOperationFailed(NexusOperationFailedEventAttributes), shared on_failed)--> Cancelled;
Cancelled --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), shared on_timed_out)--> Cancelled;
Cancelled --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), on_cancel_request_completed)--> Cancelled;
Cancelled --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Cancelled;
Cancelled --(NexusOperationCanceled(NexusOperationCanceledEventAttributes))--> Cancelled;

// Ignore cancels in all terminal states
Completed --(Cancel)--> Completed;
Failed --(Cancel)--> Failed;
TimedOut --(Cancel)--> TimedOut;
Completed --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> Completed;
Failed --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> Failed;
TimedOut --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> TimedOut;
Completed --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Completed;
Failed --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Failed;
TimedOut --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> TimedOut;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[I haven't yet proved that I can make these transitions happen in tests.]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine. Some of these "if it happens just ignore" transitions can be like that.

}

#[derive(Debug, derive_more::Display)]
Expand Down Expand Up @@ -311,6 +327,45 @@ impl Started {
)])
}

pub(super) fn on_cancel_request_completed(
self,
ss: &mut SharedState,
_: NexusOperationCancelRequestCompletedEventAttributes,
) -> NexusOperationMachineTransition<StartedOrCancelled> {
if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested {
TransitionResult::ok(
[NexusOperationCommand::Cancel(ss.cancelled_failure(
"Nexus operation cancellation request completed".to_owned(),
))],
StartedOrCancelled::Cancelled(Default::default()),
)
} else {
TransitionResult::ok([], StartedOrCancelled::Started(Default::default()))
}
}

pub(super) fn on_cancel_request_failed(
self,
ss: &mut SharedState,
fa: NexusOperationCancelRequestFailedEventAttributes,
) -> NexusOperationMachineTransition<Started> {
if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested {
let message = "Nexus operation cancellation request failed".to_string();
TransitionResult::ok(
[NexusOperationCommand::Fail(ss.failure(
message.clone(),
fa.failure.unwrap_or_else(|| Failure {
message,
..Default::default()
}),
))],
self,
)
} else {
TransitionResult::ok([], self)
}
}

pub(super) fn on_timed_out(
self,
toa: NexusOperationTimedOutEventAttributes,
Expand Down Expand Up @@ -356,6 +411,13 @@ impl Cancelled {
NexusOperationMachineTransition::ok([], self)
}

pub(super) fn on_cancel_request_completed(
self,
_: NexusOperationCancelRequestCompletedEventAttributes,
) -> NexusOperationMachineTransition<Cancelled> {
NexusOperationMachineTransition::ok([], self)
}

pub(super) fn on_failed(
self,
ss: &mut SharedState,
Expand Down Expand Up @@ -454,6 +516,36 @@ impl TryFrom<HistEventData> for NexusOperationMachineEvents {
}
}
Ok(EventType::NexusOperationCancelRequested) => Self::NexusOperationCancelRequested,
Ok(EventType::NexusOperationCancelRequestCompleted) => {
if let Some(
history_event::Attributes::NexusOperationCancelRequestCompletedEventAttributes(
attrs,
),
) = e.attributes
{
Self::NexusOperationCancelRequestCompleted(attrs)
} else {
return Err(WFMachinesError::Nondeterminism(
"NexusOperationCancelRequestCompleted attributes were unset or malformed"
.to_string(),
));
}
}
Ok(EventType::NexusOperationCancelRequestFailed) => {
if let Some(
history_event::Attributes::NexusOperationCancelRequestFailedEventAttributes(
attrs,
),
) = e.attributes
{
Self::NexusOperationCancelRequestFailed(attrs)
} else {
return Err(WFMachinesError::Nondeterminism(
"NexusOperationCancelRequestFailed attributes were unset or malformed"
.to_string(),
));
}
}
_ => {
return Err(WFMachinesError::Nondeterminism(format!(
"Nexus operation machine does not handle this event: {e:?}"
Expand Down Expand Up @@ -599,12 +691,19 @@ impl TryFrom<CommandType> for NexusOperationMachineEvents {

impl SharedState {
fn cancelled_failure(&self, message: String) -> Failure {
Failure {
self.failure(
message,
cause: Some(Box::new(Failure {
Failure {
failure_info: Some(FailureInfo::CanceledFailureInfo(Default::default())),
..Default::default()
})),
},
)
}

fn failure(&self, message: String, cause: Failure) -> Failure {
Failure {
message,
cause: Some(Box::new(cause)),
failure_info: Some(FailureInfo::NexusOperationExecutionFailureInfo(
failure::NexusOperationFailureInfo {
scheduled_event_id: self.scheduled_event_id,
Expand Down
7 changes: 6 additions & 1 deletion fsm/rustfsm_procmacro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ impl StateMachineDefinition {
}
};
let mut multi_dest_enums = vec![];
let mut multi_dest_enum_names = HashSet::new();
let state_branches: Vec<_> = statemap.into_iter().map(|(from, transitions)| {
let occupied_current_state = quote! { Some(#state_enum_name::#from(state_data)) };
// Merge transition dest states with the same handler
Expand Down Expand Up @@ -468,7 +469,11 @@ impl StateMachineDefinition {
}
}
};
multi_dest_enums.push(multi_dest_enum);
// Deduplicate; two different events may each result in a transition
// set with the same set of dest states
if multi_dest_enum_names.insert(enum_ident.clone()) {
multi_dest_enums.push(multi_dest_enum);
}
quote! {
#transition_result_name<#enum_ident>
}
Expand Down
10 changes: 8 additions & 2 deletions tests/integ_tests/workflow_tests/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,8 +765,14 @@ async fn nexus_cancellation_types(
);
assert_eq!(f.message, "Nexus operation cancelled after starting");
}
NexusOperationCancellationType::WaitCancellationRequested
| NexusOperationCancellationType::WaitCancellationCompleted => {
NexusOperationCancellationType::WaitCancellationRequested => {
let f = assert_matches!(
res.status,
Some(nexus_operation_result::Status::Cancelled(f)) => f
);
assert_eq!(f.message, "Nexus operation cancellation request completed");
}
NexusOperationCancellationType::WaitCancellationCompleted => {
let f = assert_matches!(
res.status,
Some(nexus_operation_result::Status::Cancelled(f)) => f
Expand Down
2 changes: 2 additions & 0 deletions tests/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ async fn main() -> Result<(), anyhow::Error> {
"frontend.workerVersioningDataAPIs=true".to_owned(),
"--dynamic-config-value".to_owned(),
"system.enableDeploymentVersions=true".to_owned(),
"--dynamic-config-value".to_owned(),
"component.nexusoperations.recordCancelRequestCompletionEvents=true".to_owned(),
"--http-port".to_string(),
"7243".to_string(),
"--search-attribute".to_string(),
Expand Down
Loading