2
2
handle_fault(...)
3
3
4
4
An internal function to handle a worker dying or being killed by the OS.
5
- Attempts to determine which `Thunk`s require rescheduling based on a
6
- "deadlist", and then corrects the scheduler's internal `ComputeState` struct
7
- to recover from the fault.
5
+ Attempts to determine which `Thunk`s were running on (or had their results
6
+ cached on) the dead worker, and stores them in a "deadlist". It uses this
7
+ deadlist to correct the scheduler's internal `ComputeState` struct to recover
8
+ from the fault.
8
9
9
10
Note: The logic for this functionality is not currently perfectly robust to
10
11
all failure modes, and is only really intended as a last-ditch attempt to
@@ -13,65 +14,32 @@ of DAGs, it *may* cause a `KeyError` or other failures in the scheduler due to
13
14
the complexity of getting the internal state back to a consistent and proper
14
15
state.
15
16
"""
16
- function handle_fault (ctx, state, oldproc)
17
- # Find thunks whose results were cached on the dead worker and place them
18
- # on what's called a "deadlist". This structure will direct the recovery
19
- # of the scheduler's state.
17
+ function handle_fault (ctx, state, deadproc)
18
+ @assert ! isempty (procs (ctx)) " No workers left for fault handling!"
19
+
20
20
deadlist = Thunk[]
21
+
22
+ # Evict cache entries whose data was stored on the dead worker
21
23
for t in keys (state. cache)
22
24
v = state. cache[t]
23
- if v isa Chunk && v. handle isa DRef && v. handle. owner == oldproc . pid
25
+ if v isa Chunk && v. handle isa DRef && v. handle. owner == deadproc . pid
24
26
push! (deadlist, t)
25
- # Any inputs to dead cached thunks must be rescheduled
26
- function bfs! (deadlist, t)
27
- for input in t. inputs
28
- istask (input) || continue
29
- ! (input in deadlist) && push! (deadlist, input)
30
- bfs! (deadlist, input)
31
- end
32
- end
33
- bfs! (deadlist, t)
27
+ pop! (state. cache, t)
34
28
end
35
29
end
36
- # TODO : Find *all* thunks who were actively running on the dead worker
37
- # TODO : Set thunk.cache to nothing
38
-
39
- # Empty cache of dead thunks
40
- for ct in keys (state. cache)
41
- if ct in deadlist
42
- delete! (state. cache, ct)
43
- end
44
- end
45
-
46
- function fix_waitdicts! (state, deadlist, t:: Thunk ; isleaf= false )
47
- waiting, waiting_data = state. waiting, state. waiting_data
48
- if ! (t in keys (waiting))
49
- waiting[t] = Set {Thunk} ()
50
- end
51
- if ! isleaf
52
- # If we aren't a leaf thunk, then we may still need to recover
53
- # further into the DAG
54
- for input in t. inputs
55
- istask (input) || continue
56
- @assert haskey (waiting, t) " Error: $t not in state.waiting"
57
- push! (waiting[t], input)
58
- push! (waiting_data[input], t)
59
- isleaf = ! (input in deadlist)
60
- fix_waitdicts! (state, deadlist, input; isleaf= isleaf)
61
- end
62
- end
63
- if isempty (waiting[t])
64
- delete! (waiting, t)
30
+ # Remove thunks that were running on the dead worker
31
+ for t in collect (keys (state. running_on))
32
+ pid = state. running_on[t]. pid
33
+ if pid == deadproc. pid
34
+ push! (deadlist, t)
35
+ delete! (state. running_on, t)
65
36
end
66
37
end
67
-
68
- # Add state.waiting deps back to state.waiting
69
- for ot in keys (state. waiting)
70
- fix_waitdicts! (state, deadlist, ot)
38
+ # Clear thunk.cache_ref
39
+ for t in deadlist
40
+ t. cache_ref = nothing
71
41
end
72
42
73
- # fix_waitdicts!(state, deadlist, thunk)
74
-
75
43
# Remove thunks from state.ready that have inputs on the deadlist
76
44
for idx in length (state. ready): - 1 : 1
77
45
rt = state. ready[idx]
@@ -80,40 +48,10 @@ function handle_fault(ctx, state, oldproc)
80
48
end
81
49
end
82
50
83
- # Remove dead thunks from state.running, and add state.running
84
- # deps back to state.waiting
85
- wasrunning = copy (state. running)
86
- empty! (state. running)
87
- while ! isempty (wasrunning)
88
- temp = pop! (wasrunning)
89
- if temp isa Thunk
90
- if ! (temp in deadlist)
91
- push! (state. running, temp)
92
- end
93
- fix_waitdicts! (state, deadlist, temp)
94
- elseif temp isa Vector
95
- newtemp = []
96
- for t in temp
97
- fix_waitdicts! (state, deadlist, t)
98
- if ! (t in deadlist)
99
- push! (newtemp, t)
100
- end
101
- end
102
- isempty (newtemp) || push! (state. running, newtemp)
103
- else
104
- throw (" Unexpected type in recovery: $temp " )
105
- end
106
- end
107
-
108
51
# Reschedule inputs from deadlist
109
- @assert ! isempty (procs (ctx)) " No workers left for fault handling!"
110
- while length (deadlist) > 0
111
- dt = popfirst! (deadlist)
112
- if any ((input in deadlist) for input in dt. inputs)
113
- # We need to schedule our input thunks first
114
- continue
115
- end
116
- push! (state. ready, dt)
52
+ seen = Dict {Thunk,Bool} ()
53
+ for t in deadlist
54
+ reschedule_inputs! (state, t, seen)
117
55
end
118
56
schedule! (ctx, state)
119
57
end
0 commit comments