diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs
index fb1baa591..18638c1fb 100644
--- a/core-api/src/worker.rs
+++ b/core-api/src/worker.rs
@@ -161,6 +161,12 @@ pub struct WorkerConfig {
     /// A versioning strategy for this worker.
     pub versioning_strategy: WorkerVersioningStrategy,
+
+    /// The interval at which the worker will send a heartbeat to the server.
+    /// The timer is reset by any existing RPC call that already carries this data, such as
+    /// `PollWorkflowTaskQueueRequest`.
+    #[builder(default = "Duration::from_secs(60)")]
+    pub heartbeat_interval: Duration,
 }

 impl WorkerConfig {
diff --git a/core/Cargo.toml b/core/Cargo.toml
index a7accd34c..a62fa20b8 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -36,6 +36,7 @@ enum-iterator = "2"
 flate2 = { version = "1.0", optional = true }
 futures-util = { version = "0.3", default-features = false }
 futures-channel = { version = "0.3", default-features = false, features = ["std"] }
+gethostname = "1.0.2"
 governor = "0.8"
 http-body-util = { version = "0.1", optional = true }
 hyper = { version = "1.2", optional = true }
@@ -95,6 +96,7 @@ criterion = { version = "0.6", features = ["async", "async_tokio"] }
 rstest = "0.25"
 temporal-sdk-core-test-utils = { path = "../test-utils" }
 temporal-sdk = { path = "../sdk" }
+tokio = { version = "1.37", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs", "process", "test-util"] }
 tokio-stream = { version = "0.1", features = ["net"] }

 [[test]]
diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs
index a01e2d605..c011b4f46 100644
--- a/core/src/core_tests/activity_tasks.rs
+++ b/core/src/core_tests/activity_tasks.rs
@@ -6,7 +6,7 @@ use crate::{
         gen_assert_and_reply, mock_manual_poller, mock_poller, mock_poller_from_resps,
         mock_sdk_cfg, mock_worker, poll_and_reply, single_hist_mock_sg, test_worker_cfg,
     },
-    worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
+    worker::client::mocks::{mock_manual_worker_client, mock_worker_client},
 };
 use futures_util::FutureExt;
 use itertools::Itertools;
@@ -86,7 +86,7 @@ fn three_tasks() -> VecDeque<PollActivityTaskQueueResponse> {
 async fn max_activities_respected() {
     let _task_q = "q";
     let mut tasks = three_tasks();
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_poll_activity_task()
         .times(3)
@@ -122,7 +122,7 @@
 #[tokio::test]
 async fn activity_not_found_returns_ok() {
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     // Mock won't even be called, since we weren't tracking activity
     mock_client.expect_complete_activity_task().times(0);
@@ -139,7 +139,7 @@
 #[tokio::test]
 async fn heartbeats_report_cancels_only_once() {
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_record_activity_heartbeat()
         .times(2)
@@ -265,7 +265,7 @@ async fn activity_cancel_interrupts_poll() {
         .times(3)
         .returning(move || poll_resps.pop_front().unwrap());

-    let mut mock_client = mock_manual_workflow_client();
+    let mut mock_client = mock_manual_worker_client();
     mock_client
         .expect_record_activity_heartbeat()
         .times(1)
@@ -323,7 +323,7 @@ async fn activity_cancel_interrupts_poll() {
 #[tokio::test]
 async fn activity_poll_timeout_retries() {
-    let mock_client = mock_workflow_client();
+    let mock_client = mock_worker_client();
     let mut calls = 0;
     let mut mock_act_poller = mock_poller();
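The new `heartbeat_interval` option above is set through the derive_builder-generated `WorkerConfigBuilder` like any other field, and falls back to 60 seconds when omitted. A minimal, hypothetical sketch (the other required `WorkerConfig` fields are elided here; `test_worker_cfg()` in the heartbeat tests further down shows a real call site):

    use std::time::Duration;

    // Sketch only: assumes the remaining required WorkerConfig fields
    // (versioning strategy, etc.) are also supplied.
    let config = WorkerConfigBuilder::default()
        .namespace("default")
        .task_queue("q")
        .heartbeat_interval(Duration::from_secs(30))
        .build()
        .unwrap();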
mock_act_poller.expect_poll().times(3).returning(move || { @@ -352,7 +352,7 @@ async fn many_concurrent_heartbeat_cancels() { // them after a few successful heartbeats const CONCURRENCY_NUM: usize = 5; - let mut mock_client = mock_manual_workflow_client(); + let mut mock_client = mock_manual_worker_client(); let mut poll_resps = VecDeque::from( (0..CONCURRENCY_NUM) .map(|i| { @@ -516,7 +516,7 @@ async fn activity_timeout_no_double_resolve() { #[tokio::test] async fn can_heartbeat_acts_during_shutdown() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .times(1) @@ -567,7 +567,7 @@ async fn can_heartbeat_acts_during_shutdown() { #[tokio::test] async fn complete_act_with_fail_flushes_heartbeat() { let last_hb = 50; - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); let last_seen_payload = Rc::new(RefCell::new(None)); let lsp = last_seen_payload.clone(); mock_client @@ -622,7 +622,7 @@ async fn complete_act_with_fail_flushes_heartbeat() { #[tokio::test] async fn max_tq_acts_set_passed_to_poll_properly() { let rate = 9.28; - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_activity_task() .returning(move |_, ao| { @@ -659,7 +659,7 @@ async fn no_eager_activities_requested_when_worker_options_disable_it( let num_eager_requested = Arc::new(AtomicUsize::new(0)); let num_eager_requested_clone = num_eager_requested.clone(); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() .times(1) .returning(move |req| { @@ -747,7 +747,7 @@ async fn activity_tasks_from_completion_are_delivered() { // Clone it to move into the callback below let num_eager_requested_clone = num_eager_requested.clone(); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() .times(1) .returning(move |req| { @@ -876,7 +876,7 @@ async fn activity_tasks_from_completion_reserve_slots() { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); // Set up two tasks to be returned via normal activity polling let act_tasks = VecDeque::from(vec![ PollActivityTaskQueueResponse { @@ -1004,7 +1004,7 @@ async fn activity_tasks_from_completion_reserve_slots() { #[tokio::test] async fn retryable_net_error_exhaustion_is_nonfatal() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_complete_activity_task() .times(1) @@ -1033,7 +1033,7 @@ async fn retryable_net_error_exhaustion_is_nonfatal() { #[tokio::test] async fn cant_complete_activity_with_unset_result_payload() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_activity_task() .returning(move |_, _| { @@ -1076,7 +1076,7 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) { .times(1) .returning(move || None); // They shall all be reported as failed - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_fail_activity_task() .times(3) @@ -1153,7 +1153,7 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)] .expect_poll() .times(1) .returning(move || None); - let mut mock_client = mock_manual_workflow_client(); + let mut mock_client = 
mock_manual_worker_client(); mock_client .expect_complete_activity_task() .times(1) @@ -1251,7 +1251,7 @@ async fn pass_activity_summary_to_metadata() { #[tokio::test] async fn heartbeat_response_can_be_paused() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); // First heartbeat returns pause only mock_client .expect_record_activity_heartbeat() diff --git a/core/src/core_tests/child_workflows.rs b/core/src/core_tests/child_workflows.rs index c1bf6a923..1593acf94 100644 --- a/core/src/core_tests/child_workflows.rs +++ b/core/src/core_tests/child_workflows.rs @@ -4,7 +4,7 @@ use crate::{ MockPollCfg, ResponseType, build_fake_sdk, canned_histories, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg, }, - worker::client::mocks::mock_workflow_client, + worker::client::mocks::mock_worker_client, }; use temporal_client::WorkflowOptions; use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowResult}; @@ -32,7 +32,7 @@ async fn signal_child_workflow(#[case] serial: bool) { let wf_id = "fakeid"; let wf_type = DEFAULT_WORKFLOW_TYPE; let t = canned_histories::single_child_workflow_signaled("child-id-1", SIGNAME); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut worker = mock_sdk(MockPollCfg::from_resp_batches( wf_id, t, @@ -130,7 +130,7 @@ async fn cancel_child_workflow_lang_thinks_not_started_but_is( } _ => canned_histories::single_child_workflow_cancelled("child-id-1"), }; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mock = single_hist_mock_sg("fakeid", t, [ResponseType::AllHistory], mock, true); let core = mock_worker(mock); let act = core.poll_workflow_activation().await.unwrap(); @@ -179,7 +179,7 @@ async fn cancel_child_workflow_lang_thinks_not_started_but_is( #[tokio::test] async fn cancel_already_complete_child_ignored() { let t = canned_histories::single_child_workflow("child-id-1"); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mock = single_hist_mock_sg("fakeid", t, [ResponseType::AllHistory], mock, true); let core = mock_worker(mock); let act = core.poll_workflow_activation().await.unwrap(); diff --git a/core/src/core_tests/determinism.rs b/core/src/core_tests/determinism.rs index 3e552e0bc..4c3ac90a4 100644 --- a/core/src/core_tests/determinism.rs +++ b/core/src/core_tests/determinism.rs @@ -2,7 +2,7 @@ use crate::{ internal_flags::CoreInternalFlags, replay::DEFAULT_WORKFLOW_TYPE, test_help::{MockPollCfg, ResponseType, canned_histories, mock_sdk, mock_sdk_cfg}, - worker::client::mocks::mock_workflow_client, + worker::client::mocks::mock_worker_client, }; use std::{ sync::atomic::{AtomicBool, AtomicUsize, Ordering}, @@ -40,7 +40,7 @@ async fn test_panic_wf_task_rejected_properly() { let wf_id = "fakeid"; let wf_type = DEFAULT_WORKFLOW_TYPE; let t = canned_histories::workflow_fails_with_failure_after_timer("1"); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 2], mock); // We should see one wft failure which has unspecified cause, since panics don't have a defined // type. 
@@ -72,7 +72,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache: let wf_id = "fakeid"; let wf_type = DEFAULT_WORKFLOW_TYPE; let t = canned_histories::single_timer_wf_completes("1"); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches( wf_id, t, @@ -131,7 +131,7 @@ async fn activity_id_or_type_change_is_nondeterministic( canned_histories::single_activity("1") }; t.set_flags_first_wft(&[CoreInternalFlags::IdAndTypeDeterminismChecks as u32], &[]); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches( wf_id, t, @@ -214,7 +214,7 @@ async fn child_wf_id_or_type_change_is_nondeterministic( let wf_type = DEFAULT_WORKFLOW_TYPE; let mut t = canned_histories::single_child_workflow("1"); t.set_flags_first_wft(&[CoreInternalFlags::IdAndTypeDeterminismChecks as u32], &[]); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches( wf_id, t, @@ -289,7 +289,7 @@ async fn repro_channel_missing_because_nondeterminism() { let _ts = t.add_by_type(EventType::TimerStarted); t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1.into(), ResponseType::AllHistory], mock); mh.num_expected_fails = 1; diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index 327eb81bd..d524d0317 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -5,7 +5,7 @@ use crate::{ MockPollCfg, ResponseType, WorkerExt, build_mock_pollers, hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg, }, - worker::{LEGACY_QUERY_ID, client::mocks::mock_workflow_client}, + worker::{LEGACY_QUERY_ID, client::mocks::mock_worker_client}, }; use anyhow::anyhow; use crossbeam_queue::SegQueue; @@ -69,7 +69,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached: t.add_workflow_execution_completed(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let resps = if replay { vec![ResponseType::AllHistory] } else { @@ -140,7 +140,7 @@ async fn local_act_many_concurrent() { t.add_workflow_execution_completed(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock); let mut worker = mock_sdk(mh); @@ -178,7 +178,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) { t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 2, 2], mock); mh.enforce_correct_number_of_polls = false; let mut worker = mock_sdk_cfg(mh, |wc| { @@ -240,7 +240,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) { t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock); let mut worker = mock_sdk(mh); @@ -316,7 +316,7 @@ async fn local_act_retry_long_backoff_uses_timer() { t.add_workflow_execution_completed(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches( wf_id, t, @@ -376,7 
+376,7 @@ async fn local_act_null_result() { t.add_workflow_execution_completed(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock); let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 1); @@ -418,7 +418,7 @@ async fn local_act_command_immediately_follows_la_marker() { t.add_full_wf_task(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); // Bug only repros when seeing history up to third wft let mh = MockPollCfg::from_resp_batches(wf_id, t, [3], mock); let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 0); @@ -489,7 +489,7 @@ async fn query_during_wft_heartbeat_doesnt_accidentally_fail_to_continue_heartbe ), ), ]; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mock = single_hist_mock_sg(wfid, t, tasks, mock, true); mock.worker_cfg(|wc| wc.max_cached_workflows = 1); let core = mock_worker(mock); @@ -605,7 +605,7 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer pr }, ]; - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); if impossible_query_in_task { mock.expect_respond_legacy_query() .times(1) @@ -712,7 +712,7 @@ async fn test_schedule_to_start_timeout() { t.add_full_wf_task(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::ToTaskNum(1)], mock); let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 1); @@ -791,7 +791,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time( t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock); let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 1); @@ -868,7 +868,7 @@ async fn start_to_close_timeout_allows_retries(#[values(true, false)] la_complet t.add_workflow_execution_completed(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches( wf_id, t, @@ -947,7 +947,7 @@ async fn wft_failure_cancels_running_las() { t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2], mock); mh.num_expected_fails = 1; let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 1); @@ -1007,7 +1007,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() { t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches( wf_id, t, @@ -1058,7 +1058,7 @@ async fn local_act_records_nonfirst_attempts_ok() { t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock); let nonfirst_counts = Arc::new(SegQueue::new()); let nfc_c = nonfirst_counts.clone(); @@ -1125,7 +1125,7 @@ async fn local_activities_can_be_delivered_during_shutdown() { t.add_timer_fired(timer_started_event_id, "1".to_string()); t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock 
= mock_worker_client(); let mut mock = single_hist_mock_sg( wfid, t, @@ -1214,7 +1214,7 @@ async fn queries_can_be_received_while_heartbeating() { pr }, ]; - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_respond_legacy_query() .times(1) .returning(move |_, _| Ok(Default::default())); @@ -1291,7 +1291,7 @@ async fn local_activity_after_wf_complete_is_discarded() { t.add_full_wf_task(); t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mock_cfg = MockPollCfg::from_resp_batches( wfid, t, @@ -1385,7 +1385,7 @@ async fn local_act_retry_explicit_delay() { t.add_workflow_task_scheduled_and_started(); let wf_id = "fakeid"; - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock); let mut worker = mock_sdk(mh); diff --git a/core/src/core_tests/mod.rs b/core/src/core_tests/mod.rs index 2f922f0b3..62fc66a02 100644 --- a/core/src/core_tests/mod.rs +++ b/core/src/core_tests/mod.rs @@ -13,7 +13,7 @@ use crate::{ Worker, errors::PollError, test_help::{MockPollCfg, build_mock_pollers, canned_histories, mock_worker, test_worker_cfg}, - worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client}, + worker::client::mocks::{mock_manual_worker_client, mock_worker_client}, }; use futures_util::FutureExt; use std::{sync::LazyLock, time::Duration}; @@ -24,7 +24,7 @@ use tokio::{sync::Barrier, time::sleep}; #[tokio::test] async fn after_shutdown_server_is_not_polled() { let t = canned_histories::single_timer("fake_timer"); - let mh = MockPollCfg::from_resp_batches("fake_wf_id", t, [1], mock_workflow_client()); + let mh = MockPollCfg::from_resp_batches("fake_wf_id", t, [1], mock_worker_client()); let mut mock = build_mock_pollers(mh); // Just so we don't have to deal w/ cache overflow mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1); @@ -49,7 +49,7 @@ static BARR: LazyLock = LazyLock::new(|| Barrier::new(3)); #[tokio::test] async fn shutdown_interrupts_both_polls() { - let mut mock_client = mock_manual_workflow_client(); + let mut mock_client = mock_manual_worker_client(); mock_client .expect_poll_activity_task() .times(1) diff --git a/core/src/core_tests/queries.rs b/core/src/core_tests/queries.rs index 6550330ea..1b152e0b6 100644 --- a/core/src/core_tests/queries.rs +++ b/core/src/core_tests/queries.rs @@ -5,7 +5,7 @@ use crate::{ }, worker::{ LEGACY_QUERY_ID, - client::{LegacyQueryResult, mocks::mock_workflow_client}, + client::{LegacyQueryResult, mocks::mock_worker_client}, }, }; use futures_util::stream; @@ -67,7 +67,7 @@ async fn legacy_query(#[case] include_history: bool) { }, hist_to_poll_resp(&t, wfid.to_owned(), 2.into()), ]; - let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); mock.num_expected_legacy_query_resps = 1; let mut mock = build_mock_pollers(mock); if !include_history { @@ -159,9 +159,9 @@ async fn new_queries(#[values(1, 3)] num_queries: usize) { } pr }]); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client.expect_respond_legacy_query().times(0); - let mut mh = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mut mh = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); mh.completion_mock_fn = Some(Box::new(move |c| { // If the completion is the one ending the 
workflow, make sure it includes the query resps if c.commands[0].command_type() == CommandType::CompleteWorkflowExecution { @@ -238,7 +238,7 @@ async fn legacy_query_failure_on_wft_failure() { pr.history = Some(History { events: vec![] }); pr }]); - let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); mock.num_expected_legacy_query_resps = 1; let mut mock = build_mock_pollers(mock); mock.worker_cfg(|wc| wc.max_cached_workflows = 10); @@ -301,7 +301,7 @@ async fn query_failure_because_nondeterminism(#[values(true, false)] legacy: boo } pr }]; - let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); if legacy { mock.expect_legacy_query_matcher = Box::new(|_, f| match f { LegacyQueryResult::Failed(f) => { @@ -359,7 +359,7 @@ async fn legacy_query_after_complete(#[values(false, true)] full_history: bool) }; tasks.extend([query_with_hist_task.clone(), query_with_hist_task]); - let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); mock.num_expected_legacy_query_resps = 2; let mut mock = build_mock_pollers(mock); mock.worker_cfg(|wc| wc.max_cached_workflows = 10); @@ -452,7 +452,7 @@ async fn query_cache_miss_causes_page_fetch_dont_reply_wft_too_early( ); pr }]); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); if !matches!(hist_type, QueryHists::Full) { mock_client .expect_get_workflow_execution_history() @@ -543,7 +543,7 @@ async fn query_replay_with_continue_as_new_doesnt_reply_empty_command() { pr }; let tasks = VecDeque::from(vec![query_with_hist_task]); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_complete_workflow_task() .times(1) @@ -627,7 +627,7 @@ async fn legacy_query_response_gets_not_found_not_fatal() { }); pr }]; - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_respond_legacy_query() .times(1) .returning(move |_, _| Err(tonic::Status::not_found("Query gone boi"))); @@ -680,7 +680,7 @@ async fn new_query_fail() { ); pr }]); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_complete_workflow_task() .times(1) @@ -783,7 +783,7 @@ async fn legacy_query_combined_with_timer_fire_repro() { pr }, ]; - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_respond_legacy_query() .times(1) .returning(move |_, _| Ok(Default::default())); @@ -884,9 +884,9 @@ async fn build_id_set_properly_on_query_on_first_task() { ); pr }]); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client.expect_respond_legacy_query().times(0); - let mh = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mh = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| { wc.max_cached_workflows = 10; @@ -971,7 +971,7 @@ async fn queries_arent_lost_in_buffer_void(#[values(false, true)] buffered_becau hist_to_poll_resp(&t, wfid.to_owned(), 2.into()), ]); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() 
.returning(|_| Ok(Default::default())); mock.expect_respond_legacy_query() diff --git a/core/src/core_tests/replay_flag.rs b/core/src/core_tests/replay_flag.rs index db4599f32..bb17f4414 100644 --- a/core/src/core_tests/replay_flag.rs +++ b/core/src/core_tests/replay_flag.rs @@ -3,7 +3,7 @@ use crate::{ MockPollCfg, ResponseType, build_fake_sdk, build_mock_pollers, canned_histories, hist_to_poll_resp, mock_worker, }, - worker::{LEGACY_QUERY_ID, client::mocks::mock_workflow_client}, + worker::{LEGACY_QUERY_ID, client::mocks::mock_worker_client}, }; use rstest::{fixture, rstest}; use std::{collections::VecDeque, time::Duration}; @@ -94,7 +94,7 @@ async fn replay_flag_correct_with_query() { }, hist_to_poll_resp(&t, wfid.to_owned(), 2.into()), ]); - let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_workflow_client()); + let mut mock = MockPollCfg::from_resp_batches(wfid, t, tasks, mock_worker_client()); mock.num_expected_legacy_query_resps = 1; let mut mock = build_mock_pollers(mock); mock.worker_cfg(|wc| wc.max_cached_workflows = 10); @@ -139,7 +139,7 @@ async fn replay_flag_correct_signal_before_query_ending_on_wft_completed() { pr }; - let mut mock = MockPollCfg::from_resp_batches(wfid, t, [task], mock_workflow_client()); + let mut mock = MockPollCfg::from_resp_batches(wfid, t, [task], mock_worker_client()); mock.num_expected_legacy_query_resps = 1; let mut mock = build_mock_pollers(mock); mock.worker_cfg(|wc| wc.max_cached_workflows = 10); diff --git a/core/src/core_tests/updates.rs b/core/src/core_tests/updates.rs index 854d85d4a..778e6dfb4 100644 --- a/core/src/core_tests/updates.rs +++ b/core/src/core_tests/updates.rs @@ -4,7 +4,7 @@ use crate::{ MockPollCfg, PollWFTRespExt, ResponseType, build_mock_pollers, hist_to_poll_resp, mock_worker, }, - worker::client::mocks::mock_workflow_client, + worker::client::mocks::mock_worker_client, }; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::{ @@ -108,7 +108,7 @@ async fn initial_request_sent_back(#[values(false, true)] reject: bool) { let mut poll_resp = hist_to_poll_resp(&t, wfid, ResponseType::AllHistory); let upd_req_body = poll_resp.add_update_request(update_id, 1); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_complete_workflow_task() .times(1) @@ -171,7 +171,7 @@ async fn speculative_wft_with_command_event() { EventType::ActivityTaskScheduled as i32 ); - let mock_client = mock_workflow_client(); + let mock_client = mock_worker_client(); let mut mh = MockPollCfg::from_resp_batches( wfid, real_hist, diff --git a/core/src/core_tests/workers.rs b/core/src/core_tests/workers.rs index f4ea8482c..475f3d1d3 100644 --- a/core/src/core_tests/workers.rs +++ b/core/src/core_tests/workers.rs @@ -8,7 +8,7 @@ use crate::{ self, client::{ MockWorkerClient, - mocks::{DEFAULT_TEST_CAPABILITIES, DEFAULT_WORKERS_REGISTRY, mock_workflow_client}, + mocks::{DEFAULT_TEST_CAPABILITIES, DEFAULT_WORKERS_REGISTRY, mock_worker_client}, }, }, }; @@ -106,7 +106,7 @@ async fn worker_shutdown_during_poll_doesnt_deadlock() { )) }); let mw = MockWorkerInputs::new(stream.boxed()); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_complete_workflow_task() .returning(|_| Ok(RespondWorkflowTaskCompletedResponse::default())); @@ -126,7 +126,7 @@ async fn worker_shutdown_during_poll_doesnt_deadlock() { #[tokio::test] async fn can_shutdown_local_act_only_worker_when_act_polling() { let t = 
canned_histories::single_timer("1"); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mh = MockPollCfg::from_resp_batches("fakeid", t, [1], mock); let mut mock = build_mock_pollers(mh); mock.worker_cfg(|w| { @@ -166,7 +166,7 @@ async fn can_shutdown_local_act_only_worker_when_act_polling() { #[tokio::test] async fn complete_with_task_not_found_during_shutdown() { let t = canned_histories::single_timer("1"); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() .times(1) .returning(|_| Err(tonic::Status::not_found("Workflow task not found."))); @@ -209,7 +209,7 @@ async fn complete_eviction_after_shutdown_doesnt_panic() { "fakeid", t, [1], - mock_workflow_client(), + mock_worker_client(), )); mh.make_wft_stream_interminable(); let core = mock_worker(mh); @@ -236,7 +236,7 @@ async fn complete_eviction_after_shutdown_doesnt_panic() { #[tokio::test] async fn worker_does_not_panic_on_retry_exhaustion_of_nonfatal_net_err() { let t = canned_histories::single_timer("1"); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); // Return a failure that counts as retryable, and hence we want to be swallowed mock.expect_complete_workflow_task() .times(1) @@ -264,7 +264,7 @@ async fn worker_does_not_panic_on_retry_exhaustion_of_nonfatal_net_err() { #[rstest::rstest] #[tokio::test] async fn worker_can_shutdown_after_never_polling_ok(#[values(true, false)] poll_workflow: bool) { - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_poll_activity_task() .returning(|_, _| Err(tonic::Status::permission_denied("you shall not pass"))); if poll_workflow { @@ -314,6 +314,8 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) mock.expect_is_mock().returning(|| true); mock.expect_sdk_name_and_version() .returning(|| ("test-core".to_string(), "0.0.0".to_string())); + mock.expect_get_identity() + .returning(|| "test-identity".to_string()); if use_cache { if api_success { mock.expect_shutdown_worker() diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 94f626d0f..72ef7a7bb 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -13,7 +13,7 @@ use crate::{ }, worker::{ TunerBuilder, - client::mocks::{mock_manual_workflow_client, mock_workflow_client}, + client::mocks::{mock_manual_worker_client, mock_worker_client}, }, }; use futures_util::{FutureExt, stream}; @@ -499,7 +499,7 @@ async fn abandoned_activities_ignore_start_and_complete(hist_batches: &'static [ t.add_timer_fired(timer_started_event_id, "2".to_string()); t.add_full_wf_task(); t.add_workflow_execution_completed(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut worker = mock_sdk(MockPollCfg::from_resp_batches(wfid, t, hist_batches, mock)); worker.register_wf(wf_type.to_owned(), |ctx: WfContext| async move { @@ -1156,7 +1156,7 @@ async fn wft_timeout_repro(hist_batches: &'static [usize]) { async fn complete_after_eviction() { let wfid = "fake_wf_id"; let t = canned_histories::single_timer("1"); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task().times(0); let mock = single_hist_mock_sg(wfid, t, [2], mock, true); let core = mock_worker(mock); @@ -1194,7 +1194,7 @@ async fn sends_appropriate_sticky_task_queue_responses() { // include the information that tells the server to enqueue the next task 
on a sticky queue. let wfid = "fake_wf_id"; let t = canned_histories::single_timer("1"); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() .withf(|comp| comp.sticky_attributes.is_some()) .times(1) @@ -1218,7 +1218,7 @@ async fn sends_appropriate_sticky_task_queue_responses() { async fn new_server_work_while_eviction_outstanding_doesnt_overwrite_activation() { let wfid = "fake_wf_id"; let t = canned_histories::single_timer("1"); - let mock = single_hist_mock_sg(wfid, t, [1, 2], mock_workflow_client(), false); + let mock = single_hist_mock_sg(wfid, t, [1, 2], mock_worker_client(), false); let taskmap = mock.outstanding_task_map.clone().unwrap(); let core = mock_worker(mock); @@ -1279,7 +1279,7 @@ async fn buffered_work_drained_on_shutdown() { tasks.extend( std::iter::repeat_with(|| hist_to_poll_resp(&t, wfid.to_owned(), 2.into()).resp).take(50), ); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() .returning(|_| Ok(RespondWorkflowTaskCompletedResponse::default())); let mut mock = MocksHolder::from_wft_stream(mock, stream::iter(tasks)); @@ -1323,7 +1323,7 @@ async fn fail_wft_then_recover() { t, // We need to deliver all of history twice because of eviction [ResponseType::AllHistory, ResponseType::AllHistory], - mock_workflow_client(), + mock_worker_client(), ); mh.num_expected_fails = 1; mh.expect_fail_wft_matcher = @@ -1388,7 +1388,7 @@ async fn poll_response_triggers_wf_error() { "fake_wf_id", t, [ResponseType::AllHistory], - mock_workflow_client(), + mock_worker_client(), ); // Fail wft will be called when auto-failing. mh.num_expected_fails = 1; @@ -1418,7 +1418,7 @@ async fn lang_slower_than_wft_timeouts() { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_complete_workflow_task() .times(1) .returning(|_| Err(tonic::Status::not_found("Workflow task not found."))); @@ -1476,7 +1476,7 @@ async fn tries_cancel_of_completed_activity() { t.add_activity_task_completed(scheduled_event_id, started_event_id, Default::default()); t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mock = single_hist_mock_sg("fake_wf_id", t, [1, 2], mock, true); mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1); let core = mock_worker(mock); @@ -1524,7 +1524,7 @@ async fn failing_wft_doesnt_eat_permit_forever() { t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mock = MockPollCfg::from_resp_batches("fake_wf_id", t, [1, 1, 1], mock); mock.num_expected_fails = 1; let mut mock = build_mock_pollers(mock); @@ -1586,7 +1586,7 @@ async fn cache_miss_will_fetch_history() { "fake_wf_id", t, [ResponseType::ToTaskNum(1), ResponseType::OneTask(2)], - mock_workflow_client(), + mock_worker_client(), ); mh.mock_client .expect_get_workflow_execution_history() @@ -1682,7 +1682,7 @@ async fn history_byte_size_and_can_suggestion_in_activation() { "fake_wf_id", t, [ResponseType::ToTaskNum(1), ResponseType::OneTask(2)], - mock_workflow_client(), + mock_worker_client(), ); let mut mock = build_mock_pollers(mh); mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1); @@ -1713,7 +1713,7 @@ async fn tasks_from_completion_are_delivered() { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let mut 
mock = mock_workflow_client(); + let mut mock = mock_worker_client(); let complete_resp = RespondWorkflowTaskCompletedResponse { workflow_task: Some(hist_to_poll_resp(&t, wfid.to_owned(), 2.into()).resp), activity_tasks: vec![], @@ -1758,7 +1758,7 @@ async fn pagination_works_with_tasks_from_completion() { t.add_we_signaled("sig", vec![]); t.add_workflow_task_scheduled_and_started(); - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); let mut needs_pag_resp = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(2)).resp; needs_pag_resp.next_page_token = vec![1]; let complete_resp = RespondWorkflowTaskCompletedResponse { @@ -1812,7 +1812,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() { response_batches: vec![ResponseType::ToTaskNum(1)], }) .collect(); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_complete_workflow_task() .times(3) @@ -1937,7 +1937,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() { async fn eviction_waits_until_replay_finished() { let wfid = "fake_wf_id"; let t = canned_histories::long_sequential_timers(3); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mock = single_hist_mock_sg(wfid, t, [3], mock, true); let core = mock_worker(mock); @@ -1998,7 +1998,7 @@ async fn autocompletes_wft_no_work() { let started_event_id = t.add_activity_task_started(scheduled_event_id); t.add_activity_task_completed(scheduled_event_id, started_event_id, Default::default()); t.add_full_wf_task(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut mock = single_hist_mock_sg(wfid, t, [1, 2, 3, 4], mock, true); mock.worker_cfg(|w| w.max_cached_workflows = 1); let core = mock_worker(mock); @@ -2052,7 +2052,7 @@ async fn autocompletes_wft_no_work() { #[tokio::test] async fn no_race_acquiring_permits() { let wfid = "fake_wf_id"; - let mut mock_client = mock_manual_workflow_client(); + let mut mock_client = mock_manual_worker_client(); // We need to allow two polls to happen by triggering two processing events in the workflow // stream, but then delivering the actual tasks after that let task_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2))); @@ -2127,7 +2127,7 @@ async fn continue_as_new_preserves_some_values() { wes_attrs.memo = Some(memo); wes_attrs.search_attributes = Some(search); wes_attrs.retry_policy = Some(retry_policy); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); let t = { let mut t = TestHistoryBuilder::default(); t.add(wes_attrs.clone()); @@ -2185,7 +2185,7 @@ async fn ignorable_events_are_ok(#[values(true, false)] attribs_unset: bool) { }); t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mock = single_hist_mock_sg("wheee", t, [ResponseType::AllHistory], mock, true); let core = mock_worker(mock); @@ -2198,7 +2198,7 @@ async fn ignorable_events_are_ok(#[values(true, false)] attribs_unset: bool) { #[tokio::test] async fn fetching_to_continue_replay_works() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_full_wf_task(); @@ -2269,7 +2269,7 @@ async fn fetching_to_continue_replay_works() { #[tokio::test] async fn fetching_error_evicts_wf() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = 
mock_worker_client(); let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task_scheduled_and_started(); @@ -2336,7 +2336,7 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() { next_page.history.as_mut().unwrap().events.truncate(9); next_page.next_page_token = vec![2]; - let mut mock = mock_workflow_client(); + let mut mock = mock_worker_client(); mock.expect_get_workflow_execution_history() .returning(move |_, _, _| { error!("Called fetch!"); @@ -2392,7 +2392,7 @@ async fn lang_internal_flags() { "fake_wf_id", t, [ResponseType::ToTaskNum(2), ResponseType::AllHistory], - mock_workflow_client(), + mock_worker_client(), ); mh.completion_mock_fn = Some(Box::new(|c| { assert_matches!(c.sdk_metadata.lang_used_flags.as_slice(), &[2]); @@ -2433,7 +2433,7 @@ async fn lang_internal_flag_with_update() { "fake_wf_id", t, [ResponseType::AllHistory], - mock_workflow_client(), + mock_worker_client(), ); let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| wc.max_cached_workflows = 1); @@ -2483,7 +2483,7 @@ async fn core_internal_flags() { "fake_wf_id", t, [ResponseType::ToTaskNum(1)], - mock_workflow_client(), + mock_worker_client(), ); mh.completion_mock_fn = Some(Box::new(move |c| { assert_eq!( @@ -2585,7 +2585,7 @@ async fn _do_post_terminal_commands_test( t: TestHistoryBuilder, ) { let mut mh = - MockPollCfg::from_resp_batches("fake_wf_id", t, response_types, mock_workflow_client()); + MockPollCfg::from_resp_batches("fake_wf_id", t, response_types, mock_worker_client()); if let Some(expected_command_types) = expected_command_types { mh.num_expected_completions = Some(TimesRange::from(1)); mh.completion_mock_fn = Some(Box::new(move |c| { @@ -2636,7 +2636,7 @@ async fn jobs_are_in_appropriate_order() { "fake_wf_id", t, [ResponseType::AllHistory], - mock_workflow_client(), + mock_worker_client(), ); let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| wc.max_cached_workflows = 1); @@ -2701,7 +2701,7 @@ async fn history_length_with_fail_and_timeout( t.add_full_wf_task(); t.add_workflow_execution_completed(); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); let history_responses = match history_responses_case { 1 => vec![ResponseType::AllHistory], 2 => vec![ @@ -2775,7 +2775,7 @@ async fn poller_wont_run_ahead_of_task_slots() { ) .resp }); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_workflow_task() .returning(move |_, _| Ok(bunch_of_first_tasks.next().unwrap())); @@ -2833,7 +2833,7 @@ async fn poller_wont_run_ahead_of_task_slots() { #[tokio::test] async fn poller_wont_poll_until_lang_polls() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); let (tx, rx) = sync_channel(101); // Normally you'd just not set any expectations, but the problem is since we never poll // the WFT stream, we'll never join the tasks running the pollers and thus the error @@ -2874,7 +2874,7 @@ async fn use_compatible_version_flag( #[values("activity", "child_wf", "continue_as_new")] command_type: &'static str, ) { let wfid = "fake_wf_id"; - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); let t = { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -2971,7 +2971,7 @@ async fn sets_build_id_from_wft_complete() { t.add_timer_fired(timer_started_event_id, "2".to_string()); 
t.add_workflow_task_scheduled_and_started(); - let mock = mock_workflow_client(); + let mock = mock_worker_client(); let mut worker = mock_sdk_cfg( MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock), |cfg| { @@ -3023,7 +3023,7 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() { ) .resp }); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_workflow_task() .returning(move |_, _| Ok(bunch_of_first_tasks.next().unwrap())); @@ -3186,7 +3186,7 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() { .resp }); - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); // Track normal vs sticky poll requests and return actual workflow tasks let cc = Arc::clone(&counters); @@ -3258,6 +3258,7 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() { Some("stickytq".to_string()), Arc::new(mock_client), None, + None, ); for _ in 1..50 { diff --git a/core/src/lib.rs b/core/src/lib.rs index 8e43bf82c..12629c59b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,7 +61,7 @@ use crate::{ }; use anyhow::bail; use futures_util::Stream; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use temporal_client::{ConfiguredClient, NamespacedClient, TemporalServiceClientWithMetrics}; use temporal_sdk_core_api::{ Worker as WorkerTrait, @@ -98,11 +98,19 @@ where } let client_ident = client.get_identity().to_owned(); let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config); + + if client_ident.is_empty() { + bail!("Client identity cannot be empty. Either lang or user should be setting this value"); + } + + let heartbeat_fn = Arc::new(OnceLock::new()); + let client_bag = Arc::new(WorkerClientBag::new( client, worker_config.namespace.clone(), client_ident, worker_config.versioning_strategy.clone(), + heartbeat_fn.clone(), )); Ok(Worker::new( @@ -110,6 +118,7 @@ where sticky_q, client_bag, Some(&runtime.telemetry), + Some(heartbeat_fn), )) } diff --git a/core/src/pollers/poll_buffer.rs b/core/src/pollers/poll_buffer.rs index b2a78a61c..6d7677086 100644 --- a/core/src/pollers/poll_buffer.rs +++ b/core/src/pollers/poll_buffer.rs @@ -697,7 +697,7 @@ mod tests { use super::*; use crate::{ abstractions::tests::fixed_size_permit_dealer, - worker::client::mocks::mock_manual_workflow_client, + worker::client::mocks::mock_manual_worker_client, }; use futures_util::FutureExt; use std::time::Duration; @@ -705,7 +705,7 @@ mod tests { #[tokio::test] async fn only_polls_once_with_1_poller() { - let mut mock_client = mock_manual_workflow_client(); + let mut mock_client = mock_manual_worker_client(); mock_client .expect_poll_workflow_task() .times(2) diff --git a/core/src/replay/mod.rs b/core/src/replay/mod.rs index a2c8b4764..650070b20 100644 --- a/core/src/replay/mod.rs +++ b/core/src/replay/mod.rs @@ -6,7 +6,7 @@ use crate::{ Worker, worker::{ PostActivateHookData, - client::mocks::{MockManualWorkerClient, mock_manual_workflow_client}, + client::mocks::{MockManualWorkerClient, mock_manual_worker_client}, }, }; use futures_util::{FutureExt, Stream, StreamExt}; @@ -73,7 +73,7 @@ where let mut client = if let Some(c) = self.client_override { c } else { - mock_manual_workflow_client() + mock_manual_worker_client() }; let hist_allow_tx = historator.replay_done_tx.clone(); @@ -114,7 +114,7 @@ where hist_allow_tx.send("Failed".to_string()).unwrap(); async move { Ok(RespondWorkflowTaskFailedResponse::default()) }.boxed() }); - let mut worker = 
Worker::new(self.config, None, Arc::new(client), None); + let mut worker = Worker::new(self.config, None, Arc::new(client), None, None); worker.set_post_activate_hook(post_activate); shutdown_tok(worker.shutdown_token()); Ok(worker) diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 8dc69a121..34121b4c0 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -10,7 +10,7 @@ use crate::{ TaskPollers, client::{ LegacyQueryResult, MockWorkerClient, WorkerClient, WorkflowTaskCompletion, - mocks::mock_workflow_client, + mocks::mock_worker_client, }, }, }; @@ -180,6 +180,7 @@ pub(crate) fn mock_worker(mocks: MocksHolder) -> Worker { .unwrap_or_else(|| mock_poller_from_resps([])), }, None, + None, ) } @@ -430,7 +431,7 @@ impl MockPollCfg { enforce_correct_number_of_polls, num_expected_fails, num_expected_legacy_query_resps: 0, - mock_client: mock_workflow_client(), + mock_client: mock_worker_client(), expect_fail_wft_matcher: Box::new(|_, _, _| true), expect_legacy_query_matcher: Box::new(|_, _| true), completion_mock_fn: None, @@ -444,14 +445,14 @@ impl MockPollCfg { pub(crate) fn from_hist_builder(t: TestHistoryBuilder) -> Self { let full_hist_info = t.get_full_history_info().unwrap(); let tasks = 1..=full_hist_info.wf_task_count(); - Self::from_resp_batches("fake_wf_id", t, tasks, mock_workflow_client()) + Self::from_resp_batches("fake_wf_id", t, tasks, mock_worker_client()) } pub(crate) fn from_resps( t: TestHistoryBuilder, resps: impl IntoIterator>, ) -> Self { - Self::from_resp_batches("fake_wf_id", t, resps, mock_workflow_client()) + Self::from_resp_batches("fake_wf_id", t, resps, mock_worker_client()) } pub(crate) fn from_resp_batches( diff --git a/core/src/worker/activities.rs b/core/src/worker/activities.rs index 0faf06bf7..505d5c840 100644 --- a/core/src/worker/activities.rs +++ b/core/src/worker/activities.rs @@ -726,14 +726,14 @@ mod tests { abstractions::tests::fixed_size_permit_dealer, pollers::{ActivityTaskOptions, LongPollBuffer}, prost_dur, - worker::client::mocks::mock_workflow_client, + worker::client::mocks::mock_worker_client, }; use temporal_sdk_core_api::worker::PollerBehavior; use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult; #[tokio::test] async fn per_worker_ratelimit() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_activity_task() .times(1) @@ -812,7 +812,7 @@ mod tests { #[tokio::test] async fn local_timeouts() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_activity_task() .times(1) @@ -902,7 +902,7 @@ mod tests { #[tokio::test] async fn local_timeout_heartbeating() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_poll_activity_task() .times(1) diff --git a/core/src/worker/activities/activity_heartbeat_manager.rs b/core/src/worker/activities/activity_heartbeat_manager.rs index 3944e1ef2..44e426ba9 100644 --- a/core/src/worker/activities/activity_heartbeat_manager.rs +++ b/core/src/worker/activities/activity_heartbeat_manager.rs @@ -423,7 +423,7 @@ impl HeartbeatStreamState { mod test { use super::*; - use crate::worker::client::mocks::mock_workflow_client; + use crate::worker::client::mocks::mock_worker_client; use std::time::Duration; use temporal_sdk_core_protos::temporal::api::{ common::v1::Payload, workflowservice::v1::RecordActivityTaskHeartbeatResponse, @@ -434,7 +434,7 @@ mod test 
{ /// every 1/2 of the heartbeat timeout. #[tokio::test] async fn process_heartbeats_and_shutdown() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .returning(|_, _| Ok(RecordActivityTaskHeartbeatResponse::default())) @@ -456,7 +456,7 @@ mod test { #[tokio::test] async fn send_heartbeats_less_frequently_than_throttle_interval() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .returning(|_, _| Ok(RecordActivityTaskHeartbeatResponse::default())) @@ -475,7 +475,7 @@ mod test { /// Ensure that heartbeat can be called from a tight loop and correctly throttle #[tokio::test] async fn process_tight_loop_and_shutdown() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .returning(|_, _| Ok(RecordActivityTaskHeartbeatResponse::default())) @@ -495,7 +495,7 @@ mod test { /// This test reports one heartbeat and waits for the throttle_interval to elapse before sending another #[tokio::test] async fn report_heartbeat_after_timeout() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .returning(|_, _| Ok(RecordActivityTaskHeartbeatResponse::default())) @@ -513,7 +513,7 @@ mod test { #[tokio::test] async fn evict_works() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .returning(|_, _| Ok(RecordActivityTaskHeartbeatResponse::default())) @@ -534,7 +534,7 @@ mod test { #[tokio::test] async fn evict_immediate_after_record() { - let mut mock_client = mock_workflow_client(); + let mut mock_client = mock_worker_client(); mock_client .expect_record_activity_heartbeat() .returning(|_, _| Ok(RecordActivityTaskHeartbeatResponse::default())) diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index 2b401e204..8ba1502a4 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -1,14 +1,18 @@ //! 
Worker-specific client needs
 pub(crate) mod mocks;
+use crate::abstractions::dbg_panic;
 use crate::protosext::legacy_query_failure;
+use crate::worker::heartbeat::HeartbeatFn;
 use parking_lot::RwLock;
+use std::sync::OnceLock;
 use std::{sync::Arc, time::Duration};
 use temporal_client::{
     Client, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient,
     SlotManager, WorkflowService,
 };
 use temporal_sdk_core_api::worker::WorkerVersioningStrategy;
+use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
 use temporal_sdk_core_protos::{
     TaskToken,
     coresdk::{workflow_commands::QueryResult, workflow_completion},
@@ -46,6 +50,7 @@ pub(crate) struct WorkerClientBag {
     namespace: String,
     identity: String,
     worker_versioning_strategy: WorkerVersioningStrategy,
+    heartbeat_data: Arc<OnceLock<HeartbeatFn>>,
 }

 impl WorkerClientBag {
@@ -54,12 +59,14 @@ impl WorkerClientBag {
         namespace: String,
         identity: String,
         worker_versioning_strategy: WorkerVersioningStrategy,
+        heartbeat_data: Arc<OnceLock<HeartbeatFn>>,
     ) -> Self {
         Self {
             replaceable_client: RwLock::new(client),
             namespace,
             identity,
             worker_versioning_strategy,
+            heartbeat_data,
         }
     }
@@ -120,6 +127,15 @@ impl WorkerClientBag {
             None
         }
     }
+
+    fn capture_heartbeat(&self) -> Option<WorkerHeartbeat> {
+        if let Some(hb) = self.heartbeat_data.get() {
+            hb()
+        } else {
+            dbg_panic!("Heartbeat function never set");
+            None
+        }
+    }
 }

 /// This trait contains everything workers need to interact with Temporal, and hence provides a
@@ -209,6 +225,11 @@ pub trait WorkerClient: Sync + Send {
     async fn describe_namespace(&self) -> Result<DescribeNamespaceResponse>;
     /// Shutdown the worker
     async fn shutdown_worker(&self, sticky_task_queue: String) -> Result<ShutdownWorkerResponse>;
+    /// Record a worker heartbeat
+    async fn record_worker_heartbeat(
+        &self,
+        heartbeat: WorkerHeartbeat,
+    ) -> Result<RecordWorkerHeartbeatResponse>;
     /// Replace the underlying client
     fn replace_client(&self, new_client: RetryClient<Client>);
@@ -220,6 +241,8 @@ pub trait WorkerClient: Sync + Send {
     fn is_mock(&self) -> bool;
     /// Return name and version of the SDK
     fn sdk_name_and_version(&self) -> (String, String);
+    /// Get worker identity
+    fn get_identity(&self) -> String;
 }
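Every mocked client now needs the two new expectations, so the shared constructors in mocks.rs (further down in this diff) prime `get_identity` by default, while tests that exercise heartbeating stub the new RPC themselves. A minimal sketch, mirroring the test added in heartbeat.rs:

    let mut mock = mock_worker_client(); // get_identity() comes pre-primed
    mock.expect_record_worker_heartbeat()
        .returning(|_| Ok(RecordWorkerHeartbeatResponse {}));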
 
 /// Configuration options shared by workflow, activity, and Nexus polling calls
@@ -343,7 +366,7 @@ impl WorkerClient for WorkerClientBag {
             identity: self.identity.clone(),
             worker_version_capabilities: self.worker_version_capabilities(),
             deployment_options: self.deployment_options(),
-            worker_heartbeat: None,
+            worker_heartbeat: self.capture_heartbeat().into_iter().collect(),
         }
         .into_request();
         request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -632,7 +655,7 @@ impl WorkerClient for WorkerClientBag {
             identity: self.identity.clone(),
             sticky_task_queue,
             reason: "graceful shutdown".to_string(),
-            worker_heartbeat: None,
+            worker_heartbeat: self.capture_heartbeat(),
         };

         Ok(
@@ -647,6 +670,21 @@ impl WorkerClient for WorkerClientBag {
         *replaceable_client = new_client;
     }

+    async fn record_worker_heartbeat(
+        &self,
+        heartbeat: WorkerHeartbeat,
+    ) -> Result<RecordWorkerHeartbeatResponse> {
+        Ok(self
+            .cloned_client()
+            .record_worker_heartbeat(RecordWorkerHeartbeatRequest {
+                namespace: self.namespace.clone(),
+                identity: self.identity.clone(),
+                worker_heartbeat: vec![heartbeat],
+            })
+            .await?
+            .into_inner())
+    }
+
     fn capabilities(&self) -> Option<Capabilities> {
         let client = self.replaceable_client.read();
         client.get_client().inner().capabilities().cloned()
     }
@@ -666,6 +704,10 @@ impl WorkerClient for WorkerClientBag {
         let opts = lock.get_client().inner().options();
         (opts.client_name.clone(), opts.client_version.clone())
     }
+
+    fn get_identity(&self) -> String {
+        self.identity.clone()
+    }
 }

 impl NamespacedClient for WorkerClientBag {
diff --git a/core/src/worker/client/mocks.rs b/core/src/worker/client/mocks.rs
index 36a1f73d1..dc09db75b 100644
--- a/core/src/worker/client/mocks.rs
+++ b/core/src/worker/client/mocks.rs
@@ -22,7 +22,7 @@ pub(crate) static DEFAULT_TEST_CAPABILITIES: &Capabilities = &Capabilities {
 #[cfg(test)]
 /// Create a mock client primed with basic necessary expectations
-pub(crate) fn mock_workflow_client() -> MockWorkerClient {
+pub(crate) fn mock_worker_client() -> MockWorkerClient {
     let mut r = MockWorkerClient::new();
     r.expect_capabilities()
         .returning(|| Some(*DEFAULT_TEST_CAPABILITIES));
@@ -33,11 +33,13 @@
         .returning(|_| Ok(ShutdownWorkerResponse {}));
     r.expect_sdk_name_and_version()
         .returning(|| ("test-core".to_string(), "0.0.0".to_string()));
+    r.expect_get_identity()
+        .returning(|| "test-identity".to_string());
     r
 }

 /// Create a mock manual client primed with basic necessary expectations
-pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient {
+pub(crate) fn mock_manual_worker_client() -> MockManualWorkerClient {
     let mut r = MockManualWorkerClient::new();
     r.expect_capabilities()
         .returning(|| Some(*DEFAULT_TEST_CAPABILITIES));
@@ -46,6 +48,8 @@ pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient {
     r.expect_is_mock().returning(|| true);
     r.expect_sdk_name_and_version()
         .returning(|| ("test-core".to_string(), "0.0.0".to_string()));
+    r.expect_get_identity()
+        .returning(|| "test-identity".to_string());
     r
 }

@@ -146,10 +150,13 @@ mockall::mock! {
     fn shutdown_worker<'a, 'b>(&self, sticky_task_queue: String) -> impl Future<Output = Result<ShutdownWorkerResponse>> + Send + 'b where 'a: 'b, Self: 'b;
+    fn record_worker_heartbeat<'a, 'b>(&self, heartbeat: WorkerHeartbeat) -> impl Future<Output = Result<RecordWorkerHeartbeatResponse>> + Send + 'b where 'a: 'b, Self: 'b;
+
     fn replace_client(&self, new_client: RetryClient<Client>);
     fn capabilities(&self) -> Option<Capabilities>;
     fn workers(&self) -> Arc<SlotManager>;
     fn is_mock(&self) -> bool;
     fn sdk_name_and_version(&self) -> (String, String);
+    fn get_identity(&self) -> String;
 }
 }
diff --git a/core/src/worker/heartbeat.rs b/core/src/worker/heartbeat.rs
new file mode 100644
index 000000000..4ca01ff67
--- /dev/null
+++ b/core/src/worker/heartbeat.rs
@@ -0,0 +1,229 @@
+use crate::WorkerClient;
+use crate::abstractions::dbg_panic;
+use gethostname::gethostname;
+use parking_lot::Mutex;
+use prost_types::Duration as PbDuration;
+use std::sync::{Arc, OnceLock};
+use std::time::{Duration, SystemTime};
+use temporal_sdk_core_api::worker::WorkerConfig;
+use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo};
+use tokio::sync::Notify;
+use tokio::task::JoinHandle;
+use tokio::time::MissedTickBehavior;
+use uuid::Uuid;
+
+pub(crate) type HeartbeatFn = Box<dyn Fn() -> Option<WorkerHeartbeat> + Send + Sync>;
+
+pub(crate) struct WorkerHeartbeatManager {
+    heartbeat_handle: JoinHandle<()>,
+}
+
+impl WorkerHeartbeatManager {
+    pub(crate) fn new(
+        config: WorkerConfig,
+        identity: String,
+        heartbeat_fn: Arc<OnceLock<HeartbeatFn>>,
+        client: Arc<dyn WorkerClient>,
+    ) -> Self {
+        let sdk_name_and_ver = client.sdk_name_and_version();
+        let reset_notify = Arc::new(Notify::new());
+        let data = Arc::new(Mutex::new(WorkerHeartbeatData::new(
+            config,
+            identity,
+            sdk_name_and_ver,
+            reset_notify.clone(),
+        )));
+        let data_clone = data.clone();
+
+        let heartbeat_handle = tokio::spawn(async move {
+            let mut ticker = tokio::time::interval(data_clone.lock().heartbeat_interval);
+            ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
+            loop {
+                tokio::select! {
+                    _ = ticker.tick() => {
+                        let heartbeat = if let Some(heartbeat) = data_clone.lock().capture_heartbeat_if_needed() {
+                            heartbeat
+                        } else {
+                            continue
+                        };
+                        if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await {
+                            if matches!(
+                                e.code(),
+                                tonic::Code::Unimplemented
+                            ) {
+                                return;
+                            }
+                            warn!(error=?e, "Network error while sending worker heartbeat");
+                        }
+                    }
+                    _ = reset_notify.notified() => {
+                        ticker.reset();
+                    }
+                }
+            }
+        });
+
+        let data_clone = data.clone();
+        if heartbeat_fn
+            .set(Box::new(move || {
+                data_clone.lock().capture_heartbeat_if_needed()
+            }))
+            .is_err()
+        {
+            dbg_panic!(
+                "Failed to set heartbeat_fn; it should only be set once, when the singular WorkerHeartbeatManager is created"
+            );
+        }
+
+        Self { heartbeat_handle }
+    }
+
+    pub(crate) fn shutdown(&self) {
+        self.heartbeat_handle.abort()
+    }
+}
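The `OnceLock` indirection above is what decouples construction order: `init_worker` in lib.rs creates the empty cell, hands one clone to `WorkerClientBag` (whose `capture_heartbeat()` reads it on every long poll) and another to `Worker::new`, and the manager installs the closure exactly once. A self-contained sketch of that late-binding pattern, using illustrative names that are not part of this diff:

    use std::sync::{Arc, OnceLock};

    type Probe = Box<dyn Fn() -> Option<String> + Send + Sync>;

    fn main() {
        let cell: Arc<OnceLock<Probe>> = Arc::new(OnceLock::new());
        let reader = cell.clone(); // e.g. held by the client

        // Exactly one component installs the real closure, once...
        let _ = cell.set(Box::new(|| Some("heartbeat data".to_string())));

        // ...and readers degrade gracefully if it was never installed.
        let value = reader.get().and_then(|probe| probe());
        assert_eq!(value.as_deref(), Some("heartbeat data"));
    }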
+
+#[derive(Debug, Clone)]
+struct WorkerHeartbeatData {
+    worker_instance_key: String,
+    worker_identity: String,
+    host_info: WorkerHostInfo,
+    // Time of the last heartbeat. This is used both for heartbeat_time and to
+    // compute elapsed_since_last_heartbeat.
+    heartbeat_time: Option<SystemTime>,
+    task_queue: String,
+    /// SDK name
+    sdk_name: String,
+    /// SDK version
+    sdk_version: String,
+    /// Worker start time
+    start_time: SystemTime,
+    heartbeat_interval: Duration,
+    reset_notify: Arc<Notify>,
+}
+
+impl WorkerHeartbeatData {
+    fn new(
+        worker_config: WorkerConfig,
+        worker_identity: String,
+        sdk_name_and_ver: (String, String),
+        reset_notify: Arc<Notify>,
+    ) -> Self {
+        Self {
+            worker_identity,
+            host_info: WorkerHostInfo {
+                host_name: gethostname().to_string_lossy().to_string(),
+                process_id: std::process::id().to_string(),
+                ..Default::default()
+            },
+            sdk_name: sdk_name_and_ver.0,
+            sdk_version: sdk_name_and_ver.1,
+            task_queue: worker_config.task_queue.clone(),
+            start_time: SystemTime::now(),
+            heartbeat_time: None,
+            worker_instance_key: Uuid::new_v4().to_string(),
+            heartbeat_interval: worker_config.heartbeat_interval,
+            reset_notify,
+        }
+    }
+
+    fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> {
+        let now = SystemTime::now();
+        let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time {
+            let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO);
+
+            // Only send data if nearly a full interval has elapsed since it was last sent.
+            // Here, "nearly" means 90% of the interval.
+            if dur.as_secs_f64() < 0.9 * self.heartbeat_interval.as_secs_f64() {
+                return None;
+            }
+            Some(PbDuration {
+                seconds: dur.as_secs() as i64,
+                nanos: dur.subsec_nanos() as i32,
+            })
+        } else {
+            None
+        };
+
+        self.heartbeat_time = Some(now);
+
+        self.reset_notify.notify_one();
+
+        Some(WorkerHeartbeat {
+            worker_instance_key: self.worker_instance_key.clone(),
+            worker_identity: self.worker_identity.clone(),
+            host_info: Some(self.host_info.clone()),
+            task_queue: self.task_queue.clone(),
+            sdk_name: self.sdk_name.clone(),
+            sdk_version: self.sdk_version.clone(),
+            status: 0,
+            start_time: Some(self.start_time.into()),
+            heartbeat_time: Some(SystemTime::now().into()),
+            elapsed_since_last_heartbeat,
+            ..Default::default()
+        })
+    }
+}
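To make the 90% rule concrete, a worked example using the 200ms interval configured in the test below (the `due` helper is illustrative, not crate code):

use std::time::Duration;

/// Mirrors the threshold check above: send iff elapsed >= 0.9 * interval.
fn due(elapsed: Duration, interval: Duration) -> bool {
    elapsed.as_secs_f64() >= 0.9 * interval.as_secs_f64()
}

fn main() {
    let interval = Duration::from_millis(200);
    assert!(!due(Duration::from_millis(100), interval)); // suppressed: < 180ms
    assert!(due(Duration::from_millis(190), interval)); // sent: >= 180ms
}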
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test_help::WorkerExt;
+    use crate::test_help::test_worker_cfg;
+    use crate::worker;
+    use crate::worker::client::mocks::mock_worker_client;
+    use std::sync::Arc;
+    use std::time::Duration;
+    use temporal_sdk_core_api::worker::PollerBehavior;
+    use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse;
+
+    #[tokio::test]
+    async fn worker_heartbeat() {
+        let mut mock = mock_worker_client();
+        mock.expect_record_worker_heartbeat()
+            .times(2)
+            .returning(move |heartbeat| {
+                let host_info = heartbeat.host_info.clone().unwrap();
+                assert_eq!("test-identity", heartbeat.worker_identity);
+                assert!(!heartbeat.worker_instance_key.is_empty());
+                assert_eq!(
+                    host_info.host_name,
+                    gethostname::gethostname().to_string_lossy().to_string()
+                );
+                assert_eq!(host_info.process_id, std::process::id().to_string());
+                assert_eq!(heartbeat.sdk_name, "test-core");
+                assert_eq!(heartbeat.sdk_version, "0.0.0");
+                assert_eq!(heartbeat.task_queue, "q");
+                assert!(heartbeat.heartbeat_time.is_some());
+                assert!(heartbeat.start_time.is_some());
+
+                Ok(RecordWorkerHeartbeatResponse {})
+            });
+
+        let config = test_worker_cfg()
+            .activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize))
+            .max_outstanding_activities(1_usize)
+            .heartbeat_interval(Duration::from_millis(200))
+            .build()
+            .unwrap();
+
+        let heartbeat_fn = Arc::new(OnceLock::new());
+        let client = Arc::new(mock);
+        let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_fn.clone()));
+        heartbeat_fn.get().unwrap()();
+
+        // heartbeat timer fires once
+        advance_time(Duration::from_millis(300)).await;
+        // less than 90% of the interval has elapsed since that heartbeat, so no data is returned here
+        assert_eq!(None, heartbeat_fn.get().unwrap()());
+        // heartbeat timer fires once
+        advance_time(Duration::from_millis(300)).await;
+
+        worker.drain_activity_poller_and_shutdown().await;
+    }
+
+    async fn advance_time(dur: Duration) {
+        tokio::time::pause();
+        tokio::time::advance(dur).await;
+        tokio::time::resume();
+    }
+}
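The advance_time helper leans on tokio's test clock (the `test-util` feature). A standalone sketch of the same technique, with illustrative names:

use std::time::Duration;

#[tokio::test]
async fn advancing_the_paused_clock() {
    tokio::time::pause(); // freeze the runtime clock
    let before = tokio::time::Instant::now();
    // Leap forward without real waiting; pending timers fire as if 300ms passed.
    tokio::time::advance(Duration::from_millis(300)).await;
    assert_eq!(before.elapsed(), Duration::from_millis(300));
    tokio::time::resume(); // back to real time
}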
diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs
index 023b66f1c..f21d203a9 100644
--- a/core/src/worker/mod.rs
+++ b/core/src/worker/mod.rs
@@ -1,5 +1,6 @@
 mod activities;
 pub(crate) mod client;
+mod heartbeat;
 mod nexus;
 mod slot_provider;
 pub(crate) mod tuner;
@@ -19,6 +20,7 @@ pub(crate) use activities::{
 pub(crate) use wft_poller::WFTPollerShared;
 pub(crate) use workflow::LEGACY_QUERY_ID;
 
+use crate::worker::heartbeat::{HeartbeatFn, WorkerHeartbeatManager};
 use crate::{
     ActivityHeartbeat, CompleteActivityError, PollError, WorkerTrait,
     abstractions::{MeteredPermitDealer, PermitDealerContextData, dbg_panic},
@@ -41,10 +43,15 @@ use crate::{
         },
     },
 };
+use crate::{
+    pollers::{ActivityTaskOptions, LongPollBuffer},
+    worker::workflow::wft_poller,
+};
 use activities::WorkerActivityTasks;
 use futures_util::{StreamExt, stream};
 use parking_lot::Mutex;
 use slot_provider::SlotProvider;
+use std::sync::OnceLock;
 use std::{
     convert::TryInto,
     future,
@@ -77,11 +84,6 @@ use temporal_sdk_core_protos::{
 use tokio::sync::{mpsc::unbounded_channel, watch};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tokio_util::sync::CancellationToken;
-
-use crate::{
-    pollers::{ActivityTaskOptions, LongPollBuffer},
-    worker::workflow::wft_poller,
-};
 #[cfg(test)]
 use {
     crate::{
@@ -119,6 +121,8 @@ pub struct Worker {
     local_activities_complete: Arc<AtomicBool>,
     /// Used to track all permits have been released
     all_permits_tracker: tokio::sync::Mutex<AllPermitsTracker>,
+    /// Used to shutdown the worker heartbeat task
+    worker_heartbeat: Option<WorkerHeartbeatManager>,
 }
 
 struct AllPermitsTracker {
@@ -271,6 +275,7 @@ impl Worker {
         sticky_queue_name: Option<String>,
         client: Arc<dyn WorkerClient>,
         telem_instance: Option<&TelemetryInstance>,
+        heartbeat_fn: Option<Arc<OnceLock<HeartbeatFn>>>,
     ) -> Self {
         info!(task_queue=%config.task_queue, namespace=%config.namespace, "Initializing worker");
 
@@ -280,6 +285,7 @@ impl Worker {
             client,
             TaskPollers::Real,
             telem_instance,
+            heartbeat_fn,
         )
     }
 
@@ -297,7 +303,9 @@ impl Worker {
     #[cfg(test)]
     pub(crate) fn new_test(config: WorkerConfig, client: impl WorkerClient + 'static) -> Self {
-        Self::new(config, None, Arc::new(client), None)
+        let client = Arc::new(client);
+        let heartbeat_fn = Arc::new(OnceLock::new());
+        Self::new(config, None, client, None, Some(heartbeat_fn))
     }
 
     pub(crate) fn new_with_pollers(
@@ -306,6 +314,7 @@
         client: Arc<dyn WorkerClient>,
         task_pollers: TaskPollers,
         telem_instance: Option<&TelemetryInstance>,
+        heartbeat_fn: Option<Arc<OnceLock<HeartbeatFn>>>,
     ) -> Self {
         let (metrics, meter) = if let Some(ti) = telem_instance {
             (
@@ -325,7 +334,7 @@ impl Worker {
         let shutdown_token = CancellationToken::new();
         let slot_context_data = Arc::new(PermitDealerContextData {
             task_queue: config.task_queue.clone(),
-            worker_identity: config.client_identity_override.clone().unwrap_or_default(),
+            worker_identity: client.get_identity(),
             worker_deployment_version: config.computed_deployment_version(),
         });
         let wft_slots = MeteredPermitDealer::new(
@@ -437,17 +446,17 @@ impl Worker {
         };
         let (hb_tx, hb_rx) = unbounded_channel();
-        let la_pemit_dealer = MeteredPermitDealer::new(
+        let la_permit_dealer = MeteredPermitDealer::new(
             tuner.local_activity_slot_supplier(),
             metrics.with_new_attrs([local_activity_worker_type()]),
             None,
-            slot_context_data,
+            slot_context_data.clone(),
             meter.clone(),
         );
-        let la_permits = la_pemit_dealer.get_extant_count_rcv();
+        let la_permits = la_permit_dealer.get_extant_count_rcv();
         let local_act_mgr = Arc::new(LocalActivityManager::new(
             config.namespace.clone(),
-            la_pemit_dealer,
+            la_permit_dealer,
             hb_tx,
             metrics.clone(),
         ));
@@ -484,6 +493,16 @@ impl Worker {
         );
         let worker_key = Mutex::new(client.workers().register(Box::new(provider)));
         let sdk_name_and_ver = client.sdk_name_and_version();
+
+        let worker_heartbeat = heartbeat_fn.map(|heartbeat_fn| {
+            WorkerHeartbeatManager::new(
+                config.clone(),
+                client.get_identity(),
+                heartbeat_fn,
+                client.clone(),
+            )
+        });
+
         Self {
             worker_key,
             client: client.clone(),
@@ -540,6 +559,7 @@ impl Worker {
                 la_permits,
             }),
             nexus_mgr,
+            worker_heartbeat,
         }
     }
 
@@ -584,6 +604,9 @@
                 dbg_panic!("Waiting for all slot permits to release took too long!");
             }
         }
+        if let Some(heartbeat) = self.worker_heartbeat.as_ref() {
+            heartbeat.shutdown();
+        }
     }
 
     /// Finish shutting down by consuming the background pollers and freeing all resources
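Note that shutdown() aborts the heartbeat task rather than draining it; a heartbeat is best-effort, so cancellation at the next await point is enough. A minimal sketch of that behavior, assuming nothing beyond tokio:

use std::time::Duration;

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        loop {
            tokio::time::sleep(Duration::from_secs(60)).await;
        }
    });
    handle.abort();
    // The task reports cancellation rather than a panic.
    assert!(handle.await.unwrap_err().is_cancelled());
}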
@@ -883,7 +906,7 @@ mod tests {
     use crate::{
         advance_fut,
         test_help::test_worker_cfg,
-        worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
+        worker::client::mocks::{mock_manual_worker_client, mock_worker_client},
     };
     use futures_util::FutureExt;
     use temporal_sdk_core_api::worker::PollerBehavior;
@@ -891,7 +914,7 @@ mod tests {
 
     #[tokio::test]
     async fn activity_timeouts_maintain_permit() {
-        let mut mock_client = mock_workflow_client();
+        let mut mock_client = mock_worker_client();
         mock_client
             .expect_poll_activity_task()
             .returning(|_, _| Ok(PollActivityTaskQueueResponse::default()));
@@ -913,7 +936,7 @@ mod tests {
     async fn activity_errs_dont_eat_permits() {
         // Return one error followed by simulating waiting on the poll, otherwise the poller will
         // loop very fast and be in some indeterminate state.
-        let mut mock_client = mock_manual_workflow_client();
+        let mut mock_client = mock_manual_worker_client();
         mock_client
             .expect_poll_activity_task()
             .returning(|_, _| async { Err(tonic::Status::internal("ahhh")) }.boxed())
diff --git a/core/src/worker/workflow/history_update.rs b/core/src/worker/workflow/history_update.rs
index da4ca158b..d243f7206 100644
--- a/core/src/worker/workflow/history_update.rs
+++ b/core/src/worker/workflow/history_update.rs
@@ -803,7 +803,7 @@ mod tests {
     use crate::{
         replay::{HistoryInfo, TestHistoryBuilder},
         test_help::{MockPollCfg, ResponseType, canned_histories, hist_to_poll_resp, mock_sdk_cfg},
-        worker::client::mocks::mock_workflow_client,
+        worker::client::mocks::mock_worker_client,
     };
     use futures_util::{StreamExt, TryStreamExt};
     use std::sync::atomic::{AtomicUsize, Ordering};
@@ -965,7 +965,7 @@ mod tests {
     let wft_started = hinfo.workflow_task_started_event_id();
     let full_hist = hinfo.into_events();
     let initial_hist = full_hist.chunks(chunk_size).next().unwrap().to_vec();
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
 
     let mut npt = 1;
     mock_client
@@ -1162,7 +1162,7 @@ mod tests {
     // Chop off the last event, which is WFT started, which server doesn't return in get
     // history
     history_from_get.history.as_mut().map(|h| h.events.pop());
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(history_from_get.clone()));
@@ -1220,7 +1220,7 @@ mod tests {
     let partial_task = timer_hist.get_one_wft(2).unwrap();
     let prev_started_wft_id = partial_task.previous_started_event_id();
     let wft_started_id = partial_task.workflow_task_started_event_id();
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(Default::default()));
@@ -1247,7 +1247,7 @@ mod tests {
     let wft_started_id = partial_task.workflow_task_started_event_id();
     let full_resp: GetWorkflowExecutionHistoryResponse =
         timer_hist.get_full_history_info().unwrap().into();
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| {
@@ -1296,7 +1296,7 @@ mod tests {
         timer_hist.get_full_history_info().unwrap().into();
     full_resp_with_npt.next_page_token = vec![1];
 
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(full_resp_with_npt.clone()))
@@ -1375,7 +1375,7 @@ mod tests {
     resp_1.next_page_token = vec![1];
     resp_1.history.as_mut().unwrap().events.truncate(4);
 
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(resp_1.clone()))
@@ -1486,7 +1486,7 @@ mod tests {
     t.add_we_signaled("hi", vec![]);
     t.add_workflow_task_scheduled_and_started();
 
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     let events: Vec<HistoryEvent> = t.get_full_history_info().unwrap().into_events();
     let first_event = events[0].clone();
@@ -1602,7 +1602,7 @@ mod tests {
     let events: Vec<HistoryEvent> = t.get_full_history_info().unwrap().into_events();
     let first_event = events[0].clone();
 
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
 
     for (i, event) in events.into_iter().enumerate() {
         // Add an empty page
@@ -1722,7 +1722,7 @@ mod tests {
         t.get_full_history_info().unwrap().into();
     resp_1.next_page_token = vec![2];
 
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(resp_1.clone()))
@@ -1765,7 +1765,7 @@ mod tests {
     let workflow_task = t.get_full_history_info().unwrap();
     let prev_started_wft_id = workflow_task.previous_started_event_id();
     let wft_started_id = workflow_task.workflow_task_started_event_id();
-    let mock_client = mock_workflow_client();
+    let mock_client = mock_worker_client();
     let mut paginator = HistoryPaginator::new(
         workflow_task.into(),
         prev_started_wft_id,
@@ -1802,7 +1802,7 @@ mod tests {
 
     let full_resp: GetWorkflowExecutionHistoryResponse =
         t.get_full_history_info().unwrap().into();
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     mock_client
         .expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(full_resp.clone()))
@@ -1839,7 +1839,7 @@ mod tests {
     let incremental_task =
         hist_to_poll_resp(&t, "wfid".to_owned(), ResponseType::OneTask(3)).resp;
-    let mut mock_client = mock_workflow_client();
+    let mut mock_client = mock_worker_client();
     let mut one_task_resp: GetWorkflowExecutionHistoryResponse =
         t.get_history_info(1).unwrap().into();
     one_task_resp.next_page_token = vec![1];
diff --git a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs
index 5d3309e06..dafdbffbd 100644
--- a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs
+++ b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs
@@ -183,7 +183,7 @@ mod tests {
         replay::TestHistoryBuilder,
         test_help::{MockPollCfg, ResponseType, build_fake_sdk, build_mock_pollers, mock_worker},
         worker::{
-            client::mocks::mock_workflow_client,
+            client::mocks::mock_worker_client,
             workflow::machines::patch_state_machine::VERSION_SEARCH_ATTR_KEY,
         },
     };
@@ -328,7 +328,7 @@ mod tests {
         "fakeid",
         t,
         [ResponseType::ToTaskNum(1), ResponseType::ToTaskNum(2)],
-        mock_workflow_client(),
+        mock_worker_client(),
     );
     // Ensure the upsert command has an empty map when not using the patched command
     if !with_patched_cmd {
diff --git a/sdk-core-protos/protos/api_upstream/buf.yaml b/sdk-core-protos/protos/api_upstream/buf.yaml
index 9f94a9edc..e984c1439 100644
--- a/sdk-core-protos/protos/api_upstream/buf.yaml
+++ b/sdk-core-protos/protos/api_upstream/buf.yaml
@@ -13,8 +13,6 @@ breaking:
     - WIRE_JSON
   ignore:
     - google
-    # TODO (yuri) remove this
-    - temporal/api/workflow/v1/message.proto
 lint:
   use:
     - DEFAULT
diff --git a/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json b/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json
index 0a169fbc6..6da2b5b0f 100644
--- a/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json
+++ b/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json
@@ -7606,7 +7606,11 @@
           "description": "The identity of the client who initiated this request."
         },
         "workerHeartbeat": {
-          "$ref": "#/definitions/v1WorkerHeartbeat"
+          "type": "array",
+          "items": {
+            "type": "object",
+            "$ref": "#/definitions/v1WorkerHeartbeat"
+          }
         }
       }
     },
@@ -15410,9 +15414,13 @@
           "type": "string",
           "description": "Worker host identifier."
         },
+        "processKey": {
+          "type": "string",
+          "title": "Worker process identifier. This id should be unique for all _processes_\nrunning workers in the namespace, and should be shared by all workers\nin the same process.\nThis will be used to build the worker command nexus task queue name:\n\"temporal-sys/worker-commands/{process_key}\""
+        },
         "processId": {
           "type": "string",
-          "description": "Worker process identifier, should be unique for the host."
+          "description": "Worker process identifier. Unlike process_key, this id only needs to be unique\nwithin one host (so using e.g. a unix pid would be appropriate)."
         },
         "currentHostCpuUsage": {
           "type": "number",
diff --git a/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml b/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml
index 6bb75b4f0..5c3cccb7b 100644
--- a/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml
+++ b/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml
@@ -9723,7 +9723,9 @@ components:
                 type: string
                 description: The identity of the client who initiated this request.
               workerHeartbeat:
-                $ref: '#/components/schemas/WorkerHeartbeat'
+                type: array
+                items:
+                  $ref: '#/components/schemas/WorkerHeartbeat'
       RecordWorkerHeartbeatResponse:
         type: object
         properties: {}
@@ -12797,9 +12799,19 @@ components:
         hostName:
           type: string
          description: Worker host identifier.
+        processKey:
+          type: string
+          description: |-
+            Worker process identifier. This id should be unique for all _processes_
+            running workers in the namespace, and should be shared by all workers
+            in the same process.
+            This will be used to build the worker command nexus task queue name:
+            "temporal-sys/worker-commands/{process_key}"
         processId:
           type: string
-          description: Worker process identifier, should be unique for the host.
+          description: |-
+            Worker process identifier. Unlike process_key, this id only needs to be unique
+            within one host (so using e.g. a unix pid would be appropriate).
         currentHostCpuUsage:
           type: number
           description: |-
diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/worker/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/worker/v1/message.proto
index 024357ce5..f5ad9ebe0 100644
--- a/sdk-core-protos/protos/api_upstream/temporal/api/worker/v1/message.proto
+++ b/sdk-core-protos/protos/api_upstream/temporal/api/worker/v1/message.proto
@@ -54,7 +54,16 @@ message WorkerHostInfo {
     // Worker host identifier.
     string host_name = 1;
 
-    // Worker process identifier, should be unique for the host.
+    // Worker process identifier. This id should be unique for all _processes_
+    // running workers in the namespace, and should be shared by all workers
+    // in the same process.
+    // This will be used to build the worker command nexus task queue name:
+    // "temporal-sys/worker-commands/{process_key}"
+    string process_key = 5;
+
+    // Worker process identifier. Unlike process_key, this id only needs to be unique
+    // within one host (so using e.g. a unix pid would be appropriate).
     string process_id = 2;
 
     // System used CPU as a float in the range [0.0, 1.0] where 1.0 is defined as all
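Illustrative only (not crate code): per the comments above, process_id need only be host-unique while process_key must be namespace-unique and shared by every worker in the process, hence a pid for one and a per-process UUID for the other.

use uuid::Uuid;

fn process_identifiers() -> (String, String) {
    let process_id = std::process::id().to_string(); // unique within this host
    let process_key = Uuid::new_v4().to_string(); // unique across the namespace
    (process_id, process_key)
}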
@@ -78,7 +87,6 @@ message WorkerHeartbeat {
     // Usually host_name+(user group name)+process_id, but can be overwritten by the user.
     string worker_identity = 2;
-
     // Worker host information.
     WorkerHostInfo host_info = 3;
diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto
index 718ddc666..180dbba75 100644
--- a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto
+++ b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto
@@ -1772,7 +1772,7 @@ message PollNexusTaskQueueRequest {
     temporal.api.deployment.v1.WorkerDeploymentOptions deployment_options = 6;
 
     // Worker info to be sent to the server.
-    temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 7;
+    repeated temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 7;
 }
 
 message PollNexusTaskQueueResponse {
@@ -2384,7 +2384,7 @@ message RecordWorkerHeartbeatRequest {
     // The identity of the client who initiated this request.
     string identity = 2;
 
-    temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 3;
+    repeated temporal.api.worker.v1.WorkerHeartbeat worker_heartbeat = 3;
 }
 
 message RecordWorkerHeartbeatResponse {
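A sketch of the wire-level effect of the `repeated` change, assuming the prost-generated types in temporal_sdk_core_protos: worker_heartbeat becomes a Vec rather than an optional single message, so one RPC can carry heartbeats for every worker sharing a client.

use temporal_sdk_core_protos::temporal::api::{
    worker::v1::WorkerHeartbeat,
    workflowservice::v1::RecordWorkerHeartbeatRequest,
};

fn heartbeat_request(identity: &str, beats: Vec<WorkerHeartbeat>) -> RecordWorkerHeartbeatRequest {
    RecordWorkerHeartbeatRequest {
        identity: identity.to_string(),
        worker_heartbeat: beats, // previously a single optional message
        ..Default::default()
    }
}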