From e00fa497c253c4b581239010dd89f37774637c6e Mon Sep 17 00:00:00 2001 From: Harsha Teja Kanna Date: Thu, 4 May 2023 18:43:28 -0500 Subject: [PATCH] Updated Activity/Workflow Function bounds to allow a more generic user function --- core/src/core_tests/local_activities.rs | 82 +++++------ sdk/src/activity_context.rs | 61 ++++---- sdk/src/lib.rs | 135 ++++++++++++++++-- test-utils/src/lib.rs | 6 +- tests/fuzzy_workflow.rs | 6 +- tests/heavy_tests.rs | 10 +- tests/integ_tests/heartbeat_tests.rs | 6 +- tests/integ_tests/workflow_tests.rs | 6 +- .../integ_tests/workflow_tests/activities.rs | 39 +++-- .../workflow_tests/appdata_propagation.rs | 6 +- .../workflow_tests/local_activities.rs | 95 ++++++------ 11 files changed, 278 insertions(+), 174 deletions(-) diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index 638525249..a5a696b49 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -21,8 +21,8 @@ use std::{ }; use temporal_client::WorkflowOptions; use temporal_sdk::{ - ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowFunction, - WorkflowResult, + ActContext, ActExitValue, ActivityCancelledError, ActivityFunction, LocalActivityOptions, + WfContext, WorkflowFunction, WorkflowResult, }; use temporal_sdk_core_api::{ errors::{PollActivityError, PollWfError}, @@ -101,7 +101,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached: Ok(().into()) }, ); - worker.register_activity(DEFAULT_ACTIVITY_TYPE, echo); + worker.register_activity(DEFAULT_ACTIVITY_TYPE, ActivityFunction::from(echo)); worker .submit_wf( wf_id.to_owned(), @@ -151,7 +151,7 @@ async fn local_act_many_concurrent() { let mut worker = mock_sdk(mh); worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf); - worker.register_activity("echo", echo); + worker.register_activity("echo", ActivityFunction::from(echo)); worker .submit_wf( wf_id.to_owned(), @@ -207,14 +207,17 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) { Ok(().into()) }, ); - worker.register_activity("echo", move |_ctx: ActContext, str: String| async move { - if shutdown_middle { - shutdown_barr.wait().await; - } - // Take slightly more than two workflow tasks - tokio::time::sleep(wft_timeout.mul_f32(2.2)).await; - Ok(str) - }); + worker.register_activity( + "echo", + ActivityFunction::from(move |_ctx: ActContext, str: String| async move { + if shutdown_middle { + shutdown_barr.wait().await; + } + // Take slightly more than two workflow tasks + tokio::time::sleep(wft_timeout.mul_f32(2.2)).await; + Ok(str) + }), + ); worker .submit_wf( wf_id.to_owned(), @@ -276,10 +279,10 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) { }, ); let attempts: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); - worker.register_activity("echo", move |_ctx: ActContext, _: String| async move { + worker.register_activity("echo", move |_ctx: ActContext| async move { // Succeed on 3rd attempt (which is ==2 since fetch_add returns prev val) if 2 == attempts.fetch_add(1, Ordering::Relaxed) && eventually_pass { - Ok(()) + Ok(().into()) } else { Err(anyhow!("Oh no I failed!")) } @@ -355,12 +358,9 @@ async fn local_act_retry_long_backoff_uses_timer() { Ok(().into()) }, ); - worker.register_activity( - DEFAULT_ACTIVITY_TYPE, - move |_ctx: ActContext, _: String| async move { - Result::<(), _>::Err(anyhow!("Oh no I failed!")) - }, - ); + worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |_ctx: ActContext| async move { + Result::, _>::Err(anyhow!("Oh no I failed!")) + }); worker .submit_wf( wf_id.to_owned(), @@ -398,7 +398,7 @@ async fn local_act_null_result() { Ok(().into()) }, ); - worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) }); + worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) }); worker .submit_wf( wf_id.to_owned(), @@ -442,7 +442,7 @@ async fn local_act_command_immediately_follows_la_marker() { Ok(().into()) }, ); - worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) }); + worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) }); worker .submit_wf( wf_id.to_owned(), @@ -778,10 +778,7 @@ async fn test_schedule_to_start_timeout() { Ok(().into()) }, ); - worker.register_activity( - "echo", - move |_ctx: ActContext, _: String| async move { Ok(()) }, - ); + worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) }); worker .submit_wf( wf_id.to_owned(), @@ -868,10 +865,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time( Ok(().into()) }, ); - worker.register_activity( - "echo", - move |_ctx: ActContext, _: String| async move { Ok(()) }, - ); + worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) }); worker .submit_wf( wf_id.to_owned(), @@ -917,16 +911,13 @@ async fn wft_failure_cancels_running_las() { Ok(().into()) }, ); - worker.register_activity( - DEFAULT_ACTIVITY_TYPE, - move |ctx: ActContext, _: String| async move { - let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await; - if res.is_err() { - panic!("Activity must be cancelled!!!!"); - } - Result::<(), _>::Err(ActivityCancelledError::default().into()) - }, - ); + worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |ctx: ActContext| async move { + let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await; + if res.is_err() { + panic!("Activity must be cancelled!!!!"); + } + Result::, _>::Err(ActivityCancelledError::default().into()) + }); worker .submit_wf( wf_id.to_owned(), @@ -980,7 +971,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() { ); worker.register_activity( "echo", - move |_: ActContext, _: String| async move { Ok(()) }, + ActivityFunction::from(move |_: ActContext, _: String| async move { Ok(()) }), ); worker .submit_wf( @@ -1039,9 +1030,12 @@ async fn local_act_records_nonfirst_attempts_ok() { Ok(().into()) }, ); - worker.register_activity("echo", move |_ctx: ActContext, _: String| async move { - Result::<(), _>::Err(anyhow!("I fail")) - }); + worker.register_activity( + "echo", + ActivityFunction::from(move |_ctx: ActContext, _: String| async move { + Result::<(), _>::Err(anyhow!("I fail")) + }), + ); worker .submit_wf( wf_id.to_owned(), diff --git a/sdk/src/activity_context.rs b/sdk/src/activity_context.rs index a2cf11cdb..b3297ccbd 100644 --- a/sdk/src/activity_context.rs +++ b/sdk/src/activity_context.rs @@ -60,7 +60,7 @@ impl ActContext { task_queue: String, task_token: Vec, task: activity_task::Start, - ) -> (Self, Payload) { + ) -> Self { let activity_task::Start { workflow_namespace, workflow_type, @@ -68,7 +68,7 @@ impl ActContext { activity_id, activity_type, header_fields, - mut input, + input, heartbeat_details, scheduled_time, current_attempt_scheduled_time, @@ -86,37 +86,32 @@ impl ActContext { start_to_close_timeout.as_ref(), schedule_to_close_timeout.as_ref(), ); - let first_arg = input.pop().unwrap_or_default(); - ( - ActContext { - worker, - app_data, - cancellation_token, - input, - heartbeat_details, - header_fields, - info: ActivityInfo { - task_token, - task_queue, - workflow_type, - workflow_namespace, - workflow_execution, - activity_id, - activity_type, - heartbeat_timeout: heartbeat_timeout.try_into_or_none(), - scheduled_time: scheduled_time.try_into_or_none(), - started_time: started_time.try_into_or_none(), - deadline, - attempt, - current_attempt_scheduled_time: current_attempt_scheduled_time - .try_into_or_none(), - retry_policy, - is_local, - }, + ActContext { + worker, + app_data, + cancellation_token, + input, + heartbeat_details, + header_fields, + info: ActivityInfo { + task_token, + task_queue, + workflow_type, + workflow_namespace, + workflow_execution, + activity_id, + activity_type, + heartbeat_timeout: heartbeat_timeout.try_into_or_none(), + scheduled_time: scheduled_time.try_into_or_none(), + started_time: started_time.try_into_or_none(), + deadline, + attempt, + current_attempt_scheduled_time: current_attempt_scheduled_time.try_into_or_none(), + retry_policy, + is_local, }, - first_arg, - ) + } } /// Returns a future the completes if and when the activity this was called inside has been @@ -133,8 +128,8 @@ impl ActContext { /// Retrieve extra parameters to the Activity. The first input is always popped and passed to /// the Activity function for the currently executing activity. However, if more parameters are /// passed, perhaps from another language's SDK, explicit access is available from extra_inputs - pub fn extra_inputs(&mut self) -> &mut [Payload] { - &mut self.input + pub fn get_args(&self) -> &[Payload] { + &self.input } /// Extract heartbeat details from last failed attempt. This is used in combination with retry policy. diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index ccf8b06b8..6cc50150b 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -9,7 +9,7 @@ //! An example of running an activity worker: //! ```no_run //! use std::{str::FromStr, sync::Arc}; -//! use temporal_sdk::{sdk_client_options, ActContext, Worker}; +//! use temporal_sdk::{sdk_client_options, ActContext, ActExitValue, ActivityFunction, Worker}; //! use temporal_sdk_core::{init_worker, Url, CoreRuntime}; //! use temporal_sdk_core_api::{worker::WorkerConfigBuilder, telemetry::TelemetryOptionsBuilder}; //! @@ -32,7 +32,7 @@ //! let mut worker = Worker::new_from_core(Arc::new(core_worker), "task_queue"); //! worker.register_activity( //! "echo_activity", -//! |_ctx: ActContext, echo_me: String| async move { Ok(echo_me) }, +//! ActivityFunction::from(|ctx: ActContext, echo: String| async move { Ok(echo) }), //! ); //! //! worker.run().await?; @@ -207,17 +207,14 @@ impl Worker { /// Register an Activity function to invoke when the Worker is asked to run an activity of /// `activity_type` - pub fn register_activity( + pub fn register_activity( &mut self, activity_type: impl Into, - act_function: impl IntoActivityFunc, + act_function: impl Into, ) { - self.activity_half.activity_fns.insert( - activity_type.into(), - ActivityFunction { - act_func: act_function.into_activity_fn(), - }, - ); + self.activity_half + .activity_fns + .insert(activity_type.into(), act_function.into()); } /// Insert Custom App Context for Workflows and Activities @@ -477,7 +474,7 @@ impl ActivityHalf { self.task_tokens_to_cancels .insert(task_token.clone().into(), ct.clone()); - let (ctx, arg) = ActContext::new( + let ctx = ActContext::new( worker.clone(), app_data, ct, @@ -486,7 +483,7 @@ impl ActivityHalf { start, ); tokio::spawn(async move { - let output = AssertUnwindSafe((act_fn.act_func)(ctx, arg)) + let output = AssertUnwindSafe((act_fn.act_func)(ctx)) .catch_unwind() .await; let result = match output { @@ -743,6 +740,39 @@ impl WorkflowFunction { }), } } + + /// Build a workflow function from a closure or function pointer which accepts a [WfContext] and takes one argument + pub fn from(f: F) -> Self + where + A: FromJsonPayloadExt + Send, + F: Fn(WfContext, A) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + R: Into>, + O: Serialize + Debug, + { + Self { + wf_func: Box::new(move |ctx: WfContext| { + match A::from_json_payload(&ctx.get_args()[0]) { + Ok(a) => (f)(ctx, a) + .map(|r| { + r.and_then(|r| { + let r = r.into(); + Ok(match r { + WfExitValue::ContinueAsNew(b) => WfExitValue::ContinueAsNew(b), + WfExitValue::Cancelled => WfExitValue::Cancelled, + WfExitValue::Evicted => WfExitValue::Evicted, + WfExitValue::Normal(o) => { + WfExitValue::Normal(o.as_json_payload()?) + } + }) + }) + }) + .boxed(), + Err(e) => async move { Err(anyhow::Error::new(e)) }.boxed(), + } + }), + } + } } /// The result of running a workflow @@ -771,6 +801,9 @@ impl WfExitValue { } } +/// The result of running an activity +pub type ActivityResult = Result, anyhow::Error>; + /// Activity functions may return these values when exiting #[derive(derive_more::From)] pub enum ActExitValue { @@ -782,7 +815,7 @@ pub enum ActExitValue { } type BoxActFn = Arc< - dyn Fn(ActContext, Payload) -> BoxFuture<'static, Result, anyhow::Error>> + dyn Fn(ActContext) -> BoxFuture<'static, Result, anyhow::Error>> + Send + Sync, >; @@ -793,6 +826,77 @@ pub struct ActivityFunction { act_func: BoxActFn, } +impl From for ActivityFunction +where + F: Fn(ActContext) -> Fut + Send + Sync + 'static, + Fut: Future, anyhow::Error>> + Send + 'static, + O: Serialize + Debug, +{ + fn from(act_func: F) -> Self { + Self::new(act_func) + } +} + +impl ActivityFunction { + /// Build an activity function from a closure or function pointer which accepts a [ActContext] + pub fn new(act_func: F) -> Self + where + F: Fn(ActContext) -> Fut + Send + Sync + 'static, + Fut: Future, anyhow::Error>> + Send + 'static, + O: Serialize + Debug, + { + Self { + act_func: Arc::new(move |ctx: ActContext| { + (act_func)(ctx) + .map(|r| { + r.and_then(|r| { + Ok(match r { + ActExitValue::WillCompleteAsync => ActExitValue::WillCompleteAsync, + ActExitValue::Normal(o) => { + ActExitValue::Normal(o.as_json_payload()?) + } + }) + }) + }) + .boxed() + }), + } + } + + /// Build an activity function from a closure or function pointer which accepts a [ActContext] and takes one argument + pub fn from(act_func: F) -> Self + where + A: FromJsonPayloadExt + Send, + F: Fn(ActContext, A) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + R: Into>, + O: Serialize + Debug, + { + Self { + act_func: Arc::new(move |ctx: ActContext| { + match A::from_json_payload(&ctx.get_args()[0]) { + Ok(a) => (act_func)(ctx, a) + .map(|r| { + r.and_then(|r| { + let r = r.into(); + Ok(match r { + ActExitValue::WillCompleteAsync => { + ActExitValue::WillCompleteAsync + } + ActExitValue::Normal(o) => { + ActExitValue::Normal(o.as_json_payload()?) + } + }) + }) + }) + .boxed(), + Err(e) => async move { Err(anyhow::Error::new(e)) }.boxed(), + } + }), + } + } +} + /// Return this error to indicate your activity is cancelling #[derive(Debug, Default)] pub struct ActivityCancelledError { @@ -829,6 +933,7 @@ pub trait IntoActivityFunc { fn into_activity_fn(self) -> BoxActFn; } +// TODO: Remove this later, not needed with ActivityFunction new and from functions. impl IntoActivityFunc for F where F: (Fn(ActContext, A) -> Rf) + Sync + Send + 'static, @@ -838,9 +943,9 @@ where O: AsJsonPayloadExt + Debug, { fn into_activity_fn(self) -> BoxActFn { - let wrapper = move |ctx: ActContext, input: Payload| { + let wrapper = move |ctx: ActContext| { // Some minor gymnastics are required to avoid needing to clone the function - match A::from_json_payload(&input) { + match A::from_json_payload(&ctx.get_args()[0]) { Ok(deser) => (self)(ctx, deser) .map(|r| { r.and_then(|r| { diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index cd0f3170f..4c625ea06 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -27,7 +27,7 @@ use temporal_client::{ }; use temporal_sdk::{ interceptors::{FailOnNondeterminismInterceptor, WorkerInterceptor}, - IntoActivityFunc, Worker, WorkflowFunction, + ActivityFunction, Worker, WorkflowFunction, }; use temporal_sdk_core::{ ephemeral_server::{EphemeralExe, EphemeralExeVersion}, @@ -385,10 +385,10 @@ impl TestWorker { self.inner.register_wf(workflow_type, wf_function) } - pub fn register_activity( + pub fn register_activity( &mut self, activity_type: impl Into, - act_function: impl IntoActivityFunc, + act_function: impl Into, ) { self.inner.register_activity(activity_type, act_function) } diff --git a/tests/fuzzy_workflow.rs b/tests/fuzzy_workflow.rs index 8c7c10660..d79b32a1a 100644 --- a/tests/fuzzy_workflow.rs +++ b/tests/fuzzy_workflow.rs @@ -2,7 +2,9 @@ use futures_util::{sink, stream::FuturesUnordered, FutureExt, StreamExt}; use rand::{prelude::Distribution, rngs::SmallRng, Rng, SeedableRng}; use std::{future, time::Duration}; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; -use temporal_sdk::{ActContext, ActivityOptions, LocalActivityOptions, WfContext, WorkflowResult}; +use temporal_sdk::{ + ActContext, ActivityFunction, ActivityOptions, LocalActivityOptions, WfContext, WorkflowResult, +}; use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt, IntoPayloadsExt}; use temporal_sdk_core_test_utils::CoreWfStarter; use tokio_util::sync::CancellationToken; @@ -82,7 +84,7 @@ async fn fuzzy_workflow() { // .enable_wf_state_input_recording(); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), fuzzy_wf_def); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); let client = starter.get_client().await; let mut workflow_handles = vec![]; diff --git a/tests/heavy_tests.rs b/tests/heavy_tests.rs index 68b1f7774..584451645 100644 --- a/tests/heavy_tests.rs +++ b/tests/heavy_tests.rs @@ -1,7 +1,7 @@ use futures::{future::join_all, sink, stream::FuturesUnordered, StreamExt}; use std::time::{Duration, Instant}; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions}; -use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult}; +use temporal_sdk::{ActContext, ActivityFunction, ActivityOptions, WfContext, WorkflowResult}; use temporal_sdk_core_protos::coresdk::{ workflow_commands::ActivityCancellationType, AsJsonPayloadExt, }; @@ -52,7 +52,7 @@ async fn activity_load() { worker.register_wf(wf_type.to_owned(), wf_fn); worker.register_activity( "test_activity", - |_ctx: ActContext, echo: String| async move { Ok(echo) }, + ActivityFunction::from(|_ctx: ActContext, echo: String| async move { Ok(echo) }), ); join_all((0..CONCURRENCY).map(|i| { let worker = &worker; @@ -115,7 +115,7 @@ async fn workflow_load() { }); worker.register_activity( "echo_activity", - |_ctx: ActContext, echo_me: String| async move { Ok(echo_me) }, + ActivityFunction::from(|_ctx: ActContext, echo_me: String| async move { Ok(echo_me) }), ); let client = starter.get_client().await; @@ -170,9 +170,9 @@ async fn evict_while_la_running_no_interference() { let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), la_problem_workflow); - worker.register_activity("delay", |_: ActContext, _: String| async { + worker.register_activity("delay", |_: ActContext| async { tokio::time::sleep(Duration::from_secs(15)).await; - Ok(()) + Ok(().into()) }); let client = starter.get_client().await; diff --git a/tests/integ_tests/heartbeat_tests.rs b/tests/integ_tests/heartbeat_tests.rs index 35d00609b..9ac2d746b 100644 --- a/tests/integ_tests/heartbeat_tests.rs +++ b/tests/integ_tests/heartbeat_tests.rs @@ -1,7 +1,7 @@ use assert_matches::assert_matches; use std::time::Duration; use temporal_client::{WfClientExt, WorkflowOptions}; -use temporal_sdk::{ActContext, ActivityOptions, WfContext}; +use temporal_sdk::{ActContext, ActivityFunction, ActivityOptions, WfContext}; use temporal_sdk_core_protos::{ coresdk::{ activity_result::{ @@ -182,10 +182,10 @@ async fn activity_doesnt_heartbeat_hits_timeout_then_completes() { let client = starter.get_client().await; worker.register_activity( "echo_activity", - |_ctx: ActContext, echo_me: String| async move { + ActivityFunction::from(|_ctx: ActContext, echo_me: String| async move { sleep(Duration::from_secs(4)).await; Ok(echo_me) - }, + }), ); worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { let res = ctx diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index 83b3670c6..52c15c2e4 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -27,7 +27,9 @@ use std::{ time::Duration, }; use temporal_client::{WorkflowClientTrait, WorkflowOptions}; -use temporal_sdk::{interceptors::WorkerInterceptor, ActivityOptions, WfContext, WorkflowResult}; +use temporal_sdk::{ + interceptors::WorkerInterceptor, ActivityFunction, ActivityOptions, WfContext, WorkflowResult, +}; use temporal_sdk_core::replay::HistoryForReplay; use temporal_sdk_core_api::{errors::PollWfError, Worker}; use temporal_sdk_core_protos::{ @@ -564,7 +566,7 @@ async fn slow_completes_with_small_cache() { } Ok(().into()) }); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); for i in 0..20 { worker .submit_wf( diff --git a/tests/integ_tests/workflow_tests/activities.rs b/tests/integ_tests/workflow_tests/activities.rs index 1c04868e8..046f14ddb 100644 --- a/tests/integ_tests/workflow_tests/activities.rs +++ b/tests/integ_tests/workflow_tests/activities.rs @@ -5,8 +5,8 @@ use futures_util::future::join_all; use std::time::Duration; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions}; use temporal_sdk::{ - ActContext, ActExitValue, ActivityCancelledError, ActivityOptions, CancellableFuture, - WfContext, WorkflowResult, + ActContext, ActExitValue, ActivityCancelledError, ActivityFunction, ActivityOptions, + CancellableFuture, WfContext, WorkflowResult, }; use temporal_sdk_core_protos::{ coresdk::{ @@ -55,7 +55,7 @@ async fn one_activity() { let mut worker = starter.worker().await; let client = starter.get_client().await; worker.register_wf(wf_name.to_owned(), one_activity_wf); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); let run_id = worker .submit_wf( @@ -806,10 +806,10 @@ async fn one_activity_abandon_cancelled_after_complete() { }); worker.register_activity( "echo_activity", - |_ctx: ActContext, echo_me: String| async move { + ActivityFunction::from(|_ctx: ActContext, echo_me: String| async move { sleep(Duration::from_secs(2)).await; Ok(echo_me) - }, + }), ); let run_id = worker @@ -863,20 +863,17 @@ async fn it_can_complete_async() { }); let shared_token_ref = shared_token.clone(); - worker.register_activity( - "complete_async_activity", - move |ctx: ActContext, _: String| { - let shared_token_ref = shared_token_ref.clone(); - async move { - // set the `activity_task_token` - let activity_info = ctx.get_info(); - let task_token = &activity_info.task_token; - let mut shared = shared_token_ref.lock().await; - *shared = Some(task_token.clone()); - Ok::, _>(ActExitValue::WillCompleteAsync) - } - }, - ); + worker.register_activity("complete_async_activity", move |ctx: ActContext| { + let shared_token_ref = shared_token_ref.clone(); + async move { + // set the `activity_task_token` + let activity_info = ctx.get_info(); + let task_token = &activity_info.task_token; + let mut shared = shared_token_ref.lock().await; + *shared = Some(task_token.clone()); + Ok::, _>(ActExitValue::WillCompleteAsync) + } + }); let shared_token_ref2 = shared_token.clone(); tokio::spawn(async move { @@ -938,12 +935,12 @@ async fn graceful_shutdown() { }); static ACTS_STARTED: Semaphore = Semaphore::const_new(0); static ACTS_DONE: Semaphore = Semaphore::const_new(0); - worker.register_activity("sleeper", |ctx: ActContext, _: String| async move { + worker.register_activity("sleeper", |ctx: ActContext| async move { ACTS_STARTED.add_permits(1); // just wait to be cancelled ctx.cancelled().await; ACTS_DONE.add_permits(1); - Result::<(), _>::Err(ActivityCancelledError::default().into()) + Result::, _>::Err(ActivityCancelledError::default().into()) }); worker diff --git a/tests/integ_tests/workflow_tests/appdata_propagation.rs b/tests/integ_tests/workflow_tests/appdata_propagation.rs index df7972bec..69976af09 100644 --- a/tests/integ_tests/workflow_tests/appdata_propagation.rs +++ b/tests/integ_tests/workflow_tests/appdata_propagation.rs @@ -1,7 +1,7 @@ use assert_matches::assert_matches; use std::time::Duration; use temporal_client::{WfClientExt, WorkflowExecutionResult, WorkflowOptions}; -use temporal_sdk::{ActContext, ActivityOptions, WfContext, WorkflowResult}; +use temporal_sdk::{ActContext, ActivityFunction, ActivityOptions, WfContext, WorkflowResult}; use temporal_sdk_core_protos::coresdk::AsJsonPayloadExt; use temporal_sdk_core_test_utils::CoreWfStarter; @@ -35,11 +35,11 @@ async fn appdata_access_in_activities_and_workflows() { worker.register_wf(wf_name.to_owned(), appdata_activity_wf); worker.register_activity( "echo_activity", - |ctx: ActContext, echo_me: String| async move { + ActivityFunction::from(|ctx: ActContext, echo_me: String| async move { let data = ctx.app_data::().expect("appdata exists. qed"); assert_eq!(data.message, TEST_APPDATA_MESSAGE.to_owned()); Ok(echo_me) - }, + }), ); let run_id = worker diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index 76b522afc..156fb9c50 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -7,8 +7,8 @@ use std::{ }; use temporal_client::WorkflowOptions; use temporal_sdk::{ - interceptors::WorkerInterceptor, ActContext, ActivityCancelledError, CancellableFuture, - LocalActivityOptions, WfContext, WorkflowResult, + interceptors::WorkerInterceptor, ActContext, ActExitValue, ActivityCancelledError, + ActivityFunction, CancellableFuture, LocalActivityOptions, WfContext, WorkflowResult, }; use temporal_sdk_core::replay::HistoryForReplay; use temporal_sdk_core_protos::{ @@ -46,7 +46,7 @@ async fn one_local_activity() { let mut starter = CoreWfStarter::new(wf_name); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), one_local_activity_wf); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); @@ -69,7 +69,7 @@ async fn local_act_concurrent_with_timer() { let mut starter = CoreWfStarter::new(wf_name); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), local_act_concurrent_with_timer_wf); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); @@ -93,7 +93,7 @@ async fn local_act_then_timer_then_wait_result() { let mut starter = CoreWfStarter::new(wf_name); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), local_act_then_timer_then_wait); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); @@ -106,10 +106,13 @@ async fn long_running_local_act_with_timer() { starter.workflow_options.task_timeout = Some(Duration::from_secs(1)); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), local_act_then_timer_then_wait); - worker.register_activity("echo_activity", |_ctx: ActContext, str: String| async { - tokio::time::sleep(Duration::from_secs(4)).await; - Ok(str) - }); + worker.register_activity( + "echo_activity", + ActivityFunction::from(|_ctx: ActContext, str: String| async { + tokio::time::sleep(Duration::from_secs(4)).await; + Ok(str) + }), + ); starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); @@ -139,7 +142,7 @@ async fn local_act_fanout() { starter.max_local_at(1); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), local_act_fanout_wf); - worker.register_activity("echo_activity", echo); + worker.register_activity("echo_activity", ActivityFunction::from(echo)); starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); @@ -170,8 +173,8 @@ async fn local_act_retry_timer_backoff() { assert!(res.failed()); Ok(().into()) }); - worker.register_activity("echo", |_: ActContext, _: String| async { - Result::<(), _>::Err(anyhow!("Oh no I failed!")) + worker.register_activity("echo", |_: ActContext| async { + Result::, _>::Err(anyhow!("Oh no I failed!")) }); let run_id = worker @@ -216,19 +219,22 @@ async fn cancel_immediate(#[case] cancel_type: ActivityCancellationType) { let manual_cancel = CancellationToken::new(); let manual_cancel_act = manual_cancel.clone(); - worker.register_activity("echo", move |ctx: ActContext, _: String| { - let manual_cancel_act = manual_cancel_act.clone(); - async move { - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(10)) => {}, - _ = ctx.cancelled() => { - return Err(anyhow!(ActivityCancelledError::default())) + worker.register_activity( + "echo", + ActivityFunction::from(move |ctx: ActContext, _: String| { + let manual_cancel_act = manual_cancel_act.clone(); + async move { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => {}, + _ = ctx.cancelled() => { + return Err(anyhow!(ActivityCancelledError::default())) + } + _ = manual_cancel_act.cancelled() => {} } - _ = manual_cancel_act.cancelled() => {} + Ok(()) } - Ok(()) - } - }); + }), + ); starter.start_with_worker(wf_name, &mut worker).await; worker @@ -320,7 +326,7 @@ async fn cancel_after_act_starts( let manual_cancel = CancellationToken::new(); let manual_cancel_act = manual_cancel.clone(); - worker.register_activity("echo", move |ctx: ActContext, _: String| { + worker.register_activity("echo", move |ctx: ActContext| { let manual_cancel_act = manual_cancel_act.clone(); async move { if cancel_on_backoff.is_some() { @@ -336,7 +342,7 @@ async fn cancel_after_act_starts( return Err(anyhow!(ActivityCancelledError::default())) } _ = manual_cancel_act.cancelled() => { - return Ok(()) + return Ok(().into()) } } } @@ -400,14 +406,14 @@ async fn x_to_close_timeout(#[case] is_schedule: bool) { assert_eq!(res.timed_out(), Some(timeout_type)); Ok(().into()) }); - worker.register_activity("echo", |ctx: ActContext, _: String| async move { + worker.register_activity("echo", |ctx: ActContext| async move { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(100)) => {}, _ = ctx.cancelled() => { return Err(anyhow!(ActivityCancelledError::default())) } }; - Ok(()) + Ok(().into()) }); starter.start_with_worker(wf_name, &mut worker).await; @@ -449,9 +455,9 @@ async fn schedule_to_close_timeout_across_timer_backoff(#[case] cached: bool) { Ok(().into()) }); let num_attempts: &'static _ = Box::leak(Box::new(AtomicU8::new(0))); - worker.register_activity("echo", move |_: ActContext, _: String| async { + worker.register_activity("echo", move |_: ActContext| async { num_attempts.fetch_add(1, Ordering::Relaxed); - Result::<(), _>::Err(anyhow!("Oh no I failed!")) + Result::, _>::Err(anyhow!("Oh no I failed!")) }); starter.start_with_worker(wf_name, &mut worker).await; @@ -469,10 +475,13 @@ async fn eviction_wont_make_local_act_get_dropped(#[values(true, false)] short_w starter.max_cached_workflows(0); let mut worker = starter.worker().await; worker.register_wf(wf_name.to_owned(), local_act_then_timer_then_wait); - worker.register_activity("echo_activity", |_ctx: ActContext, str: String| async { - tokio::time::sleep(Duration::from_secs(4)).await; - Ok(str) - }); + worker.register_activity( + "echo_activity", + ActivityFunction::from(|_ctx: ActContext, str: String| async { + tokio::time::sleep(Duration::from_secs(4)).await; + Ok(str) + }), + ); let opts = if short_wft_timeout { WorkflowOptions { @@ -526,8 +535,8 @@ async fn timer_backoff_concurrent_with_non_timer_backoff() { assert!(r2.failed()); Ok(().into()) }); - worker.register_activity("echo", |_: ActContext, _: String| async { - Result::<(), _>::Err(anyhow!("Oh no I failed!")) + worker.register_activity("echo", |_: ActContext| async { + Result::, _>::Err(anyhow!("Oh no I failed!")) }); starter.start_with_worker(wf_name, &mut worker).await; @@ -565,9 +574,9 @@ async fn repro_nondeterminism_with_timer_bug() { ctx.timer(Duration::from_secs(1)).await; Ok(().into()) }); - worker.register_activity("delay", |_: ActContext, _: String| async { + worker.register_activity("delay", |_: ActContext| async { tokio::time::sleep(Duration::from_secs(2)).await; - Ok(()) + Ok(().into()) }); let run_id = worker @@ -609,9 +618,9 @@ async fn weird_la_nondeterminism_repro(#[values(true, false)] fix_hist: bool) { "evict_while_la_running_no_interference", la_problem_workflow, ); - worker.register_activity("delay", |_: ActContext, _: String| async { + worker.register_activity("delay", |_: ActContext| async { tokio::time::sleep(Duration::from_secs(15)).await; - Ok(()) + Ok(().into()) }); worker.run().await.unwrap(); } @@ -635,9 +644,9 @@ async fn second_weird_la_nondeterminism_repro() { "evict_while_la_running_no_interference", la_problem_workflow, ); - worker.register_activity("delay", |_: ActContext, _: String| async { + worker.register_activity("delay", |_: ActContext| async { tokio::time::sleep(Duration::from_secs(15)).await; - Ok(()) + Ok(().into()) }); worker.run().await.unwrap(); } @@ -659,9 +668,9 @@ async fn third_weird_la_nondeterminism_repro() { "evict_while_la_running_no_interference", la_problem_workflow, ); - worker.register_activity("delay", |_: ActContext, _: String| async { + worker.register_activity("delay", |_: ActContext| async { tokio::time::sleep(Duration::from_secs(15)).await; - Ok(()) + Ok(().into()) }); worker.run().await.unwrap(); }