-
Notifications
You must be signed in to change notification settings - Fork 90
Worker heartbeat #953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Worker heartbeat #953
Changes from 1 commit
dc53656
7b83f8e
a1d60a6
aef076e
261201c
f8ea55d
ebc6910
207feaa
100ebe2
35d996f
4b902b6
ba88360
51d0de0
ac35033
0190cb7
3c694e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,97 +1,13 @@ | ||
use crate::WorkerClient; | ||
use futures_util::future; | ||
use futures_util::future::AbortHandle; | ||
use gethostname::gethostname; | ||
use parking_lot::Mutex; | ||
use prost_types::Duration as PbDuration; | ||
use std::sync::Arc; | ||
use std::time::{Duration, SystemTime}; | ||
use temporal_sdk_core_api::worker::WorkerConfig; | ||
use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo}; | ||
use tokio::sync::watch; | ||
use uuid::Uuid; | ||
|
||
/// Heartbeat information | ||
/// | ||
/// Note: Experimental | ||
pub struct WorkerHeartbeatInfo { | ||
pub(crate) data: Arc<Mutex<WorkerHeartbeatData>>, | ||
timer_abort: AbortHandle, | ||
client: Option<Arc<dyn WorkerClient>>, | ||
interval: Option<Duration>, | ||
#[cfg(test)] | ||
heartbeats_sent: Arc<Mutex<usize>>, | ||
} | ||
|
||
impl WorkerHeartbeatInfo { | ||
/// Create a new WorkerHeartbeatInfo. A timer is immediately started to track the worker | ||
/// heartbeat interval. | ||
pub(crate) fn new(worker_config: WorkerConfig) -> Self { | ||
// unused abort handle, will be replaced with a new one when we start a new timer | ||
let (abort_handle, _) = AbortHandle::new_pair(); | ||
|
||
Self { | ||
data: Arc::new(Mutex::new(WorkerHeartbeatData::new(worker_config.clone()))), | ||
timer_abort: abort_handle, | ||
client: None, | ||
interval: worker_config.heartbeat_interval, | ||
#[cfg(test)] | ||
heartbeats_sent: Arc::new(Mutex::new(0)), | ||
} | ||
} | ||
|
||
/// Transform heartbeat data into `WorkerHeartbeat` we can send in gRPC request. Some | ||
/// metrics are also cached for future calls of this function. | ||
pub(crate) fn capture_heartbeat(&mut self) -> WorkerHeartbeat { | ||
self.create_new_timer(); | ||
|
||
self.data.lock().capture_heartbeat() | ||
} | ||
|
||
fn create_new_timer(&mut self) { | ||
self.timer_abort.abort(); | ||
|
||
let (abort_handle, abort_reg) = AbortHandle::new_pair(); | ||
let interval = if let Some(dur) = self.interval { | ||
dur | ||
} else { | ||
Duration::from_secs(60) | ||
}; | ||
let data = self.data.clone(); | ||
#[cfg(test)] | ||
let heartbeats_sent = self.heartbeats_sent.clone(); | ||
self.timer_abort = abort_handle.clone(); | ||
if let Some(client) = self.client.clone() { | ||
tokio::spawn(future::Abortable::new( | ||
async move { | ||
loop { | ||
tokio::time::sleep(interval).await; | ||
#[cfg(test)] | ||
{ | ||
let mut num = heartbeats_sent.lock(); | ||
*num += 1; | ||
} | ||
|
||
let heartbeat = data.lock().capture_heartbeat(); | ||
if let Err(e) = client.clone().record_worker_heartbeat(heartbeat).await { | ||
warn!(error=?e, "Network error while sending worker heartbeat"); | ||
} | ||
} | ||
}, | ||
abort_reg, | ||
)); | ||
} else { | ||
warn!("No client attached to heartbeat_info") | ||
}; | ||
} | ||
|
||
pub(crate) fn add_client(&mut self, client: Arc<dyn WorkerClient>) { | ||
self.client = Some(client); | ||
self.create_new_timer(); | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub(crate) struct WorkerHeartbeatData { | ||
pub struct WorkerHeartbeatData { | ||
worker_instance_key: String, | ||
pub(crate) worker_identity: String, | ||
host_info: WorkerHostInfo, | ||
|
@@ -104,10 +20,12 @@ pub(crate) struct WorkerHeartbeatData { | |
pub(crate) sdk_version: String, | ||
/// Worker start time | ||
pub(crate) start_time: SystemTime, | ||
pub(crate) heartbeat_interval: Duration, | ||
pub(crate) reset_tx: Option<watch::Sender<()>>, | ||
} | ||
|
||
impl WorkerHeartbeatData { | ||
fn new(worker_config: WorkerConfig) -> Self { | ||
pub fn new(worker_config: WorkerConfig) -> Self { | ||
Self { | ||
// TODO: Is this right for worker_identity? | ||
worker_identity: worker_config | ||
|
@@ -125,13 +43,22 @@ impl WorkerHeartbeatData { | |
start_time: SystemTime::now(), | ||
heartbeat_time: None, | ||
worker_instance_key: Uuid::new_v4().to_string(), | ||
heartbeat_interval: worker_config.heartbeat_interval, | ||
reset_tx: None, | ||
} | ||
} | ||
|
||
fn capture_heartbeat(&mut self) -> WorkerHeartbeat { | ||
pub fn capture_heartbeat_if_needed(&mut self) -> Option<WorkerHeartbeat> { | ||
let now = SystemTime::now(); | ||
let elapsed_since_last_heartbeat = if let Some(heartbeat_time) = self.heartbeat_time { | ||
let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); // TODO: do we want to fall back to ZERO? | ||
let dur = now.duration_since(heartbeat_time).unwrap_or(Duration::ZERO); | ||
|
||
// Only send poll data if it's nearly been a full interval since this data has been sent | ||
// In this case, "nearly" is 90% of the interval | ||
if dur.as_secs_f64() < 0.9 * self.heartbeat_interval.as_secs_f64() { | ||
println!("Heartbeat interval not yet elapsed, not sending poll data"); | ||
return None; | ||
} | ||
Some(PbDuration { | ||
seconds: dur.as_secs() as i64, | ||
nanos: dur.subsec_nanos() as i32, | ||
|
@@ -141,8 +68,15 @@ impl WorkerHeartbeatData { | |
}; | ||
|
||
self.heartbeat_time = Some(now); | ||
if let Some(reset_tx) = &self.reset_tx { | ||
let _ = reset_tx.send(()); | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This state should be unrepresentable. A fun type puzzle! I think the way to fix this is to separate out data from some kind of handle for calling this function. The handle is created early on (including the reset channel inside it) and the data is created later, down inside This sort of "uninitialized" state still sort of exists between those two calls, but, at least it exists in a way that works fairly naturally with the return type of One option is something like having the handle contain a The more obvious option is to just have a I dunno, fun one, I thought about this for a while and had to stop myself from just trying to do it. Feel free to play around. I maybe spend too much time trying to eliminate states like these. If you think it's not worth it, which is legit, change this to a |
||
warn!( | ||
"No reset_tx attached to heartbeat_info, worker heartbeat was not properly setup" | ||
); | ||
} | ||
|
||
WorkerHeartbeat { | ||
Some(WorkerHeartbeat { | ||
worker_instance_key: self.worker_instance_key.clone(), | ||
worker_identity: self.worker_identity.clone(), | ||
host_info: Some(self.host_info.clone()), | ||
|
@@ -154,38 +88,33 @@ impl WorkerHeartbeatData { | |
heartbeat_time: Some(SystemTime::now().into()), | ||
elapsed_since_last_heartbeat, | ||
..Default::default() | ||
} | ||
}) | ||
} | ||
|
||
pub(crate) fn set_reset_tx(&mut self, reset_tx: watch::Sender<()>) { | ||
self.reset_tx = Some(reset_tx); | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::test_help::WorkerExt; | ||
use crate::test_help::test_worker_cfg; | ||
use crate::worker; | ||
use crate::worker::WorkerHeartbeatInfo; | ||
use crate::worker::client::mocks::mock_worker_client; | ||
use parking_lot::Mutex; | ||
use std::ops::Deref; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
use temporal_sdk_core_api::Worker; | ||
use temporal_sdk_core_api::worker::PollerBehavior; | ||
use temporal_sdk_core_protos::coresdk::ActivityTaskCompletion; | ||
use temporal_sdk_core_protos::coresdk::activity_result::ActivityExecutionResult; | ||
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::{ | ||
PollActivityTaskQueueResponse, RecordWorkerHeartbeatResponse, | ||
RespondActivityTaskCompletedResponse, | ||
}; | ||
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse; | ||
|
||
#[rstest::rstest] | ||
#[tokio::test] | ||
async fn worker_heartbeat(#[values(true, false)] extra_heartbeat: bool) { | ||
async fn worker_heartbeat() { | ||
let mut mock = mock_worker_client(); | ||
let record_heartbeat_calls = if extra_heartbeat { 2 } else { 3 }; | ||
mock.expect_record_worker_heartbeat() | ||
.times(record_heartbeat_calls) | ||
.returning(|heartbeat| { | ||
.times(2) | ||
.returning(move |heartbeat| { | ||
let host_info = heartbeat.host_info.clone().unwrap(); | ||
assert!(heartbeat.worker_identity.is_empty()); | ||
assert!(!heartbeat.worker_instance_key.is_empty()); | ||
|
@@ -202,54 +131,26 @@ mod tests { | |
|
||
Ok(RecordWorkerHeartbeatResponse {}) | ||
}); | ||
mock.expect_poll_activity_task() | ||
.times(1) | ||
.returning(move |_, _| { | ||
Ok(PollActivityTaskQueueResponse { | ||
task_token: vec![1], | ||
activity_id: "act1".to_string(), | ||
..Default::default() | ||
}) | ||
}); | ||
mock.expect_complete_activity_task() | ||
.returning(|_, _| Ok(RespondActivityTaskCompletedResponse::default())); | ||
|
||
let config = test_worker_cfg() | ||
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)) | ||
.max_outstanding_activities(1_usize) | ||
.heartbeat_interval(Duration::from_millis(100)) | ||
.heartbeat_interval(Duration::from_millis(200)) | ||
.build() | ||
.unwrap(); | ||
|
||
let heartbeat_info = Arc::new(Mutex::new(WorkerHeartbeatInfo::new(config.clone()))); | ||
let heartbeat_data = Arc::new(Mutex::new(WorkerHeartbeatData::new(config.clone()))); | ||
let client = Arc::new(mock); | ||
heartbeat_info.lock().add_client(client.clone()); | ||
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_info.clone())); | ||
let _ = heartbeat_info.lock().capture_heartbeat(); | ||
let worker = worker::Worker::new(config, None, client, None, Some(heartbeat_data.clone())); | ||
let _ = heartbeat_data.lock().capture_heartbeat_if_needed(); | ||
|
||
// heartbeat timer fires once | ||
tokio::time::sleep(Duration::from_millis(150)).await; | ||
if extra_heartbeat { | ||
// reset heartbeat timer | ||
heartbeat_info.lock().capture_heartbeat(); | ||
} | ||
tokio::time::sleep(Duration::from_millis(300)).await; | ||
// it hasn't been >90% of the interval since the last heartbeat, so no data should be returned here | ||
assert_eq!(None, heartbeat_data.lock().capture_heartbeat_if_needed()); | ||
// heartbeat timer fires once | ||
tokio::time::sleep(Duration::from_millis(180)).await; | ||
|
||
if extra_heartbeat { | ||
assert_eq!(2, *heartbeat_info.lock().heartbeats_sent.lock().deref()); | ||
} else { | ||
assert_eq!(3, *heartbeat_info.lock().heartbeats_sent.lock().deref()); | ||
} | ||
tokio::time::sleep(Duration::from_millis(150)).await; | ||
|
||
let task = worker.poll_activity_task().await.unwrap(); | ||
worker | ||
.complete_activity_task(ActivityTaskCompletion { | ||
task_token: task.task_token, | ||
result: Some(ActivityExecutionResult::ok(vec![1].into())), | ||
}) | ||
.await | ||
.unwrap(); | ||
worker.drain_activity_poller_and_shutdown().await; | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.