Skip to content

Commit db4c55c

Browse files
vchuravy authored and KristofferC committed
[Distributed] make finalizer messages threadsafe (JuliaLang/julia#42240)
(cherry picked from commit f3e4ef5)
1 parent e923382 commit db4c55c

File tree

3 files changed

+75
-30
lines changed

3 files changed

+75
-30
lines changed

src/cluster.jl

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -95,9 +95,10 @@ end
9595
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
9696
mutable struct Worker
9797
id::Int
98-
del_msgs::Array{Any,1}
98+
msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
99+
del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
99100
add_msgs::Array{Any,1}
100-
gcflag::Bool
101+
@atomic gcflag::Bool
101102
state::WorkerState
102103
c_state::Condition # wait for state changes
103104
ct_time::Float64 # creation time
@@ -133,7 +134,7 @@ mutable struct Worker
133134
if haskey(map_pid_wrkr, id)
134135
return map_pid_wrkr[id]
135136
end
136-
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
137+
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
137138
w.initialized = Event()
138139
register_worker(w)
139140
w
@@ -471,6 +472,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
471472
# The `launch` method should add an object of type WorkerConfig for every
472473
# worker launched. It provides information required on how to connect
473474
# to it.
475+
476+
# FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
477+
# but both are part of the public interface. This means we currently can't use
478+
# `Threads.@spawn` in the code below.
474479
launched = WorkerConfig[]
475480
launch_ntfy = Condition()
476481

src/messages.jl

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,30 @@ function flush_gc_msgs(w::Worker)
126126
if !isdefined(w, :w_stream)
127127
return
128128
end
129-
w.gcflag = false
130-
new_array = Any[]
131-
msgs = w.add_msgs
132-
w.add_msgs = new_array
133-
if !isempty(msgs)
134-
remote_do(add_clients, w, msgs)
135-
end
129+
add_msgs = nothing
130+
del_msgs = nothing
131+
@lock w.msg_lock begin
132+
if !w.gcflag # No work needed for this worker
133+
return
134+
end
135+
@atomic w.gcflag = false
136+
if !isempty(w.add_msgs)
137+
add_msgs = w.add_msgs
138+
w.add_msgs = Any[]
139+
end
136140

137-
# del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
138-
# XXX: threading requires this to be atomic
139-
new_array = Any[]
140-
msgs = w.del_msgs
141-
w.del_msgs = new_array
142-
if !isempty(msgs)
143-
#print("sending delete of $msgs\n")
144-
remote_do(del_clients, w, msgs)
141+
if !isempty(w.del_msgs)
142+
del_msgs = w.del_msgs
143+
w.del_msgs = Any[]
144+
end
145+
end
146+
if add_msgs !== nothing
147+
remote_do(add_clients, w, add_msgs)
148+
end
149+
if del_msgs !== nothing
150+
remote_do(del_clients, w, del_msgs)
145151
end
152+
return
146153
end
147154

148155
# Boundary inserted between messages on the wire, used for recovering
@@ -187,7 +194,7 @@ end
187194
function flush_gc_msgs()
188195
try
189196
for w in (PGRP::ProcessGroup).workers
190-
if isa(w,Worker) && w.gcflag && (w.state == W_CONNECTED)
197+
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
191198
flush_gc_msgs(w)
192199
end
193200
end

src/remotecall.jl

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -256,14 +256,27 @@ function del_clients(pairs::Vector)
256256
end
257257
end
258258

259-
const any_gc_flag = Condition()
259+
# The task below is coalescing the `flush_gc_msgs` call
260+
# across multiple producers, see `send_del_client`,
261+
# and `send_add_client`.
262+
# XXX: Is this worth the additional complexity?
263+
# `flush_gc_msgs` has to iterate over all connected workers.
264+
const any_gc_flag = Threads.Condition()
260265
function start_gc_msgs_task()
261-
errormonitor(@async while true
262-
wait(any_gc_flag)
263-
flush_gc_msgs()
264-
end)
266+
errormonitor(
267+
Threads.@spawn begin
268+
while true
269+
lock(any_gc_flag) do
270+
# this might miss events
271+
wait(any_gc_flag)
272+
end
273+
flush_gc_msgs() # handles throws internally
274+
end
275+
end
276+
)
265277
end
266278

279+
# Function can be called within a finalizer
267280
function send_del_client(rr)
268281
if rr.where == myid()
269282
del_client(rr)
@@ -281,11 +294,27 @@ function send_del_client_no_lock(rr)
281294
end
282295
end
283296

297+
function publish_del_msg!(w::Worker, msg)
298+
lock(w.msg_lock) do
299+
push!(w.del_msgs, msg)
300+
@atomic w.gcflag = true
301+
end
302+
lock(any_gc_flag) do
303+
notify(any_gc_flag)
304+
end
305+
end
306+
284307
function process_worker(rr)
285308
w = worker_from_id(rr.where)::Worker
286-
push!(w.del_msgs, (remoteref_id(rr), myid()))
287-
w.gcflag = true
288-
notify(any_gc_flag)
309+
msg = (remoteref_id(rr), myid())
310+
311+
# Needs to acquire a lock on the del_msgs queue
312+
T = Threads.@spawn begin
313+
publish_del_msg!($w, $msg)
314+
end
315+
Base.errormonitor(T)
316+
317+
return
289318
end
290319

291320
function add_client(id, client)
@@ -310,9 +339,13 @@ function send_add_client(rr::AbstractRemoteRef, i)
310339
# to the processor that owns the remote ref. it will add_client
311340
# itself inside deserialize().
312341
w = worker_from_id(rr.where)
313-
push!(w.add_msgs, (remoteref_id(rr), i))
314-
w.gcflag = true
315-
notify(any_gc_flag)
342+
lock(w.msg_lock) do
343+
push!(w.add_msgs, (remoteref_id(rr), i))
344+
@atomic w.gcflag = true
345+
end
346+
lock(any_gc_flag) do
347+
notify(any_gc_flag)
348+
end
316349
end
317350
end
318351

0 commit comments

Comments
 (0)