Skip to content

Commit 29c89f2

Browse files
authored
Fix flaky tests (#922)
1 parent 24a3c23 commit 29c89f2

File tree

7 files changed

+83
-85
lines changed

7 files changed

+83
-85
lines changed

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ document as your quick reference when submitting pull requests.
1515
- `tests/` – integration, heavy, and manual tests
1616
- `arch_docs/` – architectural design documents
1717
- Contributor guide: `README.md`
18+
- `target/` - This contains compiled files. You never need to look in here.
1819

1920
## Repo Specific Utilities
2021

core/src/core_tests/workflow_tasks.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2528,7 +2528,7 @@ async fn post_terminal_commands_are_retained_when_not_replaying() {
25282528
]);
25292529
_do_post_terminal_commands_test(
25302530
commands_sent_by_lang,
2531-
[ResponseType::ToTaskNum(1), ResponseType::AllHistory],
2531+
[ResponseType::ToTaskNum(1)],
25322532
expected_command_types_emitted,
25332533
t,
25342534
)
@@ -2602,14 +2602,14 @@ async fn _do_post_terminal_commands_test(
26022602

26032603
let act = core.poll_workflow_activation().await.unwrap();
26042604

2605-
core.initiate_shutdown();
26062605
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
26072606
act.run_id,
26082607
commands_sent_by_lang,
26092608
))
26102609
.await
26112610
.unwrap();
26122611

2612+
core.initiate_shutdown();
26132613
let act = core.poll_workflow_activation().await;
26142614
assert_matches!(act.unwrap_err(), PollError::ShutDown);
26152615
core.shutdown().await;
@@ -2685,10 +2685,6 @@ async fn history_length_with_fail_and_timeout(
26852685
#[values(true, false)] use_cache: bool,
26862686
#[values(1, 2, 3)] history_responses_case: u8,
26872687
) {
2688-
if !use_cache && history_responses_case == 3 {
2689-
/* disabled for now because this keeps flaking*/
2690-
return;
2691-
}
26922688
let wfid = "fake_wf_id";
26932689
let mut t = TestHistoryBuilder::default();
26942690
t.add_by_type(EventType::WorkflowExecutionStarted);

core/src/ephemeral_server/mod.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ fn get_free_port(bind_ip: &str) -> io::Result<u16> {
423423
let addr = listen.local_addr()?;
424424

425425
// On Linux and some BSD variants, ephemeral ports are randomized, and may
426-
// consequently repeat within a short time frame after the listenning end
426+
// consequently repeat within a short time frame after the listening end
427427
// has been closed. To avoid this, we make a connection to the port, then
428428
// close that connection from the server's side (this is very important),
429429
// which puts the connection in TIME_WAIT state for some time (by default,
@@ -622,30 +622,12 @@ fn remove_file_past_ttl(ttl: &Option<Duration>, dest: &PathBuf) -> Result<bool,
622622
mod tests {
623623
use super::{get_free_port, remove_file_past_ttl};
624624
use std::{
625-
collections::HashSet,
626625
env::temp_dir,
627626
fs::File,
628627
net::{TcpListener, TcpStream},
629628
time::{Duration, SystemTime},
630629
};
631630

632-
#[test]
633-
fn get_free_port_no_double() {
634-
let host = "127.0.0.1";
635-
let mut port_set = HashSet::new();
636-
637-
for _ in 0..2000 {
638-
let port = get_free_port(host).unwrap();
639-
assert!(
640-
!port_set.contains(&port),
641-
"Port {port} has been assigned more than once"
642-
);
643-
644-
// Add port to the set
645-
port_set.insert(port);
646-
}
647-
}
648-
649631
#[test]
650632
fn get_free_port_can_bind_immediately() {
651633
let host = "127.0.0.1";

core/src/test_help/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,10 +1068,19 @@ impl WorkerExt for Worker {
10681068
);
10691069
},
10701070
async {
1071-
assert_matches!(
1072-
self.poll_workflow_activation().await.unwrap_err(),
1073-
PollError::ShutDown
1074-
);
1071+
loop {
1072+
match self.poll_workflow_activation().await {
1073+
Err(PollError::ShutDown) => break,
1074+
Ok(a) if a.is_only_eviction() => {
1075+
self.complete_workflow_activation(WorkflowActivationCompletion::empty(
1076+
a.run_id,
1077+
))
1078+
.await
1079+
.unwrap();
1080+
}
1081+
o => panic!("Unexpected activation while draining: {o:?}"),
1082+
}
1083+
}
10751084
}
10761085
);
10771086
self.finalize_shutdown().await;

core/src/worker/workflow/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ use std::{
5050
collections::VecDeque,
5151
fmt::Debug,
5252
future::Future,
53-
mem,
5453
ops::DerefMut,
5554
rc::Rc,
5655
result,
@@ -1100,8 +1099,7 @@ struct BufferedTasks {
11001099
/// current one has been processed).
11011100
query_only_tasks: VecDeque<PermittedWFT>,
11021101
/// These are query-only tasks for the *buffered* wft, if any. They will all be discarded if
1103-
/// a buffered wft is replaced before being handled. They move to `query_only_tasks` once the
1104-
/// buffered task is taken.
1102+
/// a buffered wft is replaced before being handled.
11051103
query_only_tasks_for_buffered: VecDeque<PermittedWFT>,
11061104
}
11071105

@@ -1136,9 +1134,13 @@ impl BufferedTasks {
11361134
if let Some(q) = self.query_only_tasks.pop_front() {
11371135
return Some(q);
11381136
}
1139-
if let Some(t) = self.wft.take() {
1140-
self.query_only_tasks = mem::take(&mut self.query_only_tasks_for_buffered);
1141-
return Some(t);
1137+
if self.wft.is_some() {
1138+
if let Some(q) = self.query_only_tasks_for_buffered.pop_front() {
1139+
return Some(q);
1140+
}
1141+
if let Some(t) = self.wft.take() {
1142+
return Some(t);
1143+
}
11421144
}
11431145
None
11441146
}

sdk/src/lib.rs

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ use temporal_sdk_core_protos::{
109109
};
110110
use tokio::{
111111
sync::{
112+
Notify,
112113
mpsc::{UnboundedSender, unbounded_channel},
113114
oneshot,
114115
},
@@ -151,6 +152,7 @@ struct WorkflowHalf {
151152
workflows: RefCell<HashMap<String, WorkflowData>>,
152153
/// Maps workflow type to the function for executing workflow runs with that ID
153154
workflow_fns: RefCell<HashMap<String, WorkflowFunction>>,
155+
workflow_removed_from_map: Notify,
154156
}
155157
struct WorkflowData {
156158
/// Channel used to send the workflow activations
@@ -180,6 +182,7 @@ impl Worker {
180182
workflow_half: WorkflowHalf {
181183
workflows: Default::default(),
182184
workflow_fns: Default::default(),
185+
workflow_removed_from_map: Default::default(),
183186
},
184187
activity_half: ActivityHalf {
185188
activity_fns: Default::default(),
@@ -260,6 +263,7 @@ impl Worker {
260263
join_handle.await??;
261264
debug!(run_id=%run_id, "Removing workflow from cache");
262265
wf_half.workflows.borrow_mut().remove(&run_id);
266+
wf_half.workflow_removed_from_map.notify_one();
263267
Ok(())
264268
}
265269
},
@@ -293,12 +297,15 @@ impl Worker {
293297
if let Some(ref i) = common.worker_interceptor {
294298
i.on_workflow_activation(&activation).await?;
295299
}
296-
if let Some(wf_fut) = wf_half.workflow_activation_handler(
297-
common,
298-
shutdown_token.clone(),
299-
activation,
300-
&completions_tx,
301-
)? && wf_future_tx.send(wf_fut).is_err()
300+
if let Some(wf_fut) = wf_half
301+
.workflow_activation_handler(
302+
common,
303+
shutdown_token.clone(),
304+
activation,
305+
&completions_tx,
306+
)
307+
.await?
308+
&& wf_future_tx.send(wf_fut).is_err()
302309
{
303310
panic!("Receive half of completion processor channel cannot be dropped");
304311
}
@@ -384,7 +391,7 @@ impl Worker {
384391

385392
impl WorkflowHalf {
386393
#[allow(clippy::type_complexity)]
387-
fn workflow_activation_handler(
394+
async fn workflow_activation_handler(
388395
&self,
389396
common: &CommonWorker,
390397
shutdown_token: CancellationToken,
@@ -408,26 +415,29 @@ impl WorkflowHalf {
408415
_ => None,
409416
}) {
410417
let workflow_type = &sw.workflow_type;
411-
let wf_fns_borrow = self.workflow_fns.borrow();
412-
let Some(wf_function) = wf_fns_borrow.get(workflow_type) else {
413-
warn!("Workflow type {workflow_type} not found");
418+
let (wff, activations) = {
419+
let wf_fns_borrow = self.workflow_fns.borrow();
420+
421+
let Some(wf_function) = wf_fns_borrow.get(workflow_type) else {
422+
warn!("Workflow type {workflow_type} not found");
423+
424+
completions_tx
425+
.send(WorkflowActivationCompletion::fail(
426+
run_id,
427+
format!("Workflow type {workflow_type} not found").into(),
428+
Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
429+
))
430+
.expect("Completion channel intact");
431+
return Ok(None);
432+
};
414433

415-
completions_tx
416-
.send(WorkflowActivationCompletion::fail(
417-
run_id,
418-
format!("Workflow type {workflow_type} not found").into(),
419-
Some(WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure),
420-
))
421-
.expect("Completion channel intact");
422-
return Ok(None);
434+
wf_function.start_workflow(
435+
common.worker.get_config().namespace.clone(),
436+
common.task_queue.clone(),
437+
std::mem::take(sw),
438+
completions_tx.clone(),
439+
)
423440
};
424-
425-
let (wff, activations) = wf_function.start_workflow(
426-
common.worker.get_config().namespace.clone(),
427-
common.task_queue.clone(),
428-
std::mem::take(sw),
429-
completions_tx.clone(),
430-
);
431441
let jh = tokio::spawn(async move {
432442
tokio::select! {
433443
r = wff.fuse() => r,
@@ -442,6 +452,17 @@ impl WorkflowHalf {
442452
join_handle: jh,
443453
run_id: run_id.clone(),
444454
});
455+
loop {
456+
// It's possible that we've got a new initialize workflow action before the last
457+
// future for this run finished evicting, as a result of how futures might be
458+
// interleaved. In that case, just wait until it's not in the map, which should be
459+
// a matter of only a few `poll` calls.
460+
if self.workflows.borrow_mut().contains_key(&run_id) {
461+
self.workflow_removed_from_map.notified().await;
462+
} else {
463+
break;
464+
}
465+
}
445466
self.workflows.borrow_mut().insert(
446467
run_id.clone(),
447468
WorkflowData {
@@ -474,7 +495,8 @@ impl WorkflowHalf {
474495
return Ok(None);
475496
}
476497

477-
// In all other cases, we want to panic as the runtime could be in an inconsistent state at this point.
498+
// In all other cases, we want to error as the runtime could be in an inconsistent state
499+
// at this point.
478500
bail!(
479501
"Got activation {:?} for unknown workflow {}",
480502
activation,

sdk/src/workflow_future.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl WorkflowFuture {
177177
&mut self,
178178
variant: Option<Variant>,
179179
outgoing_cmds: &mut Vec<WorkflowCommand>,
180-
) -> Result<bool, Error> {
180+
) -> Result<(), Error> {
181181
if let Some(v) = variant {
182182
match v {
183183
Variant::InitializeWorkflow(_) => {
@@ -326,17 +326,14 @@ impl WorkflowFuture {
326326
))?
327327
}
328328
Variant::RemoveFromCache(_) => {
329-
// TODO: Need to abort any user-spawned tasks, etc. See also cancel WF.
330-
// How best to do this in executor agnostic way? Is that possible?
331-
// -- tokio JoinSet does this in a nice way.
332-
return Ok(true);
329+
unreachable!("Cache removal should happen higher up");
333330
}
334331
}
335332
} else {
336333
bail!("Empty activation job variant");
337334
}
338335

339-
Ok(false)
336+
Ok(())
340337
}
341338
}
342339

@@ -370,7 +367,6 @@ impl Future for WorkflowFuture {
370367
.map(Into::into);
371368
}
372369

373-
let mut die_of_eviction_when_done = false;
374370
let mut activation_cmds = vec![];
375371
// Lame hack to avoid hitting "unregistered" update handlers in a situation where
376372
// the history has no commands until an update is accepted. Will go away w/ SDK redesign
@@ -391,19 +387,6 @@ impl Future for WorkflowFuture {
391387
}
392388
}
393389

394-
for WorkflowActivationJob { variant } in activation.jobs {
395-
match self.handle_job(variant, &mut activation_cmds) {
396-
Ok(true) => {
397-
die_of_eviction_when_done = true;
398-
}
399-
Err(e) => {
400-
self.fail_wft(run_id, e);
401-
continue 'activations;
402-
}
403-
_ => (),
404-
}
405-
}
406-
407390
if is_only_eviction {
408391
// No need to do anything with the workflow code in this case
409392
self.outgoing_completions
@@ -412,6 +395,13 @@ impl Future for WorkflowFuture {
412395
return Ok(WfExitValue::Evicted).into();
413396
}
414397

398+
for WorkflowActivationJob { variant } in activation.jobs {
399+
if let Err(e) = self.handle_job(variant, &mut activation_cmds) {
400+
self.fail_wft(run_id, e);
401+
continue 'activations;
402+
}
403+
}
404+
415405
// Drive update functions
416406
self.update_futures = std::mem::take(&mut self.update_futures)
417407
.into_iter()
@@ -450,10 +440,6 @@ impl Future for WorkflowFuture {
450440

451441
self.send_completion(run_id, activation_cmds);
452442

453-
if die_of_eviction_when_done {
454-
return Ok(WfExitValue::Evicted).into();
455-
}
456-
457443
// We don't actually return here, since we could be queried after finishing executing,
458444
// and it allows us to rely on evictions for death and cache management
459445
}

0 commit comments

Comments
 (0)