Skip to content

Worker heartbeat #953

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ use crate::{
};
use anyhow::bail;
use futures_util::Stream;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use temporal_client::{ConfiguredClient, NamespacedClient, TemporalServiceClientWithMetrics};
use temporal_sdk_core_api::{
Worker as WorkerTrait,
Expand Down Expand Up @@ -99,29 +98,27 @@ where
}
let client_ident = client.get_identity().to_owned();
let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
let data = Arc::new(Mutex::new(worker::WorkerHeartbeatData::new(
worker_config.clone(),
client_ident.clone(),
)));

if client_ident == "" {
bail!("Client identity cannot be empty. Either lang or user should be setting this value");
}

let heartbeat_fn = Arc::new(OnceLock::new());

let client_bag = Arc::new(WorkerClientBag::new(
client,
worker_config.namespace.clone(),
client_ident,
worker_config.versioning_strategy.clone(),
data.clone(),
heartbeat_fn.clone(),
));

Ok(Worker::new(
worker_config,
sticky_q,
client_bag,
Some(&runtime.telemetry),
Some(data),
Some(heartbeat_fn),
))
}

Expand Down
16 changes: 11 additions & 5 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Worker-specific client needs

pub(crate) mod mocks;
use crate::abstractions::dbg_panic;
use crate::protosext::legacy_query_failure;
use crate::worker::heartbeat::WorkerHeartbeatData;
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use std::sync::OnceLock;
use std::{sync::Arc, time::Duration};
use temporal_client::{
Client, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient,
Expand Down Expand Up @@ -48,7 +49,7 @@ pub(crate) struct WorkerClientBag {
namespace: String,
identity: String,
worker_versioning_strategy: WorkerVersioningStrategy,
heartbeat_data: Arc<Mutex<WorkerHeartbeatData>>,
heartbeat_data: Arc<OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat> + Send + Sync>>>,
}

impl WorkerClientBag {
Expand All @@ -57,7 +58,7 @@ impl WorkerClientBag {
namespace: String,
identity: String,
worker_versioning_strategy: WorkerVersioningStrategy,
heartbeat_data: Arc<Mutex<WorkerHeartbeatData>>,
heartbeat_data: Arc<OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat> + Send + Sync>>>,
) -> Self {
Self {
replaceable_client: RwLock::new(client),
Expand Down Expand Up @@ -127,7 +128,12 @@ impl WorkerClientBag {
}

fn capture_heartbeat(&self) -> Option<WorkerHeartbeat> {
self.heartbeat_data.lock().capture_heartbeat_if_needed()
if let Some(hb) = self.heartbeat_data.get() {
hb()
} else {
dbg_panic!("Heartbeat function never set");
None
}
}
}

Expand Down
133 changes: 99 additions & 34 deletions core/src/worker/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,128 @@
use crate::WorkerClient;
use crate::abstractions::dbg_panic;
use gethostname::gethostname;
use parking_lot::Mutex;
use prost_types::Duration as PbDuration;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, SystemTime};
use temporal_sdk_core_api::worker::WorkerConfig;
use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use uuid::Uuid;

/// Owns the background task that periodically sends worker heartbeats.
///
/// The task is spawned in `WorkerHeartbeatManager::new` and is stopped by calling `shutdown`.
pub(crate) struct WorkerHeartbeatManager {
/// Handle to the spawned heartbeat task; aborted on shutdown.
heartbeat_handle: JoinHandle<()>,
}

impl WorkerHeartbeatManager {
pub(crate) fn new(
config: WorkerConfig,
identity: String,
heartbeat_fn: Arc<OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat> + Send + Sync>>>,
client: Arc<dyn WorkerClient>,
) -> Self {
let sdk_name_and_ver = client.sdk_name_and_version();
let (reset_tx, reset_rx) = watch::channel(());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm realizing this should probably just be a `tokio::sync::Notify`: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html

let data = Arc::new(Mutex::new(WorkerHeartbeatData::new(
config,
identity,
sdk_name_and_ver,
reset_tx,
)));
let data_clone = data.clone();

let heartbeat_handle = tokio::spawn(async move {
let mut reset_rx = reset_rx;
let mut ticker = tokio::time::interval(data_clone.lock().heartbeat_interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = ticker.tick() => {
let heartbeat = if let Some(heartbeat) = data_clone.lock().capture_heartbeat_if_needed() {
heartbeat
} else {
continue
};
if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await {
warn!(error=?e, "Network error while sending worker heartbeat");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think probably move this warning after the return-if-unimplemented

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I was originally thinking we'd want a warning either way, but I guess it makes sense not to bother informing users who don't even have this feature that it's being skipped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess an argument for logging it would be to try to entice folks to upgrade their server versions for this new feature?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, if they read about it and decide they want it, that's how I'd expect they'd learn anyway.

if matches!(
e.code(),
tonic::Code::Unimplemented
) {
return;
}
}
}
_ = reset_rx.changed() => {
ticker.reset();
}
}
}
});

let data_clone = data.clone();
if let Err(_) = heartbeat_fn.set(Box::new(move || {
data_clone.lock().capture_heartbeat_if_needed()
})) {
dbg_panic!(
"Failed to set heartbeat_fn, heartbeat_fn should only be set once, when a singular WorkerHeartbeatInfo is created"
);
}

Self { heartbeat_handle }
}

/// Stops heartbeating by aborting the spawned heartbeat task.
///
/// This only requests cancellation of the task; it does not wait for the task to finish.
pub(crate) fn shutdown(&self) {
self.heartbeat_handle.abort()
}
}

#[derive(Debug, Clone)]
pub struct WorkerHeartbeatData {
struct WorkerHeartbeatData {
worker_instance_key: String,
pub(crate) worker_identity: String,
worker_identity: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, de-pub :feelsgoodman:

host_info: WorkerHostInfo,
// Time of the last heartbeat. This is used for both heartbeat_time and last_heartbeat_time
pub(crate) heartbeat_time: Option<SystemTime>,
pub(crate) task_queue: String,
heartbeat_time: Option<SystemTime>,
task_queue: String,
/// SDK name
pub(crate) sdk_name: String,
sdk_name: String,
/// SDK version
pub(crate) sdk_version: String,
sdk_version: String,
/// Worker start time
pub(crate) start_time: SystemTime,
pub(crate) heartbeat_interval: Duration,
pub(crate) reset_tx: Option<watch::Sender<()>>,
start_time: SystemTime,
heartbeat_interval: Duration,
reset_tx: watch::Sender<()>,
}

impl WorkerHeartbeatData {
pub fn new(worker_config: WorkerConfig, worker_identity: String) -> Self {
fn new(
worker_config: WorkerConfig,
worker_identity: String,
sdk_name_and_ver: (String, String),
reset_tx: watch::Sender<()>,
) -> Self {
Self {
worker_identity,
host_info: WorkerHostInfo {
host_name: gethostname().to_string_lossy().to_string(),
process_id: std::process::id().to_string(),
..Default::default()
},
sdk_name: String::new(),
sdk_version: String::new(),
sdk_name: sdk_name_and_ver.0,
sdk_version: sdk_name_and_ver.1,
task_queue: worker_config.task_queue.clone(),
start_time: SystemTime::now(),
heartbeat_time: None,
worker_instance_key: Uuid::new_v4().to_string(),
heartbeat_interval: worker_config.heartbeat_interval,
reset_tx: None,
reset_tx,
}
}

pub fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> {
fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> {
let now = SystemTime::now();
let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time {
let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO);
Expand All @@ -64,13 +142,8 @@ impl WorkerHeartbeatData {
};

self.heartbeat_time = Some(now);
if let Some(reset_tx) = &self.reset_tx {
let _ = reset_tx.send(());
} else {
warn!(
"No reset_tx attached to heartbeat_info, worker heartbeat was not properly setup"
);
}

let _ = self.reset_tx.send(());

Some(WorkerHeartbeat {
worker_instance_key: self.worker_instance_key.clone(),
Expand All @@ -86,10 +159,6 @@ impl WorkerHeartbeatData {
..Default::default()
})
}

pub(crate) fn set_reset_tx(&mut self, reset_tx: watch::Sender<()>) {
self.reset_tx = Some(reset_tx);
}
}

#[cfg(test)]
Expand All @@ -99,7 +168,6 @@ mod tests {
use crate::test_help::test_worker_cfg;
use crate::worker;
use crate::worker::client::mocks::mock_worker_client;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use temporal_sdk_core_api::worker::PollerBehavior;
Expand All @@ -112,7 +180,7 @@ mod tests {
.times(2)
.returning(move |heartbeat| {
let host_info = heartbeat.host_info.clone().unwrap();
assert_eq!("test_identity", heartbeat.worker_identity);
assert_eq!("test-identity", heartbeat.worker_identity);
assert!(!heartbeat.worker_instance_key.is_empty());
assert_eq!(
host_info.host_name,
Expand All @@ -135,18 +203,15 @@ mod tests {
.build()
.unwrap();

let heartbeat_data = Arc::new(Mutex::new(WorkerHeartbeatData::new(
config.clone(),
"test_identity".to_string(),
)));
let heartbeat_fn = Arc::new(OnceLock::new());
let client = Arc::new(mock);
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_data.clone()));
let _ = heartbeat_data.lock().capture_heartbeat_if_needed();
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_fn.clone()));
heartbeat_fn.get().unwrap()();

// heartbeat timer fires once
tokio::time::sleep(Duration::from_millis(300)).await;
// it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here
assert_eq!(None, heartbeat_data.lock().capture_heartbeat_if_needed());
assert_eq!(None, heartbeat_fn.get().unwrap()());
// heartbeat timer fires once
tokio::time::sleep(Duration::from_millis(150)).await;

Expand Down
Loading
Loading