Skip to content

Commit a9eaacb

Browse files
committed
Always access worker state atomically
1 parent ac60df7 commit a9eaacb

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

src/cluster.jl

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -151,7 +151,7 @@ function set_worker_state(w, state)
151151
end
152152

153153
function check_worker_state(w::Worker)
154-
if w.state === W_CREATED
154+
if (@atomic w.state) === W_CREATED
155155
if !isclusterlazy()
156156
if PGRP.topology === :all_to_all
157157
# Since higher pids connect with lower pids, the remote worker
@@ -190,7 +190,7 @@ function exec_conn_func(w::Worker)
190190
end
191191

192192
function wait_for_conn(w)
193-
if w.state === W_CREATED
193+
if (@atomic w.state) === W_CREATED
194194
timeout = worker_timeout() - (time() - w.ct_time)
195195
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
196196

@@ -654,7 +654,7 @@ function create_worker(manager, wconfig)
654654
for jw in PGRP.workers
655655
if (jw.id != 1) && (jw.id < w.id)
656656
# wait for wl to join
657-
if jw.state === W_CREATED
657+
if (@atomic jw.state) === W_CREATED
658658
lock(jw.c_state) do
659659
wait(jw.c_state)
660660
end
@@ -682,7 +682,7 @@ function create_worker(manager, wconfig)
682682

683683
for wl in wlist
684684
lock(wl.c_state) do
685-
if wl.state === W_CREATED
685+
if (@atomic wl.state) === W_CREATED
686686
# wait for wl to join
687687
wait(wl.c_state)
688688
end
@@ -884,7 +884,7 @@ function nprocs()
884884
n = length(PGRP.workers)
885885
# filter out workers in the process of being setup/shutdown.
886886
for jw in PGRP.workers
887-
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
887+
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
888888
n = n - 1
889889
end
890890
end
@@ -935,7 +935,7 @@ julia> procs()
935935
function procs()
936936
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
937937
# filter out workers in the process of being setup/shutdown.
938-
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
938+
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
939939
else
940940
return Int[x.id for x in PGRP.workers]
941941
end
@@ -944,7 +944,7 @@ end
944944
function id_in_procs(id) # faster version of `id in procs()`
945945
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
946946
for x in PGRP.workers
947-
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
947+
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
948948
return true
949949
end
950950
end
@@ -966,7 +966,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
966966
"""
967967
function procs(pid::Integer)
968968
if myid() == 1
969-
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
969+
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
970970
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
971971
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
972972
else
@@ -1073,11 +1073,11 @@ function _rmprocs(pids, waitfor)
10731073

10741074
start = time_ns()
10751075
while (time_ns() - start) < waitfor*1e9
1076-
all(w -> w.state === W_TERMINATED, rmprocset) && break
1076+
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
10771077
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
10781078
end
10791079

1080-
unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
1080+
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
10811081
if length(unremoved) > 0
10821082
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
10831083
throw(ErrorException(estr))

src/messages.jl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -194,7 +194,7 @@ end
194194
function flush_gc_msgs()
195195
try
196196
for w in (PGRP::ProcessGroup).workers
197-
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
197+
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
198198
flush_gc_msgs(w)
199199
end
200200
end

src/process_messages.jl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -222,7 +222,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
222222
println(stderr, "Process($(myid())) - Unknown remote, closing connection.")
223223
elseif !(wpid in map_del_wrkr)
224224
werr = worker_from_id(wpid)
225-
oldstate = werr.state
225+
oldstate = @atomic werr.state
226226
set_worker_state(werr, W_TERMINATED)
227227

228228
# If unhandleable error occurred talking to pid 1, exit

0 commit comments

Comments
 (0)