Skip to content

Commit 4f0b7f3

Browse files
authored
Fix possibly sending LA marker after WF Complete command (#678)
1 parent 59987d4 commit 4f0b7f3

File tree

5 files changed

+180
-53
lines changed

5 files changed

+180
-53
lines changed

core/src/core_tests/local_activities.rs

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::{
22
prost_dur,
33
replay::{default_wes_attribs, TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE},
44
test_help::{
5-
hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg, MockPollCfg,
6-
ResponseType, WorkerExt,
5+
build_mock_pollers, hist_to_poll_resp, mock_sdk, mock_sdk_cfg, mock_worker,
6+
single_hist_mock_sg, MockPollCfg, ResponseType, WorkerExt,
77
},
88
worker::{client::mocks::mock_workflow_client, LEGACY_QUERY_ID},
99
};
@@ -38,7 +38,7 @@ use temporal_sdk_core_protos::{
3838
},
3939
temporal::api::{
4040
common::v1::RetryPolicy,
41-
enums::v1::{EventType, TimeoutType, WorkflowTaskFailedCause},
41+
enums::v1::{CommandType, EventType, TimeoutType, WorkflowTaskFailedCause},
4242
failure::v1::{failure::FailureInfo, Failure},
4343
query::v1::WorkflowQuery,
4444
},
@@ -1295,3 +1295,98 @@ async fn queries_can_be_received_while_heartbeating() {
12951295

12961296
core.drain_pollers_and_shutdown().await;
12971297
}
1298+
1299+
#[tokio::test]
1300+
async fn local_activity_after_wf_complete_is_discarded() {
1301+
let wfid = "fake_wf_id";
1302+
let mut t = TestHistoryBuilder::default();
1303+
t.add_wfe_started_with_wft_timeout(Duration::from_millis(200));
1304+
t.add_full_wf_task();
1305+
t.add_workflow_task_scheduled_and_started();
1306+
1307+
let mock = mock_workflow_client();
1308+
let mut mock_cfg = MockPollCfg::from_resp_batches(
1309+
wfid,
1310+
t,
1311+
[ResponseType::ToTaskNum(1), ResponseType::ToTaskNum(2)],
1312+
mock,
1313+
);
1314+
mock_cfg.make_poll_stream_interminable = true;
1315+
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
1316+
asserts
1317+
.then(move |wft| {
1318+
assert_eq!(wft.commands.len(), 0);
1319+
})
1320+
.then(move |wft| {
1321+
assert_eq!(wft.commands.len(), 2);
1322+
assert_eq!(wft.commands[0].command_type(), CommandType::RecordMarker);
1323+
assert_eq!(
1324+
wft.commands[1].command_type(),
1325+
CommandType::CompleteWorkflowExecution
1326+
);
1327+
});
1328+
});
1329+
let mut mock = build_mock_pollers(mock_cfg);
1330+
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
1331+
let core = mock_worker(mock);
1332+
1333+
let barr = Barrier::new(2);
1334+
1335+
let task = core.poll_workflow_activation().await.unwrap();
1336+
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
1337+
task.run_id,
1338+
vec![
1339+
ScheduleLocalActivity {
1340+
seq: 1,
1341+
activity_id: "1".to_string(),
1342+
activity_type: "test_act".to_string(),
1343+
start_to_close_timeout: Some(prost_dur!(from_secs(30))),
1344+
..Default::default()
1345+
}
1346+
.into(),
1347+
ScheduleLocalActivity {
1348+
seq: 2,
1349+
activity_id: "2".to_string(),
1350+
activity_type: "test_act".to_string(),
1351+
start_to_close_timeout: Some(prost_dur!(from_secs(30))),
1352+
..Default::default()
1353+
}
1354+
.into(),
1355+
],
1356+
))
1357+
.await
1358+
.unwrap();
1359+
1360+
let wf_poller = async {
1361+
let task = core.poll_workflow_activation().await.unwrap();
1362+
assert_matches!(
1363+
task.jobs.as_slice(),
1364+
[WorkflowActivationJob {
1365+
variant: Some(workflow_activation_job::Variant::ResolveActivity(_)),
1366+
}]
1367+
);
1368+
barr.wait().await;
1369+
core.complete_execution(&task.run_id).await;
1370+
};
1371+
1372+
let at_poller = async {
1373+
let act_task = core.poll_activity_task().await.unwrap();
1374+
core.complete_activity_task(ActivityTaskCompletion {
1375+
task_token: act_task.task_token,
1376+
result: Some(ActivityExecutionResult::ok(vec![1].into())),
1377+
})
1378+
.await
1379+
.unwrap();
1380+
let act_task = core.poll_activity_task().await.unwrap();
1381+
barr.wait().await;
1382+
core.complete_activity_task(ActivityTaskCompletion {
1383+
task_token: act_task.task_token,
1384+
result: Some(ActivityExecutionResult::ok(vec![2].into())),
1385+
})
1386+
.await
1387+
.unwrap();
1388+
};
1389+
1390+
join!(wf_poller, at_poller);
1391+
core.drain_pollers_and_shutdown().await;
1392+
}

core/src/worker/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,14 @@ impl WorkerTrait for Worker {
179179
// Let the manager know that shutdown has been initiated to try to unblock the local
180180
// activity poll in case this worker is an activity-only worker.
181181
self.local_act_mgr.shutdown_initiated();
182+
182183
if !self.workflows.ever_polled() {
183184
self.local_act_mgr.workflows_have_shutdown();
185+
} else {
186+
// Bump the workflow stream with a pointless input, since if a client initiates shutdown
187+
// and then immediately blocks waiting on a workflow activation poll, it's possible that
188+
// there may not be any more inputs ever, and that poll will never resolve.
189+
self.workflows.send_get_state_info_msg();
184190
}
185191
}
186192

@@ -189,7 +195,6 @@ impl WorkerTrait for Worker {
189195
}
190196

191197
async fn finalize_shutdown(self) {
192-
self.shutdown().await;
193198
self.finalize_shutdown().await
194199
}
195200
}

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1467,6 +1467,9 @@ impl WorkflowMachines {
14671467
let cwfm = self.add_new_command_machine(machine);
14681468
self.workflow_end_time = Some(SystemTime::now());
14691469
self.current_wf_task_commands.push_back(cwfm);
1470+
// Wipe out any pending / executing local activity data since we're about to terminate
1471+
// and there's nothing to be done with them.
1472+
self.local_activity_data.indicate_terminating();
14701473
}
14711474

14721475
/// Add a new command/machines for that command to the current workflow task

core/src/worker/workflow/machines/workflow_machines/local_acts.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub(super) struct LocalActivityData {
2020
/// Maps local activity sequence numbers to their resolutions as found when looking ahead at
2121
/// next WFT
2222
preresolutions: HashMap<u32, ResolveDat>,
23+
/// Set true if the workflow is terminating
24+
am_terminating: bool,
2325
}
2426

2527
impl LocalActivityData {
@@ -45,6 +47,10 @@ impl LocalActivityData {
4547
wf_id: &str,
4648
run_id: &str,
4749
) -> Vec<LocalActRequest> {
50+
if self.am_terminating {
51+
return vec![LocalActRequest::CancelAllInRun(run_id.to_string())];
52+
}
53+
4854
self.cancel_requests
4955
.drain(..)
5056
.map(LocalActRequest::Cancel)
@@ -65,6 +71,9 @@ impl LocalActivityData {
6571

6672
/// Returns all outstanding local activities, whether executing or requested and in the queue
6773
pub(super) fn outstanding_la_count(&self) -> usize {
74+
if self.am_terminating {
75+
return 0;
76+
}
6877
self.executing.len() + self.new_requests.len()
6978
}
7079

@@ -82,4 +91,10 @@ impl LocalActivityData {
8291
.position(|req| req.seq == seq)
8392
.map(|i| self.new_requests.remove(i))
8493
}
94+
95+
/// Store that this workflow is terminating, and thus no new LA requests need be processed,
96+
/// and any executing LAs should not prevent us from shutting down.
97+
pub(super) fn indicate_terminating(&mut self) {
98+
self.am_terminating = true;
99+
}
85100
}

core/src/worker/workflow/mod.rs

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -180,58 +180,60 @@ impl Workflows {
180180
// We must spawn a task to constantly poll the activation stream, because otherwise
181181
// activation completions would not cause anything to happen until the next poll.
182182
let tracing_sub = telem_instance.and_then(|ti| ti.trace_subscriber());
183-
let processing_task = thread::spawn(move || {
184-
if let Some(ts) = tracing_sub {
185-
set_trace_subscriber_for_current_thread(ts);
186-
}
187-
let rt = tokio::runtime::Builder::new_current_thread()
188-
.enable_all()
189-
.thread_name("workflow-processing")
190-
.build()
191-
.unwrap();
192-
let local = LocalSet::new();
193-
local.block_on(&rt, async move {
194-
let mut stream = WFStream::build(
195-
basics,
196-
extracted_wft_stream,
197-
locals_stream,
198-
local_activity_request_sink,
199-
);
183+
let processing_task = thread::Builder::new()
184+
.name("workflow-processing".to_string())
185+
.spawn(move || {
186+
if let Some(ts) = tracing_sub {
187+
set_trace_subscriber_for_current_thread(ts);
188+
}
189+
let rt = tokio::runtime::Builder::new_current_thread()
190+
.enable_all()
191+
.build()
192+
.unwrap();
193+
let local = LocalSet::new();
194+
local.block_on(&rt, async move {
195+
let mut stream = WFStream::build(
196+
basics,
197+
extracted_wft_stream,
198+
locals_stream,
199+
local_activity_request_sink,
200+
);
200201

201-
// However, we want to avoid plowing ahead until we've been asked to poll at least
202-
// once. This supports activity-only workers.
203-
let do_poll = tokio::select! {
204-
sp = start_polling_rx => {
205-
sp.is_ok()
206-
}
207-
_ = shutdown_tok.cancelled() => {
208-
false
202+
// However, we want to avoid plowing ahead until we've been asked to poll at least
203+
// once. This supports activity-only workers.
204+
let do_poll = tokio::select! {
205+
sp = start_polling_rx => {
206+
sp.is_ok()
207+
}
208+
_ = shutdown_tok.cancelled() => {
209+
false
210+
}
211+
};
212+
if !do_poll {
213+
return;
209214
}
210-
};
211-
if !do_poll {
212-
return;
213-
}
214-
while let Some(output) = stream.next().await {
215-
match output {
216-
Ok(o) => {
217-
for fetchreq in o.fetch_histories {
218-
fetch_tx
219-
.send(fetchreq)
220-
.expect("Fetch channel must not be dropped");
221-
}
222-
for act in o.activations {
223-
activation_tx
224-
.send(Ok(act))
225-
.expect("Activation processor channel not dropped");
215+
while let Some(output) = stream.next().await {
216+
match output {
217+
Ok(o) => {
218+
for fetchreq in o.fetch_histories {
219+
fetch_tx
220+
.send(fetchreq)
221+
.expect("Fetch channel must not be dropped");
222+
}
223+
for act in o.activations {
224+
activation_tx
225+
.send(Ok(act))
226+
.expect("Activation processor channel not dropped");
227+
}
226228
}
229+
Err(e) => activation_tx
230+
.send(Err(e))
231+
.expect("Activation processor channel not dropped"),
227232
}
228-
Err(e) => activation_tx
229-
.send(Err(e))
230-
.expect("Activation processor channel not dropped"),
231233
}
232-
}
233-
});
234-
});
234+
});
235+
})
236+
.expect("Must be able to spawn workflow processing thread");
235237
Self {
236238
task_queue,
237239
local_tx,
@@ -498,10 +500,17 @@ impl Workflows {
498500
});
499501
}
500502

501-
/// Query the state of workflow management. Can return `None` if workflow state is shut down.
502-
pub(super) fn get_state_info(&self) -> impl Future<Output = Option<WorkflowStateInfo>> {
503+
/// Send a `GetStateInfoMsg` to the workflow stream. Can be used to bump the stream if there
504+
/// would otherwise be no new inputs.
505+
pub(super) fn send_get_state_info_msg(&self) -> oneshot::Receiver<WorkflowStateInfo> {
503506
let (tx, rx) = oneshot::channel();
504507
self.send_local(GetStateInfoMsg { response_tx: tx });
508+
rx
509+
}
510+
511+
/// Query the state of workflow management. Can return `None` if workflow state is shut down.
512+
pub(super) fn get_state_info(&self) -> impl Future<Output = Option<WorkflowStateInfo>> {
513+
let rx = self.send_get_state_info_msg();
505514
async move { rx.await.ok() }
506515
}
507516

0 commit comments

Comments
 (0)