diff --git a/tests/integ_tests/workflow_tests/signals.rs b/tests/integ_tests/workflow_tests/signals.rs index 541be1ecf..2421fc448 100644 --- a/tests/integ_tests/workflow_tests/signals.rs +++ b/tests/integ_tests/workflow_tests/signals.rs @@ -1,12 +1,13 @@ -use std::collections::HashMap; - use futures_util::StreamExt; +use std::collections::HashMap; +use std::time::Duration; use temporal_client::{SignalWithStartOptions, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ - ChildWorkflowOptions, Signal, SignalWorkflowOptions, WfContext, WorkflowResult, + ChildWorkflowOptions, Signal, SignalWorkflowOptions, WfContext, WfExitValue, WorkflowResult, }; use temporal_sdk_core_protos::{coresdk::IntoPayloadsExt, temporal::api::common::v1::Payload}; use temporal_sdk_core_test_utils::CoreWfStarter; +use tracing::info; use uuid::Uuid; const SIGNAME: &str = "signame"; @@ -162,3 +163,47 @@ async fn sends_signal_to_child() { .unwrap(); worker.run_until_done().await.unwrap(); } + +async fn drain_sigchan_wf(ctx: WfContext) -> WorkflowResult<()> { + let sigchan = ctx.make_signal_channel(SIGNAME); + + // If we drain the channel with drain_all here, we get no signals and the workflow stays stuck. + // let signals = sigchan.drain_all(); + // info!("Signals received: {:?}", signals); + + // If we add this timer, the workflow gets unstuck (even without drain_all). + ctx.timer(Duration::from_secs(0)).await; + + // If we drain the channel with drain_all here, *after* the timer, we get signals and the workflow is not stuck. + let signals = sigchan.drain_all(); + info!("Signals received: {:?}", signals); + + Ok(WfExitValue::Normal(())) +} + +#[tokio::test] +async fn signal_with_start_drain() { + let wf_name = "drain_sigchan"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + worker.register_wf(wf_name.to_string(), drain_sigchan_wf); + + let client = starter.get_client().await; + let options = SignalWithStartOptions::builder() + .task_queue(worker.inner_mut().task_queue()) + .workflow_id(wf_name) + .workflow_type(wf_name) + .input(vec![[1].into()].into_payloads().unwrap()) + .signal_name(SIGNAME) + .signal_input(vec![b"tada".into()].into_payloads()) + .build() + .unwrap(); + let res = client + .signal_with_start_workflow_execution(options, WorkflowOptions::default()) + .await + .expect("request succeeds.qed"); + + worker.expect_workflow_completion(wf_name, Some(res.run_id)); + worker.run_until_done().await.unwrap(); +}