Skip to content

Commit 5442613

Browse files
Ensure failed nodes are deleted from worker graph
The worker's local task graph should be cleaned up by removing nodes after they succeed or fail, including the children of those nodes. We had a subtle edge case where if marking a *child* task as failed returned Err(...) -- for example due to the server being unavailable -- we would fail to actually remove the parent from the graph, instead early exiting (indeed, killing an entire worker thread). This commit instead logs, but otherwise ignores, errors on marking tasks as failed on the server: if we don't succeed, we may be forced to rerun the task later as the server doesn't know of the failure. This failure *should* be transient though, so it should be OK if we end up rerunning the task as a result. I suspect that we don't 100% handle this situation ideally today -- for example, if a task *always* results in some failure on the server, we will loop indefinitely trying to run it. But I'm not sure why that would happen, and it seems like something that the server is better suited than worker nodes to handle (especially since worker nodes are ideally transient state-wise). The server (likely after the recent mutex removal) will currently return 'database is busy' errors on some requests, which is a separate bug from the one being fixed here, but exacerbates the situations in which this bug arises.
1 parent 1878040 commit 5442613

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

src/runner/graph.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ impl TasksGraph {
223223
.graph
224224
.neighbors_directed(node, Direction::Incoming)
225225
.collect::<Vec<_>>();
226+
let mut last_err = None;
226227
for child in children.drain(..) {
227228
// Don't recursively mark a child as failed if this is not the only parent of the child
228229
let parents = self
@@ -237,7 +238,14 @@ impl TasksGraph {
237238
);
238239
continue;
239240
}
240-
self.mark_as_failed(child, ex, db, state, config, error, result, worker)?;
241+
let res = self.mark_as_failed(child, ex, db, state, config, error, result, worker);
242+
if let Err(err) = res {
243+
// Regardless of whether this child failed, we still want to
244+
// (try to) mark the other children as failed as well as the
245+
// current node.
246+
log::error!("Failure to mark child as failed; skipping: {:?}", err);
247+
last_err = Some(err);
248+
}
241249
}
242250

243251
// We need to mark_as_completed the node here (if it's a task),
@@ -257,6 +265,14 @@ impl TasksGraph {
257265

258266
self.mark_as_completed(node);
259267

268+
// If we would have treated this as a successful mark_as_failed, then
269+
// don't.
270+
if res.is_ok() {
271+
if let Some(err) = last_err {
272+
return Err(err);
273+
}
274+
}
275+
260276
res
261277
}
262278

src/runner/worker.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
9393
}
9494
}
9595

96-
guard.mark_as_failed(
96+
if let Err(e) = guard.mark_as_failed(
9797
id,
9898
self.ex,
9999
self.db,
@@ -102,7 +102,13 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
102102
&e,
103103
result,
104104
&self.name,
105-
)?;
105+
) {
106+
// We don't return an Err(...) from the loop here,
107+
// as this failure shouldn't bring down this worker
108+
// thread -- it can continue to do useful work.
109+
error!("Failed to mark node {:?} as failed", id);
110+
utils::report_failure(&e);
111+
}
106112
} else {
107113
guard.mark_as_completed(id);
108114
}

0 commit comments

Comments
 (0)