-
Notifications
You must be signed in to change notification settings - Fork 90
Worker heartbeat #953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Worker heartbeat #953
Changes from 1 commit
dc53656
7b83f8e
a1d60a6
aef076e
261201c
f8ea55d
ebc6910
207feaa
100ebe2
35d996f
4b902b6
ba88360
51d0de0
ac35033
0190cb7
3c694e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,50 +1,128 @@ | ||
use crate::WorkerClient; | ||
use crate::abstractions::dbg_panic; | ||
use gethostname::gethostname; | ||
use parking_lot::Mutex; | ||
use prost_types::Duration as PbDuration; | ||
use std::sync::{Arc, OnceLock}; | ||
use std::time::{Duration, SystemTime}; | ||
use temporal_sdk_core_api::worker::WorkerConfig; | ||
use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo}; | ||
use tokio::sync::watch; | ||
use tokio::task::JoinHandle; | ||
use tokio::time::MissedTickBehavior; | ||
use uuid::Uuid; | ||
|
||
pub(crate) struct WorkerHeartbeatManager { | ||
heartbeat_handle: JoinHandle<()>, | ||
} | ||
|
||
impl WorkerHeartbeatManager { | ||
pub(crate) fn new( | ||
config: WorkerConfig, | ||
identity: String, | ||
heartbeat_fn: Arc<OnceLock<Box<dyn Fn() -> Option<WorkerHeartbeat> + Send + Sync>>>, | ||
client: Arc<dyn WorkerClient>, | ||
) -> Self { | ||
let sdk_name_and_ver = client.sdk_name_and_version(); | ||
let (reset_tx, reset_rx) = watch::channel(()); | ||
let data = Arc::new(Mutex::new(WorkerHeartbeatData::new( | ||
config, | ||
identity, | ||
sdk_name_and_ver, | ||
reset_tx, | ||
))); | ||
let data_clone = data.clone(); | ||
|
||
let heartbeat_handle = tokio::spawn(async move { | ||
let mut reset_rx = reset_rx; | ||
let mut ticker = tokio::time::interval(data_clone.lock().heartbeat_interval); | ||
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); | ||
loop { | ||
tokio::select! { | ||
_ = ticker.tick() => { | ||
let heartbeat = if let Some(heartbeat) = data_clone.lock().capture_heartbeat_if_needed() { | ||
heartbeat | ||
} else { | ||
continue | ||
}; | ||
if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await { | ||
warn!(error=?e, "Network error while sending worker heartbeat"); | ||
yuandrew marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think probably move this warning after the return-if-unimplemented There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahh I was originally thinking we'd want a warning either way, but I guess it makes sense not to bother informing users that don't even have this feature that it's being skipped There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i guess an argument for logging it would be to try to entice folks to upgrade their server versions for this new feature? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eh, if they read about it and decide they want it, that's how I'd expect they'd learn anyway. |
||
if matches!( | ||
e.code(), | ||
tonic::Code::Unimplemented | ||
) { | ||
return; | ||
} | ||
} | ||
} | ||
_ = reset_rx.changed() => { | ||
ticker.reset(); | ||
} | ||
} | ||
} | ||
}); | ||
|
||
let data_clone = data.clone(); | ||
if let Err(_) = heartbeat_fn.set(Box::new(move || { | ||
data_clone.lock().capture_heartbeat_if_needed() | ||
})) { | ||
dbg_panic!( | ||
"Failed to set heartbeat_fn, heartbeat_fn should only be set once, when a singular WorkerHeartbeatInfo is created" | ||
); | ||
} | ||
|
||
Self { heartbeat_handle } | ||
} | ||
|
||
pub(crate) fn shutdown(&self) { | ||
self.heartbeat_handle.abort() | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub struct WorkerHeartbeatData { | ||
struct WorkerHeartbeatData { | ||
worker_instance_key: String, | ||
pub(crate) worker_identity: String, | ||
worker_identity: String, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ahh, de-pub :feelsgoodman: |
||
host_info: WorkerHostInfo, | ||
// Time of the last heartbeat. This is used to both for heartbeat_time and last_heartbeat_time | ||
pub(crate) heartbeat_time: Option<SystemTime>, | ||
pub(crate) task_queue: String, | ||
heartbeat_time: Option<SystemTime>, | ||
task_queue: String, | ||
/// SDK name | ||
pub(crate) sdk_name: String, | ||
sdk_name: String, | ||
/// SDK version | ||
pub(crate) sdk_version: String, | ||
sdk_version: String, | ||
/// Worker start time | ||
pub(crate) start_time: SystemTime, | ||
pub(crate) heartbeat_interval: Duration, | ||
pub(crate) reset_tx: Option<watch::Sender<()>>, | ||
start_time: SystemTime, | ||
heartbeat_interval: Duration, | ||
reset_tx: watch::Sender<()>, | ||
} | ||
|
||
impl WorkerHeartbeatData { | ||
pub fn new(worker_config: WorkerConfig, worker_identity: String) -> Self { | ||
fn new( | ||
worker_config: WorkerConfig, | ||
worker_identity: String, | ||
sdk_name_and_ver: (String, String), | ||
reset_tx: watch::Sender<()>, | ||
) -> Self { | ||
Self { | ||
worker_identity, | ||
host_info: WorkerHostInfo { | ||
host_name: gethostname().to_string_lossy().to_string(), | ||
process_id: std::process::id().to_string(), | ||
..Default::default() | ||
}, | ||
sdk_name: String::new(), | ||
sdk_version: String::new(), | ||
sdk_name: sdk_name_and_ver.0, | ||
sdk_version: sdk_name_and_ver.1, | ||
task_queue: worker_config.task_queue.clone(), | ||
start_time: SystemTime::now(), | ||
heartbeat_time: None, | ||
worker_instance_key: Uuid::new_v4().to_string(), | ||
heartbeat_interval: worker_config.heartbeat_interval, | ||
reset_tx: None, | ||
reset_tx, | ||
} | ||
} | ||
|
||
pub fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> { | ||
fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> { | ||
let now = SystemTime::now(); | ||
let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time { | ||
let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); | ||
|
@@ -64,13 +142,8 @@ impl WorkerHeartbeatData { | |
}; | ||
|
||
self.heartbeat_time = Some(now); | ||
if let Some(reset_tx) = &self.reset_tx { | ||
let _ = reset_tx.send(()); | ||
} else { | ||
warn!( | ||
"No reset_tx attached to heartbeat_info, worker heartbeat was not properly setup" | ||
); | ||
} | ||
|
||
let _ = self.reset_tx.send(()); | ||
|
||
Some(WorkerHeartbeat { | ||
worker_instance_key: self.worker_instance_key.clone(), | ||
|
@@ -86,10 +159,6 @@ impl WorkerHeartbeatData { | |
..Default::default() | ||
}) | ||
} | ||
|
||
pub(crate) fn set_reset_tx(&mut self, reset_tx: watch::Sender<()>) { | ||
self.reset_tx = Some(reset_tx); | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
|
@@ -99,7 +168,6 @@ mod tests { | |
use crate::test_help::test_worker_cfg; | ||
use crate::worker; | ||
use crate::worker::client::mocks::mock_worker_client; | ||
use parking_lot::Mutex; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
use temporal_sdk_core_api::worker::PollerBehavior; | ||
|
@@ -112,7 +180,7 @@ mod tests { | |
.times(2) | ||
.returning(move |heartbeat| { | ||
let host_info = heartbeat.host_info.clone().unwrap(); | ||
assert_eq!("test_identity", heartbeat.worker_identity); | ||
assert_eq!("test-identity", heartbeat.worker_identity); | ||
assert!(!heartbeat.worker_instance_key.is_empty()); | ||
assert_eq!( | ||
host_info.host_name, | ||
|
@@ -135,18 +203,15 @@ mod tests { | |
.build() | ||
.unwrap(); | ||
|
||
let heartbeat_data = Arc::new(Mutex::new(WorkerHeartbeatData::new( | ||
config.clone(), | ||
"test_identity".to_string(), | ||
))); | ||
let heartbeat_fn = Arc::new(OnceLock::new()); | ||
let client = Arc::new(mock); | ||
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_data.clone())); | ||
let _ = heartbeat_data.lock().capture_heartbeat_if_needed(); | ||
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_fn.clone())); | ||
heartbeat_fn.get().unwrap()(); | ||
|
||
// heartbeat timer fires once | ||
tokio::time::sleep(Duration::from_millis(300)).await; | ||
// it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here | ||
assert_eq!(None, heartbeat_data.lock().capture_heartbeat_if_needed()); | ||
assert_eq!(None, heartbeat_fn.get().unwrap()()); | ||
// heartbeat timer fires once | ||
tokio::time::sleep(Duration::from_millis(150)).await; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm realizing this should probably just be a https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html