Skip to content

Commit 3da81c6

Browse files
IanButterworthJamesWrigley
authored andcommitted
revert to necessary @asyncs
1 parent 9082665 commit 3da81c6

File tree

6 files changed

+30
-28
lines changed

6 files changed

+30
-28
lines changed

src/cluster.jl

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
163163
else
164164
w.ct_time = time()
165165
if myid() > w.id
166-
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
166+
t = @async exec_conn_func(w)
167167
else
168168
# route request via node 1
169-
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
169+
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
170170
end
171171
errormonitor(t)
172172
wait_for_conn(w)
@@ -252,7 +252,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
252252
else
253253
sock = listen(interface, LPROC.bind_port)
254254
end
255-
errormonitor(Threads.@spawn while isopen(sock)
255+
errormonitor(@async while isopen(sock)
256256
client = accept(sock)
257257
process_messages(client, client, true)
258258
end)
@@ -284,7 +284,7 @@ end
284284

285285

286286
function redirect_worker_output(ident, stream)
287-
t = Threads.@spawn while !eof(stream)
287+
t = @async while !eof(stream)
288288
line = readline(stream)
289289
if startswith(line, " From worker ")
290290
# stdout's of "additional" workers started from an initial worker on a host are not available
@@ -323,7 +323,7 @@ function read_worker_host_port(io::IO)
323323
leader = String[]
324324
try
325325
while ntries > 0
326-
readtask = Threads.@spawn Threads.threadpool() readline(io)
326+
readtask = @async readline(io)
327327
yield()
328328
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
329329
sleep(0.05)
@@ -424,7 +424,7 @@ if launching workers programmatically, execute `addprocs` in its own task.
424424
425425
```julia
426426
# On busy clusters, call `addprocs` asynchronously
427-
t = Threads.@spawn addprocs(...)
427+
t = @async addprocs(...)
428428
```
429429
430430
```julia
@@ -490,13 +490,14 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
490490
# call manager's `launch` is a separate task. This allows the master
491491
# process initiate the connection setup process as and when workers come
492492
# online
493-
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
493+
# NOTE: Must be `@async`. See FIXME above
494+
t_launch = @async launch(manager, params, launched, launch_ntfy)
494495

495496
@sync begin
496497
while true
497498
if isempty(launched)
498499
istaskdone(t_launch) && break
499-
Threads.@spawn Threads.threadpool() begin
500+
@async begin # NOTE: Must be `@async`. See FIXME above
500501
sleep(1)
501502
notify(launch_ntfy)
502503
end
@@ -506,7 +507,8 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
506507
if !isempty(launched)
507508
wconfig = popfirst!(launched)
508509
let wconfig=wconfig
509-
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
510+
# NOTE: Must be `@async`. See FIXME above
511+
@async setup_launched_worker(manager, wconfig, launched_q)
510512
end
511513
end
512514
end
@@ -586,7 +588,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
586588
wconfig.port = port
587589

588590
let wconfig=wconfig
589-
Threads.@spawn Threads.threadpool() begin
591+
@async begin
590592
pid = create_worker(manager, wconfig)
591593
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
592594
push!(launched_q, pid)
@@ -752,7 +754,7 @@ function check_master_connect()
752754
end
753755

754756
errormonitor(
755-
Threads.@spawn begin
757+
@async begin
756758
timeout = worker_timeout()
757759
if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out
758760
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
@@ -1044,13 +1046,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
10441046

10451047
pids = vcat(pids...)
10461048
if waitfor == 0
1047-
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
1049+
t = @async _rmprocs(pids, typemax(Int))
10481050
yield()
10491051
return t
10501052
else
10511053
_rmprocs(pids, waitfor)
10521054
# return a dummy task object that user code can wait on.
1053-
return Threads.@spawn Threads.threadpool() nothing
1055+
return @async nothing
10541056
end
10551057
end
10561058

@@ -1233,7 +1235,7 @@ function interrupt(pids::AbstractVector=workers())
12331235
@assert myid() == 1
12341236
@sync begin
12351237
for pid in pids
1236-
Threads.@spawn Threads.threadpool() interrupt(pid)
1238+
@async interrupt(pid)
12371239
end
12381240
end
12391241
end

src/macros.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
230230
# execute locally last as we do not want local execution to block serialization
231231
# of the request to remote nodes.
232232
for _ in 1:run_locally
233-
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
233+
@async Core.eval(m, ex)
234234
end
235235
end
236236
nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
275275
end
276276

277277
function pfor(f, R)
278-
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
278+
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
279279
@spawnat :any f(R, first(c), last(c))
280280
end
281281
errormonitor(t)

src/managers.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
178178
# Wait for all launches to complete.
179179
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
180180
let machine=machine, cnt=cnt
181-
Threads.@spawn Threads.threadpool() try
181+
@async try
182182
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
183183
catch e
184184
print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
744744
# First, try sending `exit()` to the remote over the usual control channels
745745
remote_do(exit, pid)
746746

747-
timer_task = Threads.@spawn Threads.threadpool() begin
747+
timer_task = @async begin
748748
sleep(exit_timeout)
749749

750750
# Check to see if our child exited, and if not, send an actual kill signal

src/messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ function flush_gc_msgs()
200200
end
201201
catch e
202202
bt = catch_backtrace()
203-
Threads.@spawn showerror(stderr, e, bt)
203+
@async showerror(stderr, e, bt)
204204
end
205205
end
206206

src/process_messages.jl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
8585
rv = RemoteValue(def_rv_channel())
8686
(PGRP::ProcessGroup).refs[rid] = rv
8787
push!(rv.clientset, rid.whence)
88-
errormonitor(Threads.@spawn run_work_thunk(rv, thunk))
88+
errormonitor(@async run_work_thunk(rv, thunk))
8989
return rv
9090
end
9191
end
@@ -118,7 +118,7 @@ end
118118

119119
## message event handlers ##
120120
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
121-
errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming))
121+
errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
122122
end
123123

124124
function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
@@ -148,7 +148,7 @@ Julia version number to perform the authentication handshake.
148148
See also [`cluster_cookie`](@ref).
149149
"""
150150
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
151-
errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming))
151+
errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
152152
end
153153

154154
function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
@@ -283,7 +283,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
283283
schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
284284
end
285285
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
286-
errormonitor(Threads.@spawn begin
286+
errormonitor(@async begin
287287
v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false)
288288
if isa(v, SyncTake)
289289
try
@@ -299,15 +299,15 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
299299
end
300300

301301
function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
302-
errormonitor(Threads.@spawn begin
302+
errormonitor(@async begin
303303
rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
304304
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
305305
nothing
306306
end)
307307
end
308308

309309
function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
310-
errormonitor(Threads.@spawn run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
310+
errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
311311
end
312312

313313
function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
@@ -350,7 +350,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
350350
# The constructor registers the object with a global registry.
351351
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
352352
else
353-
Threads.@spawn connect_to_peer(cluster_manager, rpid, wconfig)
353+
@async connect_to_peer(cluster_manager, rpid, wconfig)
354354
end
355355
end
356356
end

src/remotecall.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ or to use a local [`Channel`](@ref) as a proxy:
205205
```julia
206206
p = 1
207207
f = Future(p)
208-
errormonitor(Threads.@spawn put!(f, remotecall_fetch(long_computation, p)))
208+
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
209209
isready(f) # will not block
210210
```
211211
"""
@@ -322,7 +322,7 @@ function process_worker(rr)
322322
msg = (remoteref_id(rr), myid())
323323

324324
# Needs to acquire a lock on the del_msg queue
325-
T = Threads.@spawn Threads.threadpool() begin
325+
T = Threads.@spawn begin
326326
publish_del_msg!($w, $msg)
327327
end
328328
Base.errormonitor(T)

0 commit comments

Comments
 (0)