Skip to content

Commit c1728c0

Browse files
committed
Ensure cancels get delivered after grace, even if poll stream not done (#531)
1 parent 10dc798 commit c1728c0

File tree

3 files changed

+175
-154
lines changed

3 files changed

+175
-154
lines changed

core/src/core_tests/activity_tasks.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use temporal_sdk_core_protos::{
5858
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
5959
};
6060
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker};
61-
use tokio::{sync::Barrier, time::sleep};
61+
use tokio::{join, sync::Barrier, time::sleep};
6262
use tokio_util::sync::CancellationToken;
6363

6464
fn three_tasks() -> VecDeque<PollActivityTaskQueueResponse> {
@@ -288,7 +288,7 @@ async fn activity_cancel_interrupts_poll() {
288288
// Perform first poll to get the activity registered
289289
let act = core.poll_activity_task().await.unwrap();
290290
// Poll should block until heartbeat is sent, issuing the cancel, and interrupting the poll
291-
tokio::join! {
291+
join! {
292292
async {
293293
core.record_activity_heartbeat(ActivityHeartbeat {
294294
task_token: act.task_token,
@@ -984,7 +984,7 @@ async fn activity_tasks_from_completion_reserve_slots() {
984984
// This wf poll should *not* set the flag that it wants tasks back since both slots are
985985
// occupied
986986
let run_fut = async { worker.run_until_done().await.unwrap() };
987-
tokio::join!(run_fut, act_completer);
987+
join!(run_fut, act_completer);
988988
}
989989

990990
#[tokio::test]
@@ -1052,9 +1052,11 @@ async fn cant_complete_activity_with_unset_result_payload() {
10521052
)
10531053
}
10541054

1055+
#[rstest::rstest]
10551056
#[tokio::test]
1056-
async fn graceful_shutdown() {
1057+
async fn graceful_shutdown(#[values(true, false)] at_max_outstanding: bool) {
10571058
let _task_q = "q";
1059+
let grace_period = Duration::from_millis(200);
10581060
let mut tasks = three_tasks();
10591061
let mut mock_client = mock_workflow_client();
10601062
mock_client
@@ -1067,15 +1069,21 @@ async fn graceful_shutdown() {
10671069
.times(3)
10681070
.returning(|_, _| Ok(Default::default()));
10691071

1072+
let max_outstanding = if at_max_outstanding { 3_usize } else { 100 };
10701073
let worker = Worker::new_test(
10711074
test_worker_cfg()
1072-
.graceful_shutdown_period(Duration::from_millis(500))
1075+
.graceful_shutdown_period(grace_period)
1076+
.max_outstanding_activities(max_outstanding)
10731077
.build()
10741078
.unwrap(),
10751079
mock_client,
10761080
);
10771081

10781082
let _1 = worker.poll_activity_task().await.unwrap();
1083+
1084+
// Wait at least the grace period after one poll - ensuring it doesn't trigger prematurely
1085+
tokio::time::sleep(grace_period.mul_f32(1.1)).await;
1086+
10791087
let _2 = worker.poll_activity_task().await.unwrap();
10801088
let _3 = worker.poll_activity_task().await.unwrap();
10811089

0 commit comments

Comments
 (0)