Commit e8f8ace
Auto merge of #569 - Mark-Simulacrum:avoid-parking, r=pietroalbini
Adjust some of the code around the worker deadlock.

This switches to a Condvar associated with the graph lock to maintain the blocked worker pool. In and of itself this is just a simplification, but it eases fixes for these two cases:

* mark_as_failed is called with a try!/? operator, which means that even if progress was made on some parts of a task, we may never reach the unparking code. The notification is now moved up to just after the graph lock is re-acquired; this ensures that, regardless of what happens, other threads get a chance to run.
* Finished did not unpark any blocked threads.

In practice, I suspect the second of these is the cause of our bug. The following is an excerpt of the log before worker-7 stalls out in thread park (in the original version of this code). worker-7 blocks on the root node; the other workers all reach the root as well and exit via Finished without waking worker-7. With the new code, worker-7 would be woken each time another worker finishes, letting it also notice that the root is finished and exit.

```
worker-7 | NodeIndex(40): this is blocked
worker-7 | NodeIndex(0): this is blocked
marking node running: cleanup of crate kivo360/rusty_web_app as complete
worker-4 | NodeIndex(0): walked to node root
worker-4 | NodeIndex(0): neighbors: [NodeIndex(40)]
worker-4 | NodeIndex(40): walked to node crate completed
worker-4 | NodeIndex(40): neighbors: []
worker-4 | NodeIndex(40): marked as complete
marking node crate completed as complete
worker-6 | NodeIndex(0): walked to node root
worker-6 | NodeIndex(0): neighbors: []
worker-8 | NodeIndex(0): walked to node root
worker-8 | NodeIndex(0): neighbors: []
worker-9 | NodeIndex(0): walked to node root
worker-9 | NodeIndex(0): neighbors: []
worker-3 | NodeIndex(0): walked to node root
worker-3 | NodeIndex(0): neighbors: []
worker-2 | NodeIndex(0): walked to node root
worker-2 | NodeIndex(0): neighbors: []
worker-0 | NodeIndex(0): walked to node root
worker-0 | NodeIndex(0): neighbors: []
worker-5 | NodeIndex(0): walked to node root
worker-5 | NodeIndex(0): neighbors: []
worker-1 | NodeIndex(0): walked to node root
worker-1 | NodeIndex(0): neighbors: []
```

r? `@pietroalbini`
2 parents c05fd09 + be4e65b
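Not part of the commit itself, but a minimal, self-contained sketch of the pattern the commit message describes: workers share a Mutex-guarded state and a Condvar. The names (`State`, `queued`, `running`, `parked`) are illustrative, not crater's. The point is that `Condvar::wait` atomically releases the lock and re-acquires it on wakeup, and a `notify_all` issued while holding that lock cannot be lost, unlike a `thread::unpark` racing against registration in a parked-threads map.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Illustrative stand-in for the task graph: tasks still to hand out, plus how
// many are currently running, so "nothing to do but not finished" is observable.
struct State {
    queued: usize,
    running: usize,
}

fn main() {
    let shared = Arc::new((Mutex::new(State { queued: 8, running: 0 }), Condvar::new()));
    let handles: Vec<_> = (0..4)
        .map(|id| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let (state, parked) = &*shared;
                let mut guard = state.lock().unwrap();
                loop {
                    if guard.queued == 0 && guard.running == 0 {
                        // Finished: wake any blocked worker so it can also exit.
                        parked.notify_all();
                        break;
                    }
                    if guard.queued == 0 {
                        // Blocked: wait() releases the lock and re-acquires it on
                        // wakeup; a spurious wakeup just loops around harmlessly.
                        guard = parked.wait(guard).unwrap();
                        continue;
                    }
                    guard.queued -= 1;
                    guard.running += 1;
                    drop(guard); // don't hold the lock while running the task
                    println!("worker-{id} running a task");
                    thread::sleep(Duration::from_millis(10));
                    guard = state.lock().unwrap();
                    guard.running -= 1;
                    // Regardless of how the task ended, give waiters a chance to run.
                    parked.notify_all();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```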

File tree

2 files changed: +20 / -32 lines


src/runner/mod.rs

Lines changed: 2 additions & 5 deletions
```diff
@@ -16,8 +16,7 @@ use rustwide::logging::LogStorage;
 use rustwide::Workspace;
 use std::collections::HashMap;
 use std::path::Path;
-use std::sync::Mutex;
-use std::thread;
+use std::sync::{Condvar, Mutex};
 use std::time::Duration;
 
 const DISK_SPACE_WATCHER_INTERVAL: Duration = Duration::from_secs(300);
@@ -63,6 +62,7 @@ pub fn run_ex<DB: WriteResults + Sync>(
 
     info!("computing the tasks graph...");
     let graph = Mutex::new(build_graph(ex, crates, config));
+    let parked_threads = Condvar::new();
 
     info!("preparing the execution...");
     for tc in &ex.toolchains {
@@ -74,9 +74,6 @@ pub fn run_ex<DB: WriteResults + Sync>(
 
     info!("running tasks in {} threads...", threads_count);
 
-    // An HashMap is used instead of an HashSet because Thread is not Eq+Hash
-    let parked_threads: Mutex<HashMap<thread::ThreadId, thread::Thread>> =
-        Mutex::new(HashMap::new());
     let state = RunnerState::new();
 
     let workers = (0..threads_count)
```
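The hunks above only show the Condvar being created next to the graph lock; the code that actually lends both to the worker threads is not part of this diff. As a hedged sketch (not crater's spawning code, which may use a different scoped-threads mechanism), here is how a borrowed `&Mutex<_>` / `&Condvar` pair of the kind `Worker` stores can be handed to threads with `std::thread::scope` (Rust 1.63+):

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

fn main() {
    // Illustrative stand-ins for the task graph and the blocked-worker Condvar.
    let graph = Mutex::new(vec!["task-a", "task-b", "task-c"]);
    let parked_threads = Condvar::new();

    thread::scope(|scope| {
        for id in 0..2 {
            // Reborrow so each spawned closure captures plain `&` references;
            // the scope guarantees the workers exit before these are dropped.
            let (graph, parked_threads) = (&graph, &parked_threads);
            scope.spawn(move || {
                let mut guard = graph.lock().unwrap();
                while let Some(task) = guard.pop() {
                    drop(guard); // run the task without holding the lock
                    println!("worker-{id} running {task}");
                    guard = graph.lock().unwrap();
                    // Same shape as the worker loop: notify after re-locking.
                    parked_threads.notify_all();
                }
            });
        }
    });
}
```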

src/runner/worker.rs

Lines changed: 18 additions & 27 deletions
```diff
@@ -6,13 +6,11 @@ use crate::runner::graph::{TasksGraph, WalkResult};
 use crate::runner::{OverrideResult, RunnerState};
 use crate::utils;
 use rustwide::{BuildDirectory, Workspace};
-use std::collections::HashMap;
 use std::sync::Condvar;
 use std::sync::{
     atomic::{AtomicBool, Ordering},
     Mutex,
 };
-use std::thread;
 use std::time::Duration;
 
 pub(super) struct Worker<'a, DB: WriteResults + Sync> {
@@ -24,7 +22,7 @@ pub(super) struct Worker<'a, DB: WriteResults + Sync> {
     graph: &'a Mutex<TasksGraph>,
     state: &'a RunnerState,
     db: &'a DB,
-    parked_threads: &'a Mutex<HashMap<thread::ThreadId, thread::Thread>>,
+    parked_threads: &'a Condvar,
     target_dir_cleanup: AtomicBool,
 }
 
@@ -37,7 +35,7 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
         graph: &'a Mutex<TasksGraph>,
         state: &'a RunnerState,
         db: &'a DB,
-        parked_threads: &'a Mutex<HashMap<thread::ThreadId, thread::Thread>>,
+        parked_threads: &'a Condvar,
     ) -> Self {
         Worker {
             build_dir: Mutex::new(workspace.build_dir(&name)),
@@ -59,15 +57,13 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
 
     pub(super) fn run(&self) -> Fallible<()> {
         // This uses a `loop` instead of a `while let` to avoid locking the graph too much
+        let mut guard = self.graph.lock().unwrap();
         loop {
             self.maybe_cleanup_target_dir()?;
-            let walk_result = self
-                .graph
-                .lock()
-                .unwrap()
-                .next_task(self.ex, self.db, &self.name);
+            let walk_result = guard.next_task(self.ex, self.db, &self.name);
             match walk_result {
                 WalkResult::Task(id, task) => {
+                    drop(guard);
                     info!("running task: {:?}", task);
                     let res = task.run(
                         self.config,
@@ -77,6 +73,9 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
                         self.db,
                         self.state,
                     );
+                    guard = self.graph.lock().unwrap();
+                    // Regardless of how this ends, they should get woken up.
+                    self.parked_threads.notify_all();
                     if let Err(e) = res {
                         error!("task failed, marking childs as failed too: {:?}", task);
                         utils::report_failure(&e);
@@ -94,7 +93,7 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
                             }
                         }
 
-                        self.graph.lock().unwrap().mark_as_failed(
+                        guard.mark_as_failed(
                             id,
                             self.ex,
                             self.db,
@@ -105,28 +104,20 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
                             &self.name,
                         )?;
                     } else {
-                        self.graph.lock().unwrap().mark_as_completed(id);
-                    }
-
-                    // Unpark all the threads
-                    let mut parked = self.parked_threads.lock().unwrap();
-                    for (_id, thread) in parked.drain() {
-                        thread.unpark();
+                        guard.mark_as_completed(id);
                     }
                 }
                 WalkResult::Blocked => {
-                    // Wait until another thread finished before looking for tasks again
-                    // If the thread spuriously wake up (parking does not guarantee no
-                    // spurious wakeups) it's not a big deal, it will just get parked again
-                    {
-                        let mut parked_threads = self.parked_threads.lock().unwrap();
-                        let current = thread::current();
-                        parked_threads.insert(current.id(), current);
-                    }
-                    thread::park();
+                    guard = self.parked_threads.wait(guard).unwrap();
                 }
                 WalkResult::NotBlocked => unreachable!("NotBlocked leaked from the run"),
-                WalkResult::Finished => break,
+                WalkResult::Finished => {
+                    // A blocked thread may be waiting on the root node, in
+                    // which case this is crucial to avoiding a deadlock.
+                    self.parked_threads.notify_all();
+                    drop(guard);
+                    break;
+                }
             }
         }
 
```
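A side note, not something this commit does: the new Blocked arm relies on re-running `next_task` after every wakeup, so a bare `wait()` is enough there. `std::sync::Condvar` also provides `wait_while` (Rust 1.42+), which keeps a thread asleep through spurious wakeups until a predicate over the guarded data turns false. A tiny self-contained illustration of the same notify-on-finish idea, with illustrative names (`finished`, `parked`):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // A `finished` flag plus the Condvar a blocked worker sleeps on.
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    let waiter = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (finished, parked) = &*pair;
            // Sleeps (absorbing spurious wakeups) until `finished` becomes true.
            let _guard = parked
                .wait_while(finished.lock().unwrap(), |done| !*done)
                .unwrap();
            println!("woken: everything is finished, exiting");
        })
    };

    let (finished, parked) = &*pair;
    *finished.lock().unwrap() = true;
    // Mirrors the new WalkResult::Finished arm: notify before exiting so a
    // blocked thread cannot be left waiting forever.
    parked.notify_all();
    waiter.join().unwrap();
}
```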
