Skip to content

Updated Activity/Workflow Function bounds to allow a more generic use… #543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 38 additions & 44 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::{
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowFunction,
WorkflowResult,
ActContext, ActExitValue, ActivityCancelledError, ActivityFunction, LocalActivityOptions,
WfContext, WorkflowFunction, WorkflowResult,
};
use temporal_sdk_core_api::{
errors::{PollActivityError, PollWfError},
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
Ok(().into())
},
);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, echo);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, ActivityFunction::from(echo));
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -151,7 +151,7 @@ async fn local_act_many_concurrent() {
let mut worker = mock_sdk(mh);

worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
worker.register_activity("echo", echo);
worker.register_activity("echo", ActivityFunction::from(echo));
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -207,14 +207,17 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
Ok(().into())
},
);
worker.register_activity("echo", move |_ctx: ActContext, str: String| async move {
if shutdown_middle {
shutdown_barr.wait().await;
}
// Take slightly more than two workflow tasks
tokio::time::sleep(wft_timeout.mul_f32(2.2)).await;
Ok(str)
});
worker.register_activity(
"echo",
ActivityFunction::from(move |_ctx: ActContext, str: String| async move {
if shutdown_middle {
shutdown_barr.wait().await;
}
// Take slightly more than two workflow tasks
tokio::time::sleep(wft_timeout.mul_f32(2.2)).await;
Ok(str)
}),
);
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -276,10 +279,10 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
},
);
let attempts: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
worker.register_activity("echo", move |_ctx: ActContext, _: String| async move {
worker.register_activity("echo", move |_ctx: ActContext| async move {
// Succeed on 3rd attempt (which is ==2 since fetch_add returns prev val)
if 2 == attempts.fetch_add(1, Ordering::Relaxed) && eventually_pass {
Ok(())
Ok(().into())
} else {
Err(anyhow!("Oh no I failed!"))
}
Expand Down Expand Up @@ -355,12 +358,9 @@ async fn local_act_retry_long_backoff_uses_timer() {
Ok(().into())
},
);
worker.register_activity(
DEFAULT_ACTIVITY_TYPE,
move |_ctx: ActContext, _: String| async move {
Result::<(), _>::Err(anyhow!("Oh no I failed!"))
},
);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |_ctx: ActContext| async move {
Result::<ActExitValue<()>, _>::Err(anyhow!("Oh no I failed!"))
});
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -398,7 +398,7 @@ async fn local_act_null_result() {
Ok(().into())
},
);
worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) });
worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -442,7 +442,7 @@ async fn local_act_command_immediately_follows_la_marker() {
Ok(().into())
},
);
worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) });
worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -778,10 +778,7 @@ async fn test_schedule_to_start_timeout() {
Ok(().into())
},
);
worker.register_activity(
"echo",
move |_ctx: ActContext, _: String| async move { Ok(()) },
);
worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -868,10 +865,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time(
Ok(().into())
},
);
worker.register_activity(
"echo",
move |_ctx: ActContext, _: String| async move { Ok(()) },
);
worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) });
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -917,16 +911,13 @@ async fn wft_failure_cancels_running_las() {
Ok(().into())
},
);
worker.register_activity(
DEFAULT_ACTIVITY_TYPE,
move |ctx: ActContext, _: String| async move {
let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await;
if res.is_err() {
panic!("Activity must be cancelled!!!!");
}
Result::<(), _>::Err(ActivityCancelledError::default().into())
},
);
worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |ctx: ActContext| async move {
let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await;
if res.is_err() {
panic!("Activity must be cancelled!!!!");
}
Result::<ActExitValue<()>, _>::Err(ActivityCancelledError::default().into())
});
worker
.submit_wf(
wf_id.to_owned(),
Expand Down Expand Up @@ -980,7 +971,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
);
worker.register_activity(
"echo",
move |_: ActContext, _: String| async move { Ok(()) },
ActivityFunction::from(move |_: ActContext, _: String| async move { Ok(()) }),
);
worker
.submit_wf(
Expand Down Expand Up @@ -1039,9 +1030,12 @@ async fn local_act_records_nonfirst_attempts_ok() {
Ok(().into())
},
);
worker.register_activity("echo", move |_ctx: ActContext, _: String| async move {
Result::<(), _>::Err(anyhow!("I fail"))
});
worker.register_activity(
"echo",
ActivityFunction::from(move |_ctx: ActContext, _: String| async move {
Result::<(), _>::Err(anyhow!("I fail"))
}),
);
worker
.submit_wf(
wf_id.to_owned(),
Expand Down
61 changes: 28 additions & 33 deletions sdk/src/activity_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ impl ActContext {
task_queue: String,
task_token: Vec<u8>,
task: activity_task::Start,
) -> (Self, Payload) {
) -> Self {
let activity_task::Start {
workflow_namespace,
workflow_type,
workflow_execution,
activity_id,
activity_type,
header_fields,
mut input,
input,
heartbeat_details,
scheduled_time,
current_attempt_scheduled_time,
Expand All @@ -86,37 +86,32 @@ impl ActContext {
start_to_close_timeout.as_ref(),
schedule_to_close_timeout.as_ref(),
);
let first_arg = input.pop().unwrap_or_default();

(
ActContext {
worker,
app_data,
cancellation_token,
input,
heartbeat_details,
header_fields,
info: ActivityInfo {
task_token,
task_queue,
workflow_type,
workflow_namespace,
workflow_execution,
activity_id,
activity_type,
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
scheduled_time: scheduled_time.try_into_or_none(),
started_time: started_time.try_into_or_none(),
deadline,
attempt,
current_attempt_scheduled_time: current_attempt_scheduled_time
.try_into_or_none(),
retry_policy,
is_local,
},
ActContext {
worker,
app_data,
cancellation_token,
input,
heartbeat_details,
header_fields,
info: ActivityInfo {
task_token,
task_queue,
workflow_type,
workflow_namespace,
workflow_execution,
activity_id,
activity_type,
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
scheduled_time: scheduled_time.try_into_or_none(),
started_time: started_time.try_into_or_none(),
deadline,
attempt,
current_attempt_scheduled_time: current_attempt_scheduled_time.try_into_or_none(),
retry_policy,
is_local,
},
first_arg,
)
}
}

/// Returns a future the completes if and when the activity this was called inside has been
Expand All @@ -133,8 +128,8 @@ impl ActContext {
/// Retrieve extra parameters to the Activity. The first input is always popped and passed to
/// the Activity function for the currently executing activity. However, if more parameters are
/// passed, perhaps from another language's SDK, explicit access is available from extra_inputs
pub fn extra_inputs(&mut self) -> &mut [Payload] {
&mut self.input
pub fn get_args(&self) -> &[Payload] {
&self.input
}

/// Extract heartbeat details from last failed attempt. This is used in combination with retry policy.
Expand Down
Loading