Skip to content

Commit ee687bf

Browse files
authored
Wait for activity completes to reach server before shutdown (#681)
1 parent 4f0b7f3 commit ee687bf

File tree

2 files changed

+82
-6
lines changed

2 files changed

+82
-6
lines changed

core/src/core_tests/activity_tasks.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::{
1717
future,
1818
rc::Rc,
1919
sync::{
20-
atomic::{AtomicUsize, Ordering},
20+
atomic::{AtomicBool, AtomicUsize, Ordering},
2121
Arc,
2222
},
2323
time::Duration,
@@ -1057,7 +1057,6 @@ async fn cant_complete_activity_with_unset_result_payload() {
10571057
#[rstest::rstest]
10581058
#[tokio::test]
10591059
async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
1060-
let _task_q = "q";
10611060
let grace_period = Duration::from_millis(200);
10621061
let mut tasks = three_tasks();
10631062
let mut mock_act_poller = mock_poller();
@@ -1122,3 +1121,73 @@ async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
11221121
}
11231122
worker.drain_pollers_and_shutdown().await;
11241123
}
1124+
1125+
#[rstest::rstest]
1126+
#[tokio::test]
1127+
async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)] use_grace: bool) {
1128+
crate::telemetry::test_telem_console();
1129+
1130+
let grace_period = if use_grace {
1131+
// Even though the grace period is shorter than the client call, the client call will still
1132+
// go through. This is reasonable since the client has a timeout anyway, and it's unlikely
1133+
// that a user *needs* an extremely short grace period (it'd be kind of pointless in that
1134+
// case). They can always force-kill their worker in this situation.
1135+
Duration::from_millis(50)
1136+
} else {
1137+
Duration::from_secs(10)
1138+
};
1139+
let shutdown_finished: &'static AtomicBool = Box::leak(Box::new(AtomicBool::new(false)));
1140+
let mut tasks = three_tasks();
1141+
let mut mock_act_poller = mock_poller();
1142+
mock_act_poller
1143+
.expect_poll()
1144+
.times(1)
1145+
.returning(move || Some(Ok(tasks.pop_front().unwrap())));
1146+
mock_act_poller
1147+
.expect_poll()
1148+
.times(1)
1149+
.returning(move || None);
1150+
let mut mock_client = mock_manual_workflow_client();
1151+
mock_client
1152+
.expect_complete_activity_task()
1153+
.times(1)
1154+
.returning(|_, _| {
1155+
async {
1156+
// We need some artificial delay here and there's nothing meaningful to sync with
1157+
tokio::time::sleep(Duration::from_millis(100)).await;
1158+
if shutdown_finished.load(Ordering::Acquire) {
1159+
panic!("Shutdown must complete *after* server sees the activity completion");
1160+
}
1161+
Ok(Default::default())
1162+
}
1163+
.boxed()
1164+
});
1165+
1166+
let mw = MockWorkerInputs {
1167+
act_poller: Some(Box::from(mock_act_poller)),
1168+
config: test_worker_cfg()
1169+
.graceful_shutdown_period(grace_period)
1170+
.max_concurrent_at_polls(1_usize) // Makes test logic simple
1171+
.build()
1172+
.unwrap(),
1173+
..Default::default()
1174+
};
1175+
let worker = mock_worker(MocksHolder::from_mock_worker(mock_client, mw));
1176+
1177+
let task = worker.poll_activity_task().await.unwrap();
1178+
1179+
let shutdown_task = async {
1180+
worker.drain_activity_poller_and_shutdown().await;
1181+
shutdown_finished.store(true, Ordering::Release);
1182+
};
1183+
let complete_task = async {
1184+
worker
1185+
.complete_activity_task(ActivityTaskCompletion {
1186+
task_token: task.task_token,
1187+
result: Some(ActivityExecutionResult::ok("hi".into())),
1188+
})
1189+
.await
1190+
.unwrap();
1191+
};
1192+
join!(shutdown_task, complete_task);
1193+
}

core/src/worker/activities.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ pub(crate) struct WorkerActivityTasks {
152152
/// eager activities). Tasks received in this stream hold a "tracked" permit that is issued by
153153
/// the `eager_activities_semaphore`.
154154
eager_activities_tx: UnboundedSender<TrackedPermittedTqResp>,
155+
/// Ensures that no activities are in the middle of flushing their results to server while we
156+
/// try to shut down.
157+
completers_lock: tokio::sync::RwLock<()>,
155158

156159
metrics: MetricsContext,
157160

@@ -230,6 +233,7 @@ impl WorkerActivityTasks {
230233
default_heartbeat_throttle_interval,
231234
poll_returned_shutdown_token: CancellationToken::new(),
232235
outstanding_activity_tasks,
236+
completers_lock: Default::default(),
233237
}
234238
}
235239

@@ -283,6 +287,7 @@ impl WorkerActivityTasks {
283287

284288
pub(crate) async fn shutdown(&self) {
285289
self.initiate_shutdown();
290+
let _ = self.completers_lock.write().await;
286291
self.poll_returned_shutdown_token.cancelled().await;
287292
self.heartbeat_manager.shutdown().await;
288293
}
@@ -321,10 +326,10 @@ impl WorkerActivityTasks {
321326
jh.abort()
322327
};
323328
self.heartbeat_manager.evict(task_token.clone()).await;
324-
self.complete_notify.notify_waiters();
325329

326330
// No need to report activities which we already know the server doesn't care about
327331
if !known_not_found {
332+
let _flushing_guard = self.completers_lock.read().await;
328333
let maybe_net_err = match status {
329334
aer::Status::WillCompleteAsync(_) => None,
330335
aer::Status::Completed(ar::Success { result }) => client
@@ -364,8 +369,8 @@ impl WorkerActivityTasks {
364369
{
365370
details
366371
} else {
367-
warn!(task_token = ? task_token,
368-
"Expected activity cancelled status with CanceledFailureInfo");
372+
warn!(task_token=?task_token,
373+
"Expected activity cancelled status with CanceledFailureInfo");
369374
None
370375
};
371376
client
@@ -376,9 +381,11 @@ impl WorkerActivityTasks {
376381
}
377382
};
378383

384+
self.complete_notify.notify_waiters();
385+
379386
if let Some(e) = maybe_net_err {
380387
if e.code() == tonic::Code::NotFound {
381-
warn!(task_token = ?task_token, details = ?e, "Activity not found on \
388+
warn!(task_token=?task_token, details=?e, "Activity not found on \
382389
completion. This may happen if the activity has already been cancelled but \
383390
completed anyway.");
384391
} else {

0 commit comments

Comments
 (0)