Skip to content

Worker heartbeat #953

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ pub struct WorkerConfig {
/// A versioning strategy for this worker.
pub versioning_strategy: WorkerVersioningStrategy,

/// The interval in which the worker will send a heartbeat.
/// The interval within which the worker will send a heartbeat.
/// The timer is reset on each existing RPC call that also happens to send this data, like
/// `PollWorkflowTaskQueueRequest`.
#[builder(default)]
pub heartbeat_interval: Option<Duration>,
#[builder(default = "Duration::from_secs(60)")]
pub heartbeat_interval: Duration,
}

impl WorkerConfig {
Expand Down
8 changes: 3 additions & 5 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
}
let client_ident = client.get_identity().to_owned();
let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
let heartbeat_info = Arc::new(Mutex::new(worker::WorkerHeartbeatInfo::new(
let data = Arc::new(Mutex::new(worker::WorkerHeartbeatData::new(
worker_config.clone(),
)));

Expand All @@ -108,17 +108,15 @@ where
worker_config.namespace.clone(),
client_ident,
worker_config.versioning_strategy.clone(),
heartbeat_info.clone(),
data.clone(),
));
// TODO: Adding this afterwards feels a little clunky
heartbeat_info.lock().add_client(client_bag.clone());

Ok(Worker::new(
worker_config,
sticky_q,
client_bag,
Some(&runtime.telemetry),
Some(heartbeat_info),
Some(data),
))
}

Expand Down
14 changes: 7 additions & 7 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

pub(crate) mod mocks;
use crate::protosext::legacy_query_failure;
use crate::worker::heartbeat::WorkerHeartbeatInfo;
use crate::worker::heartbeat::WorkerHeartbeatData;
use parking_lot::{Mutex, RwLock};
use std::{sync::Arc, time::Duration};
use temporal_client::{
Expand Down Expand Up @@ -48,7 +48,7 @@ pub(crate) struct WorkerClientBag {
namespace: String,
identity: String,
worker_versioning_strategy: WorkerVersioningStrategy,
heartbeat_info: Arc<Mutex<WorkerHeartbeatInfo>>,
heartbeat_data: Arc<Mutex<WorkerHeartbeatData>>,
}

impl WorkerClientBag {
Expand All @@ -57,14 +57,14 @@ impl WorkerClientBag {
namespace: String,
identity: String,
worker_versioning_strategy: WorkerVersioningStrategy,
heartbeat_info: Arc<Mutex<WorkerHeartbeatInfo>>,
heartbeat_data: Arc<Mutex<WorkerHeartbeatData>>,
) -> Self {
Self {
replaceable_client: RwLock::new(client),
namespace,
identity,
worker_versioning_strategy,
heartbeat_info,
heartbeat_data,
}
}

Expand Down Expand Up @@ -127,7 +127,7 @@ impl WorkerClientBag {
}

fn capture_heartbeat(&self) -> Option<WorkerHeartbeat> {
Some(self.heartbeat_info.lock().capture_heartbeat())
self.heartbeat_data.lock().capture_heartbeat_if_needed()
}
}

Expand Down Expand Up @@ -287,7 +287,7 @@ impl WorkerClient for WorkerClientBag {
binary_checksum: self.binary_checksum(),
worker_version_capabilities: self.worker_version_capabilities(),
deployment_options: self.deployment_options(),
worker_heartbeat: self.capture_heartbeat(),
worker_heartbeat: None,
}
.into_request();
request.extensions_mut().insert(IsWorkerTaskLongPoll);
Expand Down Expand Up @@ -324,7 +324,7 @@ impl WorkerClient for WorkerClientBag {
}),
worker_version_capabilities: self.worker_version_capabilities(),
deployment_options: self.deployment_options(),
worker_heartbeat: self.capture_heartbeat(),
worker_heartbeat: None,
}
.into_request();
request.extensions_mut().insert(IsWorkerTaskLongPoll);
Expand Down
183 changes: 42 additions & 141 deletions core/src/worker/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,97 +1,13 @@
use crate::WorkerClient;
use futures_util::future;
use futures_util::future::AbortHandle;
use gethostname::gethostname;
use parking_lot::Mutex;
use prost_types::Duration as PbDuration;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use temporal_sdk_core_api::worker::WorkerConfig;
use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo};
use tokio::sync::watch;
use uuid::Uuid;

/// Heartbeat information
///
/// Note: Experimental
pub struct WorkerHeartbeatInfo {
pub(crate) data: Arc<Mutex<WorkerHeartbeatData>>,
timer_abort: AbortHandle,
client: Option<Arc<dyn WorkerClient>>,
interval: Option<Duration>,
#[cfg(test)]
heartbeats_sent: Arc<Mutex<usize>>,
}

impl WorkerHeartbeatInfo {
/// Create a new WorkerHeartbeatInfo. A timer is immediately started to track the worker
/// heartbeat interval.
pub(crate) fn new(worker_config: WorkerConfig) -> Self {
// unused abort handle, will be replaced with a new one when we start a new timer
let (abort_handle, _) = AbortHandle::new_pair();

Self {
data: Arc::new(Mutex::new(WorkerHeartbeatData::new(worker_config.clone()))),
timer_abort: abort_handle,
client: None,
interval: worker_config.heartbeat_interval,
#[cfg(test)]
heartbeats_sent: Arc::new(Mutex::new(0)),
}
}

/// Transform heartbeat data into `WorkerHeartbeat` we can send in gRPC request. Some
/// metrics are also cached for future calls of this function.
pub(crate) fn capture_heartbeat(&mut self) -> WorkerHeartbeat {
self.create_new_timer();

self.data.lock().capture_heartbeat()
}

fn create_new_timer(&mut self) {
self.timer_abort.abort();

let (abort_handle, abort_reg) = AbortHandle::new_pair();
let interval = if let Some(dur) = self.interval {
dur
} else {
Duration::from_secs(60)
};
let data = self.data.clone();
#[cfg(test)]
let heartbeats_sent = self.heartbeats_sent.clone();
self.timer_abort = abort_handle.clone();
if let Some(client) = self.client.clone() {
tokio::spawn(future::Abortable::new(
async move {
loop {
tokio::time::sleep(interval).await;
#[cfg(test)]
{
let mut num = heartbeats_sent.lock();
*num += 1;
}

let heartbeat = data.lock().capture_heartbeat();
if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await {
warn!(error=?e, "Network error while sending worker heartbeat");
}
}
},
abort_reg,
));
} else {
warn!("No client attached to heartbeat_info")
};
}

pub(crate) fn add_client(&mut self, client: Arc<dyn WorkerClient>) {
self.client = Some(client);
self.create_new_timer();
}
}

#[derive(Debug, Clone)]
pub(crate) struct WorkerHeartbeatData {
pub struct WorkerHeartbeatData {
worker_instance_key: String,
pub(crate) worker_identity: String,
host_info: WorkerHostInfo,
Expand All @@ -104,10 +20,12 @@ pub(crate) struct WorkerHeartbeatData {
pub(crate) sdk_version: String,
/// Worker start time
pub(crate) start_time: SystemTime,
pub(crate) heartbeat_interval: Duration,
pub(crate) reset_tx: Option<watch::Sender<()>>,
}

impl WorkerHeartbeatData {
fn new(worker_config: WorkerConfig) -> Self {
pub fn new(worker_config: WorkerConfig) -> Self {
Self {
// TODO: Is this right for worker_identity?
worker_identity: worker_config
Expand All @@ -125,13 +43,22 @@ impl WorkerHeartbeatData {
start_time: SystemTime::now(),
heartbeat_time: None,
worker_instance_key: Uuid::new_v4().to_string(),
heartbeat_interval: worker_config.heartbeat_interval,
reset_tx: None,
}
}

fn capture_heartbeat(&mut self) -> WorkerHeartbeat {
pub fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> {
let now = SystemTime::now();
let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time {
let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); // TODO: do we want to fall back to ZERO?
let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO);

// Only send poll data if it's nearly been a full interval since this data has been sent
// In this case, "nearly" is 90% of the interval
if dur.as_secs_f64() < 0.9 * self.heartbeat_interval.as_secs_f64() {
println!("Heartbeat interval not yet elapsed, not sending poll data");
return None;
}
Some(PbDuration {
seconds: dur.as_secs() as i64,
nanos: dur.subsec_nanos() as i32,
Expand All @@ -141,8 +68,15 @@ impl WorkerHeartbeatData {
};

self.heartbeat_time = Some(now);
if let Some(reset_tx) = &self.reset_tx {
let _ = reset_tx.send(());
} else {
Copy link
Member

@Sushisource Sushisource Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This state should be unrepresentable. A fun type puzzle!

I think the way to fix this is to separate out data from some kind of handle for calling this function. The handle is created early on (including the reset channel inside it) and the data is created later, down inside Worker::new. Initially the handle has no capability but once it's passed into the constructor for data it becomes useful.

This sort of "uninitialized" state still sort of exists between those two calls, but, at least it exists in a way that works fairly naturally with the return type of Option<WorkerHeartbeat>, or in a way that can block until everything is initialized.

One option is something like having the handle contain a req_resp: Sender<Receiver<Option<WorkerHeartbeat>>> which can be used inside the timing task to decide if the caller should get a heartbeat or not (also obviates the need for the dedicated reset channel). Dunno if that's overkill or not lol.

The more obvious option is to just have a OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat>>> or similar, which can be waited on to do semantically the same thing as the channel-of-channel. It's probably a bit cleaner than nested channels.

I dunno, fun one, I thought about this for a while and had to stop myself from just trying to do it. Feel free to play around. I maybe spend too much time trying to eliminate states like these. If you think it's not worth it, which is legit, change this to a dbg_panic!

warn!(
"No reset_tx attached to heartbeat_info, worker heartbeat was not properly setup"
);
}

WorkerHeartbeat {
Some(WorkerHeartbeat {
worker_instance_key: self.worker_instance_key.clone(),
worker_identity: self.worker_identity.clone(),
host_info: Some(self.host_info.clone()),
Expand All @@ -154,38 +88,33 @@ impl WorkerHeartbeatData {
heartbeat_time: Some(SystemTime::now().into()),
elapsed_since_last_heartbeat,
..Default::default()
}
})
}

/// Stores `reset_tx` in `self.reset_tx`, replacing any previously stored sender.
///
/// `capture_heartbeat_if_needed` sends `()` on this sender after it records a new heartbeat
/// time; while no sender is stored it logs a warning instead.
// NOTE(review): presumably the receiving side uses the signal to reset the heartbeat timer —
// confirm against the task that owns the `watch::Receiver`.
pub(crate) fn set_reset_tx(&mut self, reset_tx: watch::Sender<()>) {
self.reset_tx = Some(reset_tx);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_help::WorkerExt;
use crate::test_help::test_worker_cfg;
use crate::worker;
use crate::worker::WorkerHeartbeatInfo;
use crate::worker::client::mocks::mock_worker_client;
use parking_lot::Mutex;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_api::worker::PollerBehavior;
use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion;
use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult;
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{
PollActivityTaskQueueResponse, RecordWorkerHeartbeatResponse,
RespondActivityTaskCompletedResponse,
};
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse;

#[rstest::rstest]
#[tokio::test]
async fn worker_heartbeat(#[values(true, false)] extra_heartbeat: bool) {
async fn worker_heartbeat() {
let mut mock = mock_worker_client();
let record_heartbeat_calls = if extra_heartbeat { 2 } else { 3 };
mock.expect_record_worker_heartbeat()
.times(record_heartbeat_calls)
.returning(|heartbeat| {
.times(2)
.returning(move |heartbeat| {
let host_info = heartbeat.host_info.clone().unwrap();
assert!(heartbeat.worker_identity.is_empty());
assert!(!heartbeat.worker_instance_key.is_empty());
Expand All @@ -202,54 +131,26 @@ mod tests {

Ok(RecordWorkerHeartbeatResponse {})
});
mock.expect_poll_activity_task()
.times(1)
.returning(move |_, _| {
Ok(PollActivityTaskQueueResponse {
task_token: vec![1],
activity_id: "act1".to_string(),
..Default::default()
})
});
mock.expect_complete_activity_task()
.returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default()));

let config = test_worker_cfg()
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize))
.max_outstanding_activities(1_usize)
.heartbeat_interval(Duration::from_millis(100))
.heartbeat_interval(Duration::from_millis(200))
.build()
.unwrap();

let heartbeat_info = Arc::new(Mutex::new(WorkerHeartbeatInfo::new(config.clone())));
let heartbeat_data = Arc::new(Mutex::new(WorkerHeartbeatData::new(config.clone())));
let client = Arc::new(mock);
heartbeat_info.lock().add_client(client.clone());
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_info.clone()));
let _ = heartbeat_info.lock().capture_heartbeat();
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_data.clone()));
let _ = heartbeat_data.lock().capture_heartbeat_if_needed();

// heartbeat timer fires once
tokio::time::sleep(Duration::from_millis(150)).await;
if extra_heartbeat {
// reset heartbeat timer
heartbeat_info.lock().capture_heartbeat();
}
tokio::time::sleep(Duration::from_millis(300)).await;
// it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here
assert_eq!(None, heartbeat_data.lock().capture_heartbeat_if_needed());
// heartbeat timer fires once
tokio::time::sleep(Duration::from_millis(180)).await;

if extra_heartbeat {
assert_eq!(2, *heartbeat_info.lock().heartbeats_sent.lock().deref());
} else {
assert_eq!(3, *heartbeat_info.lock().heartbeats_sent.lock().deref());
}
tokio::time::sleep(Duration::from_millis(150)).await;

let task = worker.poll_activity_task().await.unwrap();
worker
.complete_activity_task(ActivityTaskCompletion {
task_token: task.task_token,
result: Some(ActivityExecutionResult::ok(vec![1].into())),
})
.await
.unwrap();
worker.drain_activity_poller_and_shutdown().await;
}
}
Loading
Loading