
Commit 0c690a5

Fix scenario where server sends page token pointing to empty page (#533)
1 parent d8bf112 commit 0c690a5

File tree

3 files changed: +116 -10 lines
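For orientation before the diffs: the bug being fixed is that the server can return a history page which already ends at the current end of history yet still attach a next page token, and following that token yields a page with no events. Below is a minimal, hypothetical Rust sketch of that response sequence; the struct is an invented stand-in that only mirrors the shape of the real GetWorkflowExecutionHistoryResponse and is not part of the SDK.

// Hypothetical stand-in type, for illustration only.
#[derive(Clone, Debug)]
struct HistoryPage {
    event_ids: Vec<i64>,      // events contained in this page
    next_page_token: Vec<u8>, // non-empty means "there may be more"
}

fn main() {
    // Page 1 already ends at the last event of history, yet carries a token...
    let page1 = HistoryPage {
        event_ids: vec![1, 2, 3, 4, 5, 6, 7, 8],
        next_page_token: vec![1],
    };
    // ...and following that token yields a page with no events and no further token.
    let page2 = HistoryPage {
        event_ids: vec![],
        next_page_token: vec![],
    };

    // A paginator must treat this empty terminal page as "end of history" rather
    // than as a missing-events error, which is what this commit implements.
    assert!(!page1.next_page_token.is_empty());
    assert!(page2.event_ids.is_empty() && page2.next_page_token.is_empty());
    println!("empty terminal page => replay over, not an error");
}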

core/src/core_tests/workflow_tasks.rs

Lines changed: 1 addition & 1 deletion

@@ -2297,7 +2297,7 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
     mock.expect_get_workflow_execution_history()
         .returning(move |_, _, _| {
             error!("Called fetch second time!");
-            Ok(really_empty_fetch_resp.clone())
+            Err(tonic::Status::not_found("Ahh broken"))
         })
         .times(1);
     mock.expect_fail_workflow_task()

core/src/worker/workflow/history_update.rs

Lines changed: 86 additions & 2 deletions

@@ -84,6 +84,7 @@ pub struct HistoryPaginator {
     pub(crate) run_id: String,
     pub(crate) previous_wft_started_id: i64,
     pub(crate) wft_started_event_id: i64,
+    id_of_last_event_in_last_extracted_update: Option<i64>,

     #[cfg_attr(feature = "save_wf_inputs", serde(skip))]
     client: Arc<dyn WorkerClient>,
@@ -175,6 +176,7 @@ impl HistoryPaginator {
             run_id: req.original_wft.work.execution.run_id.clone(),
             previous_wft_started_id: req.original_wft.work.update.previous_wft_started_id,
             wft_started_event_id: req.original_wft.work.update.wft_started_id,
+            id_of_last_event_in_last_extracted_update: None,
             client,
             event_queue: Default::default(),
             next_page_token: NextPageToken::FetchFromStart,
@@ -211,6 +213,7 @@ impl HistoryPaginator {
             final_events,
             previous_wft_started_id,
             wft_started_event_id,
+            id_of_last_event_in_last_extracted_update: None,
         }
     }

@@ -226,6 +229,7 @@ impl HistoryPaginator {
             final_events: vec![],
             previous_wft_started_id: -2,
             wft_started_event_id: -2,
+            id_of_last_event_in_last_extracted_update: None,
         }
     }

@@ -240,14 +244,45 @@
     /// we have two, or until we are at the end of history.
     pub(crate) async fn extract_next_update(&mut self) -> Result<HistoryUpdate, tonic::Status> {
         loop {
-            let no_next_page = !self.get_next_page().await?;
+            let fetch_happened = !self.get_next_page().await?;
             let current_events = mem::take(&mut self.event_queue);
             let seen_enough_events = current_events
                 .back()
                 .map(|e| e.event_id)
                 .unwrap_or_default()
                 >= self.wft_started_event_id;
-            if current_events.is_empty() || (no_next_page && !seen_enough_events) {
+
+            // This handles a special case where the server might send us a page token along with
+            // a real page which ends at the current end of history. The page token then points to
+            // an empty page. We need to detect this, and consider it the end of history.
+            //
+            // This case unfortunately cannot be handled earlier, because we might fetch a page
+            // from the server which contains two complete WFTs, and thus we are happy to return
+            // an update at that time. But, if the page has a next page token, we *cannot* conclude
+            // we are done with replay until we fetch that page. So, we have to wait until the next
+            // extraction to determine (after fetching the next page and finding it to be empty)
+            // that we are done. Fetching the page eagerly is another option, but would be wasteful
+            // the overwhelming majority of the time.
+            let already_sent_update_with_enough_events = self
+                .id_of_last_event_in_last_extracted_update
+                .unwrap_or_default()
+                >= self.wft_started_event_id;
+            if current_events.is_empty()
+                && !fetch_happened
+                && already_sent_update_with_enough_events
+            {
+                // We must return an empty update which also says it contains the final WFT so we
+                // know we're done with replay.
+                return Ok(HistoryUpdate::from_events(
+                    [],
+                    self.previous_wft_started_id,
+                    self.wft_started_event_id,
+                    true,
+                )
+                .0);
+            }
+
+            if current_events.is_empty() || (fetch_happened && !seen_enough_events) {
                 // If next page fetching happened, and we still ended up with no or insufficient
                 // events, something is wrong. We're expecting there to be more events to be able to
                 // extract this update, but server isn't giving us any. We have no choice except to
@@ -278,6 +313,8 @@ impl HistoryPaginator {
                 // There was not a meaningful WFT in the whole page. We must fetch more.
                 continue;
             }
+            self.id_of_last_event_in_last_extracted_update =
+                update.events.last().map(|e| e.event_id);
             return Ok(update);
         }
     }
@@ -1168,4 +1205,51 @@ pub mod tests {

     // TODO: Test we dont re-feed pointless updates if fetching returns <= events we already
     // processed
+
+    #[tokio::test]
+    async fn handles_fetching_page_with_complete_wft_and_page_token_to_empty_page() {
+        let timer_hist = canned_histories::single_timer("t");
+        let workflow_task = timer_hist.get_full_history_info().unwrap();
+        let prev_started_wft_id = workflow_task.previous_started_event_id();
+        let wft_started_id = workflow_task.workflow_task_started_event_id();
+
+        let mut full_resp_with_npt: GetWorkflowExecutionHistoryResponse =
+            timer_hist.get_full_history_info().unwrap().into();
+        full_resp_with_npt.next_page_token = vec![1];
+
+        let mut mock_client = mock_workflow_client();
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| Ok(full_resp_with_npt.clone()))
+            .times(1);
+        mock_client
+            .expect_get_workflow_execution_history()
+            .returning(move |_, _, _| {
+                Ok(GetWorkflowExecutionHistoryResponse {
+                    history: Some(History { events: vec![] }),
+                    raw_history: vec![],
+                    next_page_token: vec![],
+                    archived: false,
+                })
+            })
+            .times(1);
+
+        let mut paginator = HistoryPaginator::new(
+            workflow_task.into(),
+            prev_started_wft_id,
+            wft_started_id,
+            "wfid".to_string(),
+            "runid".to_string(),
+            NextPageToken::FetchFromStart,
+            Arc::new(mock_client),
+        );
+        let mut update = paginator.extract_next_update().await.unwrap();
+        let seq = update.take_next_wft_sequence(0).unwrap_events();
+        assert_eq!(seq.last().unwrap().event_id, 3);
+        let seq = update.take_next_wft_sequence(3).unwrap_events();
+        assert_eq!(seq.last().unwrap().event_id, 8);
+        assert_matches!(update.take_next_wft_sequence(8), NextWFT::NeedFetch);
+        let mut update = paginator.extract_next_update().await.unwrap();
+        assert_matches!(update.take_next_wft_sequence(8), NextWFT::ReplayOver);
+    }
 }
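To see the heart of the change outside the paginator, here is a standalone, hypothetical distillation of the new terminal-page check; it is not the SDK's actual API, and the boolean names simply follow the variables used in the extract_next_update diff above.

// Hypothetical distillation: an empty page reached without a fresh fetch, after an
// update that already covered the WFT-started event, means replay is over.
fn empty_page_means_replay_over(
    current_events_empty: bool,
    fetch_happened: bool,
    id_of_last_event_in_last_extracted_update: Option<i64>,
    wft_started_event_id: i64,
) -> bool {
    let already_sent_update_with_enough_events =
        id_of_last_event_in_last_extracted_update.unwrap_or_default() >= wft_started_event_id;
    current_events_empty && !fetch_happened && already_sent_update_with_enough_events
}

fn main() {
    // The previously extracted update already reached event 8 (the WFT-started event),
    // and the "next" page turned out to be empty: treat that as end of history.
    assert!(empty_page_means_replay_over(true, false, Some(8), 8));
    // But an empty page while we still expect more events is not a clean end.
    assert!(!empty_page_means_replay_over(true, false, Some(3), 8));
}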

test-utils/src/lib.rs

Lines changed: 29 additions & 7 deletions

@@ -22,7 +22,8 @@ use std::{
     time::Duration,
 };
 use temporal_client::{
-    Client, RetryClient, WorkflowClientTrait, WorkflowExecutionInfo, WorkflowOptions,
+    Client, ClientTlsConfig, RetryClient, TlsConfig, WorkflowClientTrait, WorkflowExecutionInfo,
+    WorkflowOptions,
 };
 use temporal_sdk::{
     interceptors::{FailOnNondeterminismInterceptor, WorkerInterceptor},
@@ -34,8 +35,8 @@ use temporal_sdk_core::{
     replay::HistoryForReplay,
     ClientOptions, ClientOptionsBuilder, CoreRuntime, WorkerConfig, WorkerConfigBuilder,
 };
-use temporal_sdk_core_api::errors::{PollActivityError, PollWfError};
 use temporal_sdk_core_api::{
+    errors::{PollActivityError, PollWfError},
     telemetry::{
         Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions, TelemetryOptionsBuilder,
         TraceExportConfig, TraceExporter,
@@ -60,6 +61,7 @@ pub const NAMESPACE: &str = "default";
 pub const TEST_Q: &str = "q";
 /// The env var used to specify where the integ tests should point
 pub const INTEG_SERVER_TARGET_ENV_VAR: &str = "TEMPORAL_SERVICE_ADDRESS";
+pub const INTEG_USE_TLS_ENV_VAR: &str = "TEMPORAL_USE_TLS";
 /// This env var is set (to any value) if temporal CLI dev server is in use
 pub const INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR: &str = "INTEG_TEMPORAL_DEV_SERVER_ON";
 /// This env var is set (to any value) if the test server is in use
@@ -533,13 +535,33 @@ pub fn get_integ_server_options() -> ClientOptions {
         Err(_) => "http://localhost:7233".to_owned(),
     };
     let url = Url::try_from(&*temporal_server_address).unwrap();
-    ClientOptionsBuilder::default()
-        .identity("integ_tester".to_string())
+    let mut cb = ClientOptionsBuilder::default();
+    cb.identity("integ_tester".to_string())
         .target_url(url)
         .client_name("temporal-core".to_string())
-        .client_version("0.1.0".to_string())
-        .build()
-        .unwrap()
+        .client_version("0.1.0".to_string());
+    if let Some(tls) = get_integ_tls_config() {
+        cb.tls_cfg(tls);
+    };
+    cb.build().unwrap()
+}
+
+pub fn get_integ_tls_config() -> Option<TlsConfig> {
+    if env::var(INTEG_USE_TLS_ENV_VAR).is_ok() {
+        let root = std::fs::read("../.cloud_certs/ca.pem").unwrap();
+        let client_cert = std::fs::read("../.cloud_certs/client.pem").unwrap();
+        let client_private_key = std::fs::read("../.cloud_certs/client.key").unwrap();
+        Some(TlsConfig {
+            server_root_ca_cert: Some(root),
+            domain: None,
+            client_tls_config: Some(ClientTlsConfig {
+                client_cert,
+                client_private_key,
+            }),
+        })
+    } else {
+        None
+    }
 }

 pub fn get_integ_telem_options() -> TelemetryOptions {
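As a usage note, here is a hypothetical call site (not part of the commit) showing how an integration-test binary might opt into TLS through the new helper; it assumes the test-utils items above are in scope and that the certificate files exist under ../.cloud_certs as read by get_integ_tls_config().

// Hypothetical helper, for illustration only: force the TLS path on and build
// client options the same way the integ tests do.
fn connect_opts_with_tls() -> temporal_sdk_core::ClientOptions {
    // get_integ_tls_config() only checks that the variable is set; any value works.
    std::env::set_var("TEMPORAL_USE_TLS", "1");
    // Picks up TLS via the `if let Some(tls) = get_integ_tls_config()` branch above.
    get_integ_server_options()
}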
