
Commit 5ab2ddd

Fix possibly dropping final events in a WFT during cache miss (#537)

1 parent: 3adb89a

File tree

3 files changed (+156, -16 lines)

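The change addresses a cache-miss edge case in history pagination: when a workflow task forces the SDK to re-fetch history from the start, the rebuilt paginator previously discarded the events that had arrived with the poll response itself, so the final events of the WFT could be dropped. The commit carries those events (and the id of the last event already extracted) over into the new paginator. Below is a minimal, self-contained sketch of that idea; `PaginatorSketch` and its fields are illustrative stand-ins, not the crate's real `HistoryPaginator`.

// Minimal sketch (illustrative types only, not temporal-sdk-core's real
// HistoryPaginator): shows why events already delivered with the poll
// response must seed the rebuilt paginator instead of an empty buffer.
struct PaginatorSketch {
    // Events that arrived with the WFT that caused the cache miss.
    final_events: Vec<u64>,
    // Pages re-fetched from the server afterwards.
    fetched_pages: Vec<Vec<u64>>,
}

impl PaginatorSketch {
    // Full history as the state machines will see it: re-fetched pages
    // first, then the events that only existed in the original WFT.
    fn all_events(mut self) -> Vec<u64> {
        let mut out: Vec<u64> = self.fetched_pages.drain(..).flatten().collect();
        out.append(&mut self.final_events);
        out
    }
}

fn main() {
    // Pre-fix behaviour: final_events was reset to vec![], so events 9-11,
    // which were only present in the poll response, vanished.
    let broken = PaginatorSketch {
        final_events: vec![],
        fetched_pages: vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]],
    };
    assert_eq!(broken.all_events().last(), Some(&8));

    // Post-fix behaviour: the WFT's own events are kept as the final chunk.
    let fixed = PaginatorSketch {
        final_events: vec![9, 10, 11],
        fetched_pages: vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]],
    };
    assert_eq!(fixed.all_events().last(), Some(&11));
}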

core/src/worker/workflow/history_update.rs

Lines changed: 146 additions & 14 deletions
@@ -176,11 +176,14 @@ impl HistoryPaginator {
             run_id: req.original_wft.work.execution.run_id.clone(),
             previous_wft_started_id: req.original_wft.work.update.previous_wft_started_id,
             wft_started_event_id: req.original_wft.work.update.wft_started_id,
-            id_of_last_event_in_last_extracted_update: None,
+            id_of_last_event_in_last_extracted_update: req
+                .original_wft
+                .paginator
+                .id_of_last_event_in_last_extracted_update,
             client,
             event_queue: Default::default(),
             next_page_token: NextPageToken::FetchFromStart,
-            final_events: vec![],
+            final_events: req.original_wft.work.update.events,
         };
         let first_update = paginator.extract_next_update().await?;
         req.original_wft.work.update = first_update;
@@ -244,7 +247,7 @@
     /// we have two, or until we are at the end of history.
     pub(crate) async fn extract_next_update(&mut self) -> Result<HistoryUpdate, tonic::Status> {
         loop {
-            let fetch_happened = !self.get_next_page().await?;
+            let no_next_page = !self.get_next_page().await?;
             let current_events = mem::take(&mut self.event_queue);
             let seen_enough_events = current_events
                 .back()
@@ -267,10 +270,7 @@
                 .id_of_last_event_in_last_extracted_update
                 .unwrap_or_default()
                 >= self.wft_started_event_id;
-            if current_events.is_empty()
-                && !fetch_happened
-                && already_sent_update_with_enough_events
-            {
+            if current_events.is_empty() && no_next_page && already_sent_update_with_enough_events {
                 // We must return an empty update which also says is contains the final WFT so we
                 // know we're done with replay.
                 return Ok(HistoryUpdate::from_events(
@@ -282,12 +282,15 @@
                     .0);
             }

-            if current_events.is_empty() || (fetch_happened && !seen_enough_events) {
+            if current_events.is_empty() || (no_next_page && !seen_enough_events) {
                 // If next page fetching happened, and we still ended up with no or insufficient
                 // events, something is wrong. We're expecting there to be more events to be able to
                 // extract this update, but server isn't giving us any. We have no choice except to
                 // give up and evict.
                 error!(
+                    current_events=?current_events,
+                    no_next_page,
+                    seen_enough_events,
                     "We expected to be able to fetch more events but server says there are none"
                 );
                 return Err(EMPTY_FETCH_ERR.clone());
@@ -319,13 +322,13 @@
         }
     }

-    /// Fetches the next page and adds it to the internal queue. Returns true if a fetch was
-    /// performed, false if there is no next page.
+    /// Fetches the next page and adds it to the internal queue.
+    /// Returns true if we still have a next page token after fetching.
     async fn get_next_page(&mut self) -> Result<bool, tonic::Status> {
         let history = loop {
             let npt = match mem::replace(&mut self.next_page_token, NextPageToken::Done) {
                 // If the last page token we got was empty, we're done.
-                NextPageToken::Done => return Ok(false),
+                NextPageToken::Done => break None,
                 NextPageToken::FetchFromStart => vec![],
                 NextPageToken::Next(v) => v,
             };
@@ -366,7 +369,7 @@
                 );
             }
         };
-        Ok(true)
+        Ok(!matches!(&self.next_page_token, NextPageToken::Done))
    }
}

@@ -715,11 +718,21 @@ pub mod tests {
    use super::*;
    use crate::{
        replay::{HistoryInfo, TestHistoryBuilder},
-        test_help::canned_histories,
+        test_help::{canned_histories, mock_sdk_cfg, MockPollCfg, ResponseType},
        worker::client::mocks::mock_workflow_client,
    };
+    use futures::StreamExt;
    use futures_util::TryStreamExt;
-    use temporal_sdk_core_protos::temporal::api::workflowservice::v1::GetWorkflowExecutionHistoryResponse;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use temporal_client::WorkflowOptions;
+    use temporal_sdk::WfContext;
+    use temporal_sdk_core_protos::{
+        temporal::api::{
+            common::v1::WorkflowExecution, enums::v1::WorkflowTaskFailedCause,
+            workflowservice::v1::GetWorkflowExecutionHistoryResponse,
+        },
+        DEFAULT_WORKFLOW_TYPE,
+    };

    impl From<HistoryInfo> for HistoryUpdate {
        fn from(v: HistoryInfo) -> Self {
@@ -1252,4 +1265,123 @@ pub mod tests {
        let mut update = paginator.extract_next_update().await.unwrap();
        assert_matches!(update.take_next_wft_sequence(8), NextWFT::ReplayOver);
    }
+
+    #[tokio::test]
+    async fn weird_pagination_doesnt_drop_wft_events() {
+        crate::telemetry::test_telem_console();
+        let wf_id = "fakeid";
+        // 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+        // 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        // 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        // 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+        // empty page
+        // 5: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
+        // 6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        // 7: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        // 8: EVENT_TYPE_WORKFLOW_TASK_FAILED
+        // empty page
+        // 9: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
+        // 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        // 11: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        // empty page
+        let mut t = TestHistoryBuilder::default();
+        t.add_by_type(EventType::WorkflowExecutionStarted);
+        t.add_full_wf_task();
+
+        t.add_we_signaled("hi", vec![]);
+        t.add_workflow_task_scheduled_and_started();
+        t.add_workflow_task_failed_with_failure(
+            WorkflowTaskFailedCause::UnhandledCommand,
+            Default::default(),
+        );
+
+        t.add_we_signaled("hi", vec![]);
+        t.add_workflow_task_scheduled_and_started();
+
+        let workflow_task = t.get_full_history_info().unwrap();
+        let mut wft_resp = workflow_task.as_poll_wft_response();
+        wft_resp.workflow_execution = Some(WorkflowExecution {
+            workflow_id: wf_id.to_string(),
+            run_id: t.get_orig_run_id().to_string(),
+        });
+        // Just 9/10/11 in WFT
+        wft_resp.history.as_mut().unwrap().events.drain(0..8);
+
+        let mut resp_1: GetWorkflowExecutionHistoryResponse =
+            t.get_full_history_info().unwrap().into();
+        resp_1.next_page_token = vec![1];
+        resp_1.history.as_mut().unwrap().events.truncate(4);
+
+        let mut mock_client = mock_workflow_client();
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| Ok(resp_1.clone()))
+            .times(1);
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| {
+                Ok(GetWorkflowExecutionHistoryResponse {
+                    history: Some(History { events: vec![] }),
+                    raw_history: vec![],
+                    next_page_token: vec![2],
+                    archived: false,
+                })
+            })
+            .times(1);
+        let mut resp_2: GetWorkflowExecutionHistoryResponse =
+            t.get_full_history_info().unwrap().into();
+        resp_2.next_page_token = vec![3];
+        resp_2.history.as_mut().unwrap().events.drain(0..4);
+        resp_2.history.as_mut().unwrap().events.truncate(4);
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| Ok(resp_2.clone()))
+            .times(1);
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| {
+                Ok(GetWorkflowExecutionHistoryResponse {
+                    history: Some(History { events: vec![] }),
+                    raw_history: vec![],
+                    next_page_token: vec![],
+                    archived: false,
+                })
+            })
+            .times(1);
+
+        let wf_type = DEFAULT_WORKFLOW_TYPE;
+        let mh =
+            MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::Raw(wft_resp)], mock_client);
+        let mut worker = mock_sdk_cfg(mh, |cfg| {
+            cfg.max_cached_workflows = 2;
+            cfg.ignore_evicts_on_shutdown = false;
+        });
+
+        let sig_ctr = Arc::new(AtomicUsize::new(0));
+        let sig_ctr_clone = sig_ctr.clone();
+        worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| {
+            let sig_ctr_clone = sig_ctr_clone.clone();
+            async move {
+                let mut sigchan = ctx.make_signal_channel("hi");
+                while sigchan.next().await.is_some() {
+                    if sig_ctr_clone.fetch_add(1, Ordering::AcqRel) == 1 {
+                        break;
+                    }
+                }
+                Ok(().into())
+            }
+        });
+
+        worker
+            .submit_wf(
+                wf_id.to_owned(),
+                wf_type.to_owned(),
+                vec![],
+                WorkflowOptions::default(),
+            )
+            .await
+            .unwrap();
+        worker.run_until_done().await.unwrap();
+        assert_eq!(sig_ctr.load(Ordering::Acquire), 2);
+    }
 }
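Two related adjustments in the hunks above are easy to miss: get_next_page now reports whether a next page token remains after fetching, rather than whether a fetch happened at all, and the loop variable in extract_next_update was renamed from fetch_happened to no_next_page accordingly. A hedged sketch of the revised contract, using a stand-in enum rather than the crate's real NextPageToken:

// Stand-in enum for illustration; the real type lives in history_update.rs.
enum TokenSketch {
    Done,
    Next(Vec<u8>),
}

// Mirrors the new return expression
// `Ok(!matches!(&self.next_page_token, NextPageToken::Done))`:
// true means another page can still be requested.
fn more_pages_remain(token_after_fetch: &TokenSketch) -> bool {
    !matches!(token_after_fetch, TokenSketch::Done)
}

fn main() {
    // A trailing empty page that arrives with an empty token no longer reads
    // as "a fetch happened, keep expecting more events".
    assert!(more_pages_remain(&TokenSketch::Next(vec![2])));
    assert!(!more_pages_remain(&TokenSketch::Done));
}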

sdk/src/workflow_context.rs

Lines changed: 9 additions & 1 deletion
@@ -11,7 +11,7 @@ use crate::{
    SignalExternalWfResult, TimerResult, UnblockEvent, Unblockable,
};
use crossbeam::channel::{Receiver, Sender};
-use futures::{task::Context, FutureExt, Stream};
+use futures::{task::Context, FutureExt, Stream, StreamExt};
use parking_lot::RwLock;
use std::{
    collections::HashMap,
@@ -398,6 +398,14 @@ impl DrainableSignalStream {
        }
        signals
    }
+
+    pub fn drain_ready(&mut self) -> Vec<SignalData> {
+        let mut signals = vec![];
+        while let Some(s) = self.0.next().now_or_never().flatten() {
+            signals.push(s);
+        }
+        signals
+    }
}

impl Stream for DrainableSignalStream {
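The new drain_ready helper above collects only the signals that are already buffered, without ever awaiting: now_or_never() polls the next-item future once and gives up as soon as it would have to wait. Below is a hedged, self-contained sketch of that pattern over a plain futures stream; the String items are stand-ins for the SDK's SignalData.

use futures::{stream, FutureExt, Stream, StreamExt};

// Collect whatever the stream can yield right now, never blocking:
// now_or_never() returns None as soon as polling would have to wait.
fn drain_ready_sketch<S: Stream<Item = String> + Unpin>(s: &mut S) -> Vec<String> {
    let mut out = vec![];
    while let Some(item) = s.next().now_or_never().flatten() {
        out.push(item);
    }
    out
}

fn main() {
    // stream::iter is always immediately ready, so everything drains.
    let mut ready = stream::iter(vec!["a".to_string(), "b".to_string()]);
    assert_eq!(drain_ready_sketch(&mut ready), vec!["a", "b"]);
}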

sdk/src/workflow_context/options.rs

Lines changed: 1 addition & 1 deletion
@@ -272,7 +272,7 @@ impl Signal {
}

/// Data contained within a signal
-#[derive(Default)]
+#[derive(Default, Debug)]
pub struct SignalData {
    /// The arguments the signal will receive
    pub input: Vec<Payload>,
