Skip to content

Commit 117cdf2

Browse files
authored
Fix CI test flake (#675)
1 parent cb08b1b commit 117cdf2

File tree

6 files changed

+39
-22
lines changed

6 files changed

+39
-22
lines changed

.github/workflows/per-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ jobs:
6565
- uses: actions-rs/cargo@v1
6666
with:
6767
command: test
68-
args: -- --include-ignored
68+
args: -- --include-ignored --nocapture
6969
- uses: actions/upload-artifact@v3
7070
with:
7171
name: coverage-report

core/src/core_tests/workflow_tasks.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,15 +2112,12 @@ async fn continue_as_new_preserves_some_values() {
21122112
wes_attrs.search_attributes = Some(search);
21132113
wes_attrs.retry_policy = Some(retry_policy);
21142114
let mut mock_client = mock_workflow_client();
2115-
let hist = {
2115+
let t = {
21162116
let mut t = TestHistoryBuilder::default();
21172117
t.add(wes_attrs.clone());
21182118
t.add_full_wf_task();
21192119
t
21202120
};
2121-
mock_client.expect_poll_workflow_task().returning(move |_| {
2122-
Ok(hist_to_poll_resp(&hist, wfid.to_owned(), ResponseType::AllHistory).resp)
2123-
});
21242121
mock_client
21252122
.expect_complete_workflow_task()
21262123
.returning(move |mut c| {
@@ -2135,8 +2132,8 @@ async fn continue_as_new_preserves_some_values() {
21352132
}
21362133
Ok(Default::default())
21372134
});
2138-
2139-
let worker = Worker::new_test(test_worker_cfg().build().unwrap(), mock_client);
2135+
let mock = single_hist_mock_sg(wfid, t, vec![ResponseType::AllHistory], mock_client, true);
2136+
let worker = mock_worker(mock);
21402137
let r = worker.poll_workflow_activation().await.unwrap();
21412138
worker
21422139
.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
@@ -2149,6 +2146,7 @@ async fn continue_as_new_preserves_some_values() {
21492146
))
21502147
.await
21512148
.unwrap();
2149+
worker.shutdown().await;
21522150
}
21532151

21542152
#[rstest]
@@ -2737,15 +2735,12 @@ async fn use_compatible_version_flag(
27372735
) {
27382736
let wfid = "fake_wf_id";
27392737
let mut mock_client = mock_workflow_client();
2740-
let hist = {
2738+
let t = {
27412739
let mut t = TestHistoryBuilder::default();
27422740
t.add_by_type(EventType::WorkflowExecutionStarted);
27432741
t.add_full_wf_task();
27442742
t
27452743
};
2746-
mock_client.expect_poll_workflow_task().returning(move |_| {
2747-
Ok(hist_to_poll_resp(&hist, wfid.to_owned(), ResponseType::AllHistory).resp)
2748-
});
27492744
let compat_flag_expected = match intent {
27502745
VersioningIntent::Unspecified => !different_tq,
27512746
VersioningIntent::Compatible => true,
@@ -2770,7 +2765,8 @@ async fn use_compatible_version_flag(
27702765
Ok(Default::default())
27712766
});
27722767

2773-
let worker = Worker::new_test(test_worker_cfg().build().unwrap(), mock_client);
2768+
let mock = single_hist_mock_sg(wfid, t, vec![ResponseType::AllHistory], mock_client, true);
2769+
let worker = mock_worker(mock);
27742770
let r = worker.poll_workflow_activation().await.unwrap();
27752771
let task_queue = if different_tq {
27762772
"enchi cat!".to_string()
@@ -2806,6 +2802,7 @@ async fn use_compatible_version_flag(
28062802
.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(r.run_id, cmd))
28072803
.await
28082804
.unwrap();
2805+
worker.shutdown().await;
28092806
}
28102807

28112808
#[tokio::test]

core/src/worker/activities/activity_heartbeat_manager.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,7 @@ mod test {
450450
hm.shutdown().await;
451451
}
452452

453-
/// Ensure that heartbeat can be called from a tight loop without any throttle_interval, resulting in two
454-
/// interactions with the server - one immediately and one after 500ms after the throttle_interval.
453+
/// Ensure that heartbeat can be called from a tight loop and correctly throttle
455454
#[tokio::test]
456455
async fn process_tight_loop_and_shutdown() {
457456
let mut mock_client = mock_workflow_client();

core/src/worker/activities/local_activities.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1246,7 +1246,7 @@ mod tests {
12461246
// Wait more than the timeout before grabbing the task
12471247
sleep(timeout + Duration::from_millis(10)).await;
12481248

1249-
assert!(dbg!(lam.next_pending().await.unwrap()).is_timeout(false));
1249+
assert!(lam.next_pending().await.unwrap().is_timeout(false));
12501250
assert_eq!(lam.num_in_backoff(), 0);
12511251
assert_eq!(lam.num_outstanding(), 0);
12521252
}

core/src/worker/workflow/machines/local_activity_state_machine.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1250,7 +1250,6 @@ mod tests {
12501250
});
12511251

12521252
let mut worker = build_fake_sdk(mock_cfg);
1253-
dbg!("Past thing");
12541253
worker.register_wf(DEFAULT_WORKFLOW_TYPE, la_wf);
12551254
worker.register_activity(
12561255
DEFAULT_ACTIVITY_TYPE,

core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,9 @@ mod tests {
208208
use temporal_sdk_core_api::Worker;
209209
use temporal_sdk_core_protos::{
210210
coresdk::{
211-
workflow_commands::SetPatchMarker, workflow_completion::WorkflowActivationCompletion,
211+
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
212+
workflow_commands::SetPatchMarker,
213+
workflow_completion::WorkflowActivationCompletion,
212214
AsJsonPayloadExt,
213215
},
214216
temporal::api::{
@@ -332,24 +334,38 @@ mod tests {
332334
t.add_has_change_marker(&patch_id, false);
333335
}
334336
t.add_upsert_search_attrs_for_patch(&[patch_id.clone()]);
337+
t.add_we_signaled("hi", vec![]);
335338
t.add_full_wf_task();
336339
t.add_workflow_execution_completed();
337340

338341
let mut mp = MockPollCfg::from_resp_batches(
339342
"fakeid",
340343
t,
341-
[ResponseType::ToTaskNum(1), ResponseType::AllHistory],
344+
[ResponseType::ToTaskNum(1), ResponseType::ToTaskNum(2)],
342345
mock_workflow_client(),
343346
);
344347
// Ensure the upsert command has an empty map when not using the patched command
345348
if !with_patched_cmd {
346349
mp.completion_asserts = Some(Box::new(|wftc| {
347-
assert_matches!(wftc.commands.first().and_then(|c| c.attributes.as_ref()).unwrap(),
348-
Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs)
349-
if attrs.search_attributes.as_ref().unwrap().indexed_fields.is_empty())
350+
let cmd_attrs = wftc
351+
.commands
352+
.first()
353+
.and_then(|c| c.attributes.as_ref())
354+
.unwrap();
355+
if matches!(
356+
cmd_attrs,
357+
Attributes::CompleteWorkflowExecutionCommandAttributes(_)
358+
) {
359+
return;
360+
}
361+
assert_matches!(cmd_attrs,
362+
Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs)
363+
if attrs.search_attributes.clone().unwrap_or_default().indexed_fields.is_empty());
350364
}));
351365
}
352-
let core = mock_worker(build_mock_pollers(mp));
366+
let mut mock = build_mock_pollers(mp);
367+
mock.worker_cfg(|w| w.max_cached_workflows = 1);
368+
let core = mock_worker(mock);
353369

354370
let mut ver_upsert = HashMap::new();
355371
ver_upsert.insert(
@@ -379,6 +395,12 @@ mod tests {
379395
.unwrap();
380396
// Now ensure that encountering the upsert in history works fine
381397
let act = core.poll_workflow_activation().await.unwrap();
398+
assert_matches!(
399+
act.jobs.as_slice(),
400+
[WorkflowActivationJob {
401+
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
402+
}]
403+
);
382404
core.complete_execution(&act.run_id).await;
383405
}
384406
}

0 commit comments

Comments
 (0)