
Commit 2026da7

Sushisource and mjameswh committed
Fix more pagination insanity (#538)
* Make all SDK team codeowners

Co-authored-by: James Watkins-Harvey <james.watkinsharvey@temporal.io>
1 parent 5ab2ddd commit 2026da7

File tree

7 files changed (+544, -75 lines)


.buildkite/pipeline.yml

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ steps:
     agents:
       queue: "default"
       docker: "*"
-    command: "cargo lint"
+    command: "cargo lint && cargo test-lint"
     timeout_in_minutes: 15
     plugins:
       - docker-compose#v3.0.0:

.cargo/config.toml

Lines changed: 2 additions & 0 deletions
@@ -4,3 +4,5 @@ wf-input-replay = ["run", "--package", "temporal-sdk-core", "--features", "save_
                    "--example", "wf_input_replay", "--"]
 lint = ["clippy", "--workspace", "--examples", "--all-features",
         "--test", "integ_tests", "--test", "heavy_tests", "--", "--D", "warnings"]
+test-lint = ["clippy", "--all", "--all-features", "--examples", "--workspace",
+             "--tests", "--", "--D", "warnings"]

CODEOWNERS

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 # Primary owners
 
-* @Sushisource @bergundy @cretz @Spikhalskiy
+* @temporalio/sdk

core/src/core_tests/workflow_tasks.rs

Lines changed: 25 additions & 39 deletions
@@ -896,10 +896,7 @@ async fn workflow_failures_only_reported_once() {
 #[tokio::test]
 async fn max_wft_respected() {
     let total_wfs = 100;
-    let wf_ids: Vec<_> = (0..total_wfs)
-        .into_iter()
-        .map(|i| format!("fake-wf-{i}"))
-        .collect();
+    let wf_ids: Vec<_> = (0..total_wfs).map(|i| format!("fake-wf-{i}")).collect();
     let hists = wf_ids.iter().map(|wf_id| {
         let hist = canned_histories::single_timer("1");
         FakeWfResponses {
@@ -1027,7 +1024,7 @@ async fn activity_not_canceled_when_also_completed_repro(hist_batches: &'static
 #[tokio::test]
 async fn lots_of_workflows() {
     let total_wfs = 500;
-    let hists = (0..total_wfs).into_iter().map(|i| {
+    let hists = (0..total_wfs).map(|i| {
         let wf_id = format!("fake-wf-{i}");
         let hist = canned_histories::single_timer("1");
         FakeWfResponses {
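
The two hunks above (and the `core_internal_flags` hunk further down) drop redundant `.into_iter()` calls; a range or an iterator-returning helper already implements `Iterator`, so the call converts a value to itself and clippy reports it (likely via `useless_conversion`), which `--D warnings` in the new `test-lint` alias turns into a hard error. A minimal standalone sketch of the pattern, not taken from the repo:

    // Illustrative only: shows the redundant call the diff removes.
    fn main() {
        let total_wfs = 3;

        // A `Range` is already an `Iterator`, so `.into_iter()` converts it to itself;
        // clippy flags that, and `--D warnings` promotes the warning to an error:
        //     let wf_ids: Vec<_> = (0..total_wfs).into_iter().map(|i| format!("fake-wf-{i}")).collect();

        // Idiomatic form, matching the change above:
        let wf_ids: Vec<_> = (0..total_wfs).map(|i| format!("fake-wf-{i}")).collect();
        assert_eq!(wf_ids, ["fake-wf-0", "fake-wf-1", "fake-wf-2"]);
    }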
@@ -1705,9 +1702,7 @@ async fn pagination_works_with_tasks_from_completion() {
     t.add_by_type(EventType::WorkflowExecutionStarted);
     t.add_full_wf_task();
     t.add_we_signaled("sig", vec![]);
-    t.add_full_wf_task();
-    t.add_workflow_execution_completed();
-    let get_exec_resp: GetWorkflowExecutionHistoryResponse = t.get_history_info(2).unwrap().into();
+    t.add_workflow_task_scheduled_and_started();
 
     let mut mock = mock_workflow_client();
     let mut needs_pag_resp = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(2)).resp;
@@ -1722,9 +1717,13 @@ async fn pagination_works_with_tasks_from_completion() {
     mock.expect_complete_workflow_task()
         .times(1)
         .returning(|_| Ok(Default::default()));
+
+    let get_exec_resp: GetWorkflowExecutionHistoryResponse =
+        t.get_full_history_info().unwrap().into();
     mock.expect_get_workflow_execution_history()
         .returning(move |_, _, _| Ok(get_exec_resp.clone()))
         .times(1);
+
     let mut mock = single_hist_mock_sg(wfid, t, [1], mock, true);
     mock.worker_cfg(|wc| wc.max_cached_workflows = 2);
     let core = mock_worker(mock);
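
For readers following the pagination tests, the mock above returns one `GetWorkflowExecutionHistoryResponse` per fetch and the worker is expected to keep requesting pages while a non-empty `next_page_token` comes back. As a rough mental model only (the types and functions below are hypothetical stand-ins, not temporal-sdk-core's actual paginator), token-driven fetching reduces to:

    // Hypothetical sketch of next_page_token-style pagination; `Page` and
    // `fetch_page` are stand-ins, not real SDK types.
    struct Page {
        events: Vec<u64>,
        next_page_token: Vec<u8>,
    }

    // Pretend server: the first call hands back a token, the second ends the stream.
    fn fetch_page(token: &[u8]) -> Page {
        if token.is_empty() {
            Page { events: vec![1, 2, 3], next_page_token: vec![1] }
        } else {
            Page { events: vec![4, 5], next_page_token: vec![] }
        }
    }

    fn fetch_all_events() -> Vec<u64> {
        let mut all = Vec::new();
        let mut token: Vec<u8> = Vec::new();
        loop {
            let page = fetch_page(&token);
            all.extend(page.events);
            if page.next_page_token.is_empty() {
                break; // an empty token means there are no more pages
            }
            token = page.next_page_token;
        }
        all
    }

    fn main() {
        assert_eq!(fetch_all_events(), vec![1, 2, 3, 4, 5]);
    }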
@@ -2162,23 +2161,15 @@ async fn fetching_to_continue_replay_works() {
     t.add_full_wf_task(); // end 14
     let mut fetch_resp: GetWorkflowExecutionHistoryResponse =
         t.get_full_history_info().unwrap().into();
-    // Should only contain events after 7
-    if let Some(ref mut h) = fetch_resp.history {
-        h.events.retain(|e| e.event_id >= 8);
-    }
     // And indicate that even *more* needs to be fetched after this, so we see a request for the
     // next page happen.
     fetch_resp.next_page_token = vec![2];
 
     let timer_started_event_id = t.add_by_type(EventType::TimerStarted);
     t.add_timer_fired(timer_started_event_id, "1".to_string());
     t.add_full_wf_task();
-    let mut final_fetch_resp: GetWorkflowExecutionHistoryResponse =
+    let final_fetch_resp: GetWorkflowExecutionHistoryResponse =
         t.get_full_history_info().unwrap().into();
-    // Should have only the final event
-    if let Some(ref mut h) = final_fetch_resp.history {
-        h.events.retain(|e| e.event_id >= 15);
-    }
 
     let tasks = vec![
         ResponseType::ToTaskNum(1),
@@ -2273,15 +2264,25 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
     t.add_full_wf_task(); // started 3
     t.add_we_signaled("sig1", vec![]);
     t.add_full_wf_task(); // started 7
-    t.add_we_signaled("sig2", vec![]);
+
+    // Need a command event after here so the paginator will know it has two complete WFTs and
+    // processing can begin before needing to fetch again
+    t.add_by_type(EventType::TimerStarted);
     t.add_full_wf_task(); // started 11
     t.add_workflow_execution_completed();
 
-    let mut first_poll = hist_to_poll_resp(&t, wfid, ResponseType::ToTaskNum(1)).resp;
-    first_poll.next_page_token = vec![1];
-    first_poll.previous_started_event_id = 3;
+    let mut first_poll = hist_to_poll_resp(&t, wfid, ResponseType::OneTask(4)).resp;
+    // History is partial so fetch will happen. We have to lie here and make up a previous started
+    // which really makes no sense, otherwise the paginator eagerly fetches and will fail before we
+    // ever start anything -- which is good -- but this test wants to make sure a fetching failure
+    // during a completion is handled correctly. That may no longer actually be a thing that can
+    // happen.
+    first_poll.previous_started_event_id = 0;
+    first_poll.started_event_id = 11;
 
-    let mut next_page: GetWorkflowExecutionHistoryResponse = t.get_history_info(2).unwrap().into();
+    let mut next_page: GetWorkflowExecutionHistoryResponse =
+        t.get_full_history_info().unwrap().into();
+    next_page.history.as_mut().unwrap().events.truncate(9);
     next_page.next_page_token = vec![2];
 
     let mut mock = mock_workflow_client();
@@ -2291,9 +2292,6 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
             Ok(next_page.clone())
         })
         .times(1);
-    let mut really_empty_fetch_resp: GetWorkflowExecutionHistoryResponse =
-        t.get_history_info(1).unwrap().into();
-    really_empty_fetch_resp.history = Some(Default::default());
     mock.expect_get_workflow_execution_history()
         .returning(move |_, _, _| {
             error!("Called fetch second time!");
@@ -2314,24 +2312,13 @@ async fn ensure_fetching_fail_during_complete_sends_task_failure() {
         .await
         .unwrap();
 
-    let wf_task = core.poll_workflow_activation().await.unwrap();
-    assert_matches!(
-        wf_task.jobs.as_slice(),
-        [WorkflowActivationJob {
-            variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
-        },]
-    );
-    core.complete_workflow_activation(WorkflowActivationCompletion::empty(wf_task.run_id))
-        .await
-        .unwrap();
-
     // Expect to see eviction b/c of history fetching error here.
     let wf_task = core.poll_workflow_activation().await.unwrap();
     assert_matches!(
         wf_task.jobs.as_slice(),
         [WorkflowActivationJob {
-            variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
-        },]
+            variant: Some(workflow_activation_job::Variant::RemoveFromCache(c)),
+        }] if c.message.contains("Fetching history")
     );
 
     core.shutdown().await;
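
The tightened assertion above binds the `RemoveFromCache` payload as `c` and adds an `if` guard, so the test only passes when the eviction message actually mentions the history-fetching failure rather than any eviction at all. A standalone illustration of that slice-pattern-plus-guard shape (the `Job` type here is made up, not the SDK's):

    // Made-up `Job` type; only the pattern-with-guard shape mirrors the assertion above.
    struct Job {
        message: String,
    }

    fn main() {
        let jobs = vec![Job {
            message: "Fetching history failed: deadline exceeded".to_string(),
        }];
        // Matches exactly one job, binds its field, and the guard checks the message.
        let evicted_for_fetch_failure = matches!(
            jobs.as_slice(),
            [Job { message }] if message.contains("Fetching history")
        );
        assert!(evicted_for_fetch_failure);
    }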
@@ -2401,7 +2388,6 @@ async fn core_internal_flags() {
             .copied()
             .collect::<HashSet<_>>(),
         CoreInternalFlags::all_except_too_high()
-            .into_iter()
             .map(|f| f as u32)
             .collect()
     );
