Skip to content

Commit 9d8d9f8

Browse files
authored
Fix some unit test flakes (#703)
1 parent 764a88b commit 9d8d9f8

File tree

9 files changed

+58
-30
lines changed

9 files changed

+58
-30
lines changed

core/src/core_tests/workers.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,15 +175,14 @@ async fn complete_with_task_not_found_during_shutdown() {
175175
// Initiate shutdown before completing the activation
176176
let shutdown_fut = async {
177177
core.shutdown().await;
178-
complete_order.borrow_mut().push(3);
178+
complete_order.borrow_mut().push(2);
179179
};
180180
let poll_fut = async {
181181
// This will return shutdown once the completion goes through
182182
assert_matches!(
183183
core.poll_workflow_activation().await.unwrap_err(),
184184
PollWfError::ShutDown
185185
);
186-
complete_order.borrow_mut().push(2);
187186
};
188187
let complete_fut = async {
189188
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
@@ -195,7 +194,7 @@ async fn complete_with_task_not_found_during_shutdown() {
195194
complete_order.borrow_mut().push(1);
196195
};
197196
tokio::join!(shutdown_fut, poll_fut, complete_fut);
198-
assert_eq!(&complete_order.into_inner(), &[1, 2, 3])
197+
assert_eq!(&complete_order.into_inner(), &[1, 2])
199198
}
200199

201200
#[tokio::test]

core/src/test_help/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,10 @@ pub(crate) fn build_fake_worker(
139139

140140
pub(crate) fn build_fake_sdk(mock_cfg: MockPollCfg) -> temporal_sdk::Worker {
141141
let mut mock = build_mock_pollers(mock_cfg);
142-
mock.worker_cfg(|c| c.max_cached_workflows = 1);
142+
mock.worker_cfg(|c| {
143+
c.max_cached_workflows = 1;
144+
c.ignore_evicts_on_shutdown = false;
145+
});
143146
let core = mock_worker(mock);
144147
let mut worker = temporal_sdk::Worker::new_from_core(Arc::new(core), "replay_q".to_string());
145148
worker.set_worker_interceptor(FailOnNondeterminismInterceptor {});

core/src/worker/activities/local_activities.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,19 @@ impl LocalActivityManager {
378378
let (new_or_retry, permit) = match self.rcvs.lock().await.next().await? {
379379
NewOrCancel::Cancel(c) => {
380380
return match c {
381-
CancelOrTimeout::Cancel(c) => Some(NextPendingLAAction::Dispatch(c)),
381+
CancelOrTimeout::Cancel(c) => {
382+
if self
383+
.dat
384+
.lock()
385+
.outstanding_activity_tasks
386+
.contains_key(c.task_token.as_slice())
387+
{
388+
Some(NextPendingLAAction::Dispatch(c))
389+
} else {
390+
// Don't dispatch cancels for things we've already stopped tracking
391+
None
392+
}
393+
}
382394
CancelOrTimeout::Timeout { run_id, resolution } => {
383395
let tt = self
384396
.dat

core/src/worker/workflow/machines/timer_state_machine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,8 +310,8 @@ mod test {
310310
mock_cfg.num_expected_fails = 1;
311311
mock_cfg.expect_fail_wft_matcher = Box::new(move |_, cause, f| {
312312
matches!(cause, WorkflowTaskFailedCause::NonDeterministicError)
313-
&& matches!(f, Some(Failure { source, .. })
314-
if source.contains("Timer fired event did not have expected timer id 1"))
313+
&& matches!(f, Some(Failure {message, .. })
314+
if message.contains("Timer fired event did not have expected timer id 1"))
315315
});
316316
let mut worker = build_fake_sdk(mock_cfg);
317317
worker.register_wf(DEFAULT_WORKFLOW_TYPE, |ctx: WfContext| async move {

core/src/worker/workflow/managed_run.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -968,11 +968,10 @@ impl ManagedRun {
968968
} else {
969969
WorkflowTaskFailedCause::Unspecified
970970
};
971-
let wft_fail_str = format!("{:?}", fail.source);
972971
self.failed_completion(
973972
fail_cause,
974973
fail.source.evict_reason(),
975-
Failure::application_failure(wft_fail_str, false).into(),
974+
fail.source.as_failure(),
976975
true,
977976
Some(resp_chan),
978977
)

core/src/worker/workflow/mod.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ impl Workflows {
288288
self.activation_completed(
289289
WorkflowActivationCompletion {
290290
run_id,
291-
status: Some(auto_fail_to_complete_status(machines_err)),
291+
status: Some(machines_err.as_failure().into()),
292292
},
293293
true,
294294
Option::<Box<dyn Fn(PostActivateHookData) + Send>>::None,
@@ -1297,6 +1297,18 @@ impl WFMachinesError {
12971297
WFMachinesError::Fatal(_) => EvictionReason::Fatal,
12981298
}
12991299
}
1300+
1301+
pub fn as_failure(&self) -> Failure {
1302+
Failure {
1303+
failure: Some(
1304+
temporal_sdk_core_protos::temporal::api::failure::v1::Failure::application_failure(
1305+
self.to_string(),
1306+
false,
1307+
),
1308+
),
1309+
force_cause: WorkflowTaskFailedCause::from(self.evict_reason()) as i32,
1310+
}
1311+
}
13001312
}
13011313

13021314
impl From<TimestampError> for WFMachinesError {
@@ -1311,22 +1323,6 @@ impl From<anyhow::Error> for WFMachinesError {
13111323
}
13121324
}
13131325

1314-
fn auto_fail_to_complete_status(err: WFMachinesError) -> workflow_activation_completion::Status {
1315-
workflow_activation_completion::Status::Failed(Failure {
1316-
failure: Some(
1317-
temporal_sdk_core_protos::temporal::api::failure::v1::Failure {
1318-
message: "Error while processing workflow task".to_string(),
1319-
source: err.to_string(),
1320-
stack_trace: "".to_string(),
1321-
encoded_attributes: None,
1322-
cause: None,
1323-
failure_info: None,
1324-
},
1325-
),
1326-
force_cause: WorkflowTaskFailedCause::from(err.evict_reason()) as i32,
1327-
})
1328-
}
1329-
13301326
pub(crate) trait LocalActivityRequestSink: Send + Sync + 'static {
13311327
fn sink_reqs(&self, reqs: Vec<LocalActRequest>) -> Vec<LocalActivityResolution>;
13321328
}

sdk-core-protos/src/task_token.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use base64::{prelude::BASE64_STANDARD, Engine};
2-
use std::fmt::{Debug, Display, Formatter};
2+
use std::{
3+
borrow::Borrow,
4+
fmt::{Debug, Display, Formatter},
5+
};
36

47
static LOCAL_ACT_TASK_TOKEN_PREFIX: &[u8] = b"local_act_";
58

@@ -43,6 +46,12 @@ impl Debug for TaskToken {
4346
}
4447
}
4548

49+
impl Borrow<[u8]> for TaskToken {
50+
fn borrow(&self) -> &[u8] {
51+
self.0.as_slice()
52+
}
53+
}
54+
4655
pub fn fmt_tt(tt: &[u8]) -> String {
4756
BASE64_STANDARD.encode(tt)
4857
}

sdk/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,10 @@ impl ActivityHalf {
521521
});
522522
}
523523
Some(activity_task::Variant::Cancel(_)) => {
524-
if let Some(ct) = self.task_tokens_to_cancels.get(&activity.task_token.into()) {
524+
if let Some(ct) = self
525+
.task_tokens_to_cancels
526+
.get(activity.task_token.as_slice())
527+
{
525528
ct.cancel();
526529
}
527530
}

test-utils/src/lib.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub mod workflows;
1111
pub use temporal_sdk_core::replay::HistoryForReplay;
1212

1313
use crate::stream::{Stream, TryStreamExt};
14-
use anyhow::Context;
14+
use anyhow::{Context, Error};
1515
use base64::{prelude::BASE64_STANDARD, Engine};
1616
use futures::{future, stream, stream::FuturesUnordered, StreamExt};
1717
use parking_lot::Mutex;
@@ -47,6 +47,7 @@ use temporal_sdk_core_api::{
4747
};
4848
use temporal_sdk_core_protos::{
4949
coresdk::{
50+
workflow_activation::WorkflowActivation,
5051
workflow_commands::{
5152
workflow_command, ActivityCancellationType, CompleteWorkflowExecution, QueryResult,
5253
QuerySuccess, ScheduleActivity, ScheduleLocalActivity, StartTimer,
@@ -510,7 +511,7 @@ impl TestWorker {
510511
});
511512
}
512513

513-
/// Runs until all expected workflows have completed
514+
/// Runs until all expected workflows have completed and then shuts down the worker
514515
pub async fn run_until_done(&mut self) -> Result<(), anyhow::Error> {
515516
self.run_until_done_intercepted(Option::<TestWorkerCompletionIceptor>::None)
516517
.await
@@ -604,6 +605,12 @@ impl WorkerInterceptor for TestWorkerCompletionIceptor {
604605
n.on_shutdown(sdk_worker);
605606
}
606607
}
608+
async fn on_workflow_activation(&self, a: &WorkflowActivation) -> Result<(), Error> {
609+
if let Some(n) = self.next.as_ref() {
610+
n.on_workflow_activation(a).await?;
611+
}
612+
Ok(())
613+
}
607614
}
608615

609616
/// Returns the client options used to connect to the server used for integration tests.

0 commit comments

Comments
 (0)