diff --git a/core/src/worker/workflow/machines/nexus_operation_state_machine.rs b/core/src/worker/workflow/machines/nexus_operation_state_machine.rs index a87af524a..ea1ba1c3a 100644 --- a/core/src/worker/workflow/machines/nexus_operation_state_machine.rs +++ b/core/src/worker/workflow/machines/nexus_operation_state_machine.rs @@ -21,6 +21,8 @@ use temporal_sdk_core_protos::{ enums::v1::{CommandType, EventType}, failure::v1::{self as failure, Failure, failure::FailureInfo}, history::v1::{ + NexusOperationCancelRequestCompletedEventAttributes, + NexusOperationCancelRequestFailedEventAttributes, NexusOperationCanceledEventAttributes, NexusOperationCompletedEventAttributes, NexusOperationFailedEventAttributes, NexusOperationStartedEventAttributes, NexusOperationTimedOutEventAttributes, history_event, @@ -63,6 +65,12 @@ fsm! { --(NexusOperationFailed(NexusOperationFailedEventAttributes), on_failed)--> Failed; Started --(NexusOperationCanceled(NexusOperationCanceledEventAttributes), on_canceled)--> Cancelled; + + + Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Started; + Started --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), shared on_cancel_request_completed)--> Cancelled; + Started --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes), shared on_cancel_request_failed)--> Started; + Started --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), on_timed_out)--> TimedOut; @@ -72,12 +80,20 @@ fsm! { Cancelled --(NexusOperationCompleted(NexusOperationCompletedEventAttributes), shared on_completed)--> Cancelled; Cancelled --(NexusOperationFailed(NexusOperationFailedEventAttributes), shared on_failed)--> Cancelled; Cancelled --(NexusOperationTimedOut(NexusOperationTimedOutEventAttributes), shared on_timed_out)--> Cancelled; + Cancelled --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes), on_cancel_request_completed)--> Cancelled; + Cancelled --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Cancelled; Cancelled --(NexusOperationCanceled(NexusOperationCanceledEventAttributes))--> Cancelled; // Ignore cancels in all terminal states Completed --(Cancel)--> Completed; Failed --(Cancel)--> Failed; TimedOut --(Cancel)--> TimedOut; + Completed --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> Completed; + Failed --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> Failed; + TimedOut --(NexusOperationCancelRequestCompleted(NexusOperationCancelRequestCompletedEventAttributes))--> TimedOut; + Completed --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Completed; + Failed --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> Failed; + TimedOut --(NexusOperationCancelRequestFailed(NexusOperationCancelRequestFailedEventAttributes))--> TimedOut; } #[derive(Debug, derive_more::Display)] @@ -311,6 +327,45 @@ impl Started { )]) } + pub(super) fn on_cancel_request_completed( + self, + ss: &mut SharedState, + _: NexusOperationCancelRequestCompletedEventAttributes, + ) -> NexusOperationMachineTransition { + if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested { + TransitionResult::ok( + [NexusOperationCommand::Cancel(ss.cancelled_failure( + "Nexus operation cancellation request completed".to_owned(), + ))], + StartedOrCancelled::Cancelled(Default::default()), + ) + } else { + TransitionResult::ok([], StartedOrCancelled::Started(Default::default())) + } + } + + pub(super) fn on_cancel_request_failed( + self, + ss: &mut SharedState, + fa: NexusOperationCancelRequestFailedEventAttributes, + ) -> NexusOperationMachineTransition { + if ss.cancel_type == NexusOperationCancellationType::WaitCancellationRequested { + let message = "Nexus operation cancellation request failed".to_string(); + TransitionResult::ok( + [NexusOperationCommand::Fail(ss.failure( + message.clone(), + fa.failure.unwrap_or_else(|| Failure { + message, + ..Default::default() + }), + ))], + self, + ) + } else { + TransitionResult::ok([], self) + } + } + pub(super) fn on_timed_out( self, toa: NexusOperationTimedOutEventAttributes, @@ -356,6 +411,13 @@ impl Cancelled { NexusOperationMachineTransition::ok([], self) } + pub(super) fn on_cancel_request_completed( + self, + _: NexusOperationCancelRequestCompletedEventAttributes, + ) -> NexusOperationMachineTransition { + NexusOperationMachineTransition::ok([], self) + } + pub(super) fn on_failed( self, ss: &mut SharedState, @@ -454,6 +516,36 @@ impl TryFrom for NexusOperationMachineEvents { } } Ok(EventType::NexusOperationCancelRequested) => Self::NexusOperationCancelRequested, + Ok(EventType::NexusOperationCancelRequestCompleted) => { + if let Some( + history_event::Attributes::NexusOperationCancelRequestCompletedEventAttributes( + attrs, + ), + ) = e.attributes + { + Self::NexusOperationCancelRequestCompleted(attrs) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationCancelRequestCompleted attributes were unset or malformed" + .to_string(), + )); + } + } + Ok(EventType::NexusOperationCancelRequestFailed) => { + if let Some( + history_event::Attributes::NexusOperationCancelRequestFailedEventAttributes( + attrs, + ), + ) = e.attributes + { + Self::NexusOperationCancelRequestFailed(attrs) + } else { + return Err(WFMachinesError::Nondeterminism( + "NexusOperationCancelRequestFailed attributes were unset or malformed" + .to_string(), + )); + } + } _ => { return Err(WFMachinesError::Nondeterminism(format!( "Nexus operation machine does not handle this event: {e:?}" @@ -599,12 +691,19 @@ impl TryFrom for NexusOperationMachineEvents { impl SharedState { fn cancelled_failure(&self, message: String) -> Failure { - Failure { + self.failure( message, - cause: Some(Box::new(Failure { + Failure { failure_info: Some(FailureInfo::CanceledFailureInfo(Default::default())), ..Default::default() - })), + }, + ) + } + + fn failure(&self, message: String, cause: Failure) -> Failure { + Failure { + message, + cause: Some(Box::new(cause)), failure_info: Some(FailureInfo::NexusOperationExecutionFailureInfo( failure::NexusOperationFailureInfo { scheduled_event_id: self.scheduled_event_id, diff --git a/fsm/rustfsm_procmacro/src/lib.rs b/fsm/rustfsm_procmacro/src/lib.rs index c216a607b..e6d34017f 100644 --- a/fsm/rustfsm_procmacro/src/lib.rs +++ b/fsm/rustfsm_procmacro/src/lib.rs @@ -434,6 +434,7 @@ impl StateMachineDefinition { } }; let mut multi_dest_enums = vec![]; + let mut multi_dest_enum_names = HashSet::new(); let state_branches: Vec<_> = statemap.into_iter().map(|(from, transitions)| { let occupied_current_state = quote! { Some(#state_enum_name::#from(state_data)) }; // Merge transition dest states with the same handler @@ -468,7 +469,11 @@ impl StateMachineDefinition { } } }; - multi_dest_enums.push(multi_dest_enum); + // Deduplicate; two different events may each result in a transition + // set with the same set of dest states + if multi_dest_enum_names.insert(enum_ident.clone()) { + multi_dest_enums.push(multi_dest_enum); + } quote! { #transition_result_name<#enum_ident> } diff --git a/tests/integ_tests/workflow_tests/nexus.rs b/tests/integ_tests/workflow_tests/nexus.rs index b774bb386..dbc1d35ca 100644 --- a/tests/integ_tests/workflow_tests/nexus.rs +++ b/tests/integ_tests/workflow_tests/nexus.rs @@ -29,6 +29,7 @@ use tokio::{ join, sync::{mpsc, watch}, }; +use tokio_stream::StreamExt; #[derive(Debug, PartialEq, Eq, Clone, Copy)] enum Outcome { @@ -557,10 +558,10 @@ async fn nexus_cancellation_types( let endpoint = mk_nexus_endpoint(&mut starter).await; let schedule_to_close_timeout = Some(Duration::from_secs(5)); - let (cancel_call_completion_tx, cancel_call_completion_rx) = watch::channel(false); + let (caller_op_future_tx, caller_op_future_rx) = watch::channel(false); worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { let endpoint = endpoint.clone(); - let cancel_call_completion_tx = cancel_call_completion_tx.clone(); + let caller_op_future_tx = caller_op_future_tx.clone(); async move { let options = NexusOperationOptions { endpoint, @@ -577,13 +578,14 @@ async fn nexus_cancellation_types( started.cancel(&ctx); let res = result.await; - cancel_call_completion_tx.send(true).unwrap(); + caller_op_future_tx.send(true).unwrap(); - // Make sure cancel after completion doesn't cause problems + // Make sure cancel after op completion doesn't cause problems started.cancel(&ctx); // We need to wait slightly so that the workflow is not complete at the same time - // cancellation is invoked. If it does, the caller workflow will close and the server won't attempt to send the cancellation to the handler + // cancellation is invoked. If it does, the caller workflow will close and the server + // won't attempt to send the cancellation to the handler ctx.timer(Duration::from_millis(1)).await; Ok(res.into()) } @@ -591,21 +593,37 @@ async fn nexus_cancellation_types( let (cancellation_wait_tx, cancellation_wait_rx) = watch::channel(false); let (cancellation_tx, mut cancellation_rx) = watch::channel(false); + let (handler_exited_tx, mut handler_exited_rx) = watch::channel(false); worker.register_wf("async_completer".to_owned(), move |ctx: WfContext| { let cancellation_tx = cancellation_tx.clone(); let mut cancellation_wait_rx = cancellation_wait_rx.clone(); + let handler_exited_tx = handler_exited_tx.clone(); async move { + // Wait for cancellation ctx.cancelled().await; cancellation_tx.send(true).unwrap(); + if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted { cancellation_wait_rx.changed().await.unwrap(); + } else if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested + { + // For WAIT_REQUESTED, wait until the caller nexus op future has been resolved. This + // allows the test to verify that it resolved due to + // NexusOperationCancelRequestCompleted (written after cancel handler responds) + // rather than NexusOperationCanceled (written after handler workflow completes as + // cancelled). + let mut signal_chan = ctx.make_signal_channel("proceed-to-exit"); + signal_chan.next().await; } + + handler_exited_tx.send(true).unwrap(); Ok(WfExitValue::<()>::Cancelled) } }); let submitter = worker.get_submitter_handle(); let wf_handle = starter.start_with_worker(wf_name, &mut worker).await; let client = starter.get_client().await.get_client().clone(); + let (handler_wf_id_tx, mut handler_wf_id_rx) = tokio::sync::oneshot::channel(); let nexus_task_handle = async { let nt = core_worker.poll_nexus_task().await.unwrap().unwrap_task(); let start_req = assert_matches!( @@ -613,6 +631,7 @@ async fn nexus_cancellation_types( request::Variant::StartOperation(sr) => sr ); let completer_id = format!("completer-{}", rand_6_chars()); + let _ = handler_wf_id_tx.send(completer_id.clone()); let links = start_req .links .iter() @@ -668,16 +687,19 @@ async fn nexus_cancellation_types( match cancellation_type { NexusOperationCancellationType::WaitCancellationCompleted | NexusOperationCancellationType::WaitCancellationRequested => { - assert!(!*cancel_call_completion_rx.borrow()); + // The nexus op future should not have been resolved + assert!(!*caller_op_future_rx.borrow()); } NexusOperationCancellationType::Abandon | NexusOperationCancellationType::TryCancel => { wf_handle .get_workflow_result(Default::default()) .await .unwrap(); - assert!(*cancel_call_completion_rx.borrow()) + // The nexus op future should have been resolved + assert!(*caller_op_future_rx.borrow()) } } + let (cancel_handler_responded_tx, _cancel_handler_responded_rx) = watch::channel(false); if cancellation_type != NexusOperationCancellationType::Abandon { let nt = core_worker.poll_nexus_task().await.unwrap(); let nt = nt.unwrap_task(); @@ -702,19 +724,20 @@ async fn nexus_cancellation_types( }) .await .unwrap(); + // Mark that the cancel handler has responded + cancel_handler_responded_tx.send(true).unwrap(); } - // Confirm the caller WF has not completed even after the handling of the cancel request + // Check that the nexus op future resolves only _after_ the handler WF completes if cancellation_type == NexusOperationCancellationType::WaitCancellationCompleted { - assert!(!*cancel_call_completion_rx.borrow()); + assert!(!*caller_op_future_rx.borrow()); - // It only completes after the handler WF terminates cancellation_wait_tx.send(true).unwrap(); wf_handle .get_workflow_result(Default::default()) .await .unwrap(); - assert!(*cancel_call_completion_rx.borrow()); + assert!(*caller_op_future_rx.borrow()); } assert_matches!( @@ -724,6 +747,38 @@ async fn nexus_cancellation_types( }; let shutdown_handle = worker.inner_mut().shutdown_handle(); + + let check_caller_op_future_resolved_then_allow_handler_to_complete = async { + // The caller nexus op future has been resolved + assert!(*caller_op_future_rx.borrow()); + + // Verify the handler workflow has not exited yet. This proves that the caller op future + // was resolved as a result of NexusOperationCancelRequestCompleted (written after cancel + // handler responds), as opposed to NexusOperationCanceled (written after handler workflow + // exits). + assert!( + !*handler_exited_rx.borrow(), + "Handler should not have exited yet" + ); + + let handler_wf_id = handler_wf_id_rx + .try_recv() + .expect("Should have received handler workflow ID"); + client + .signal_workflow_execution( + handler_wf_id, + "".to_string(), + "proceed-to-exit".to_string(), + None, + None, + ) + .await + .unwrap(); + + handler_exited_rx.changed().await.unwrap(); + assert!(*handler_exited_rx.borrow()); + }; + join!( nexus_task_handle, async { worker.inner_mut().run().await.unwrap() }, @@ -735,6 +790,9 @@ async fn nexus_cancellation_types( if cancellation_type == NexusOperationCancellationType::TryCancel { cancellation_rx.changed().await.unwrap(); } + if cancellation_type == NexusOperationCancellationType::WaitCancellationRequested { + check_caller_op_future_resolved_then_allow_handler_to_complete.await; + } shutdown_handle(); } ); @@ -765,8 +823,14 @@ async fn nexus_cancellation_types( ); assert_eq!(f.message, "Nexus operation cancelled after starting"); } - NexusOperationCancellationType::WaitCancellationRequested - | NexusOperationCancellationType::WaitCancellationCompleted => { + NexusOperationCancellationType::WaitCancellationRequested => { + let f = assert_matches!( + res.status, + Some(nexus_operation_result::Status::Cancelled(f)) => f + ); + assert_eq!(f.message, "Nexus operation cancellation request completed"); + } + NexusOperationCancellationType::WaitCancellationCompleted => { let f = assert_matches!( res.status, Some(nexus_operation_result::Status::Cancelled(f)) => f diff --git a/tests/runner.rs b/tests/runner.rs index b89fdd8be..e1489980e 100644 --- a/tests/runner.rs +++ b/tests/runner.rs @@ -107,6 +107,8 @@ async fn main() -> Result<(), anyhow::Error> { "frontend.workerVersioningDataAPIs=true".to_owned(), "--dynamic-config-value".to_owned(), "system.enableDeploymentVersions=true".to_owned(), + "--dynamic-config-value".to_owned(), + "component.nexusoperations.recordCancelRequestCompletionEvents=true".to_owned(), "--http-port".to_string(), "7243".to_string(), "--search-attribute".to_string(),