@@ -3,7 +3,7 @@ export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
using Compat
using Compat.Distributed
- import Compat.Sockets: connect, listenany, accept, getipaddr, IPv4
+ import Compat.Sockets: connect, listenany, accept, IPv4, getsockname
@@ -42,6 +42,7 @@ mutable struct MPIManager <: ClusterManager
# TCP Transport
port::UInt16
+ ip::UInt32
stdout_ios::Array

# MPI transport
@@ -54,7 +55,7 @@ mutable struct MPIManager <: ClusterManager
sending_done::Channel{Nothing}
receiving_done::Channel{Nothing}

- function MPIManager(; np::Integer = Sys.CPU_CORES,
+ function MPIManager(; np::Integer = Sys.CPU_THREADS,
mpirun_cmd::Cmd = `mpiexec -n $np`,
launch_timeout::Real = 60.0,
mode::TransportMode = MPI_ON_WORKERS)
@@ -86,13 +87,15 @@ mutable struct MPIManager <: ClusterManager
if mode != MPI_TRANSPORT_ALL
# Start a listener for capturing stdout from the workers
port, server = listenany(11000)
+ ip = getsockname(server)[1].host
@async begin
while true
sock = accept(server)
push!(mgr.stdout_ios, sock)
end
end
mgr.port = port
+ mgr.ip = ip
mgr.stdout_ios = IO[]
else
mgr.rank2streams = Dict{Int,Tuple{IO,IO}}()
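For illustration only (not part of this commit): a standalone sketch of the Sockets calls the new listener code relies on. On Julia 0.7+ these names live in the Sockets stdlib; the diff reaches them through Compat.Sockets.

using Sockets

port, server = listenany(11000)   # bind the first free port at or above 11000
addr = getsockname(server)[1]     # address the listener is bound to
ip = addr.host                    # raw UInt32 host field of an IPv4 address, the type stored in mgr.ip
@show port addr ip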
@@ -133,7 +136,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict,
throw(ErrorException("Reuse of MPIManager is not allowed."))
end
cookie = string(":cookie_", Distributed.cluster_cookie())
- setup_cmds = `using MPI\; MPI.setup_worker'('$(getipaddr().host),$(mgr.port),$cookie')'`
+ setup_cmds = `using MPI\; MPI.setup_worker'('$(mgr.ip),$(mgr.port),$cookie')'`
mpi_cmd = `$(mgr.mpirun_cmd) $(params[:exename]) -e $(Base.shell_escape(setup_cmds))`
open(detach(mpi_cmd))
mgr.launched = true
@@ -151,7 +154,7 @@ function Distributed.launch(mgr::MPIManager, params::Dict,
end

# Traverse all worker I/O streams and receive their MPI rank
- configs = Array{WorkerConfig}(mgr.np)
+ configs = Array{WorkerConfig}(undef, mgr.np)
@sync begin
for io in mgr.stdout_ios
@async let io=io
@@ -199,12 +202,12 @@ function setup_worker(host, port, cookie)
# Hand over control to Base
if cookie == nothing
- Base.start_worker(io)
+ Distributed.start_worker(io)
else
if isa(cookie, Symbol)
cookie = string(cookie)[8:end] # strip the leading "cookie_"
end
- Base.start_worker(io, cookie)
+ Distributed.start_worker(io, cookie)
end
end
@@ -279,8 +282,8 @@
# case
function start_send_event_loop(mgr::MPIManager, rank::Int)
try
- r_s = BufferStream()
- w_s = BufferStream()
+ r_s = Base.BufferStream()
+ w_s = Base.BufferStream()
mgr.rank2streams[rank] = (r_s, w_s)

# TODO: There is one task per communication partner -- this can be
@@ -292,7 +295,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
reqs = MPI.Request[]
while !isready(mgr.initiate_shutdown)
# When data are available, send them
- while nb_available(w_s) > 0
+ while bytesavailable(w_s) > 0
data = take!(w_s.buffer)
push!(reqs, MPI.Isend(data, rank, 0, mgr.comm))
end
@@ -307,7 +310,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
end
(r_s, w_s)
catch e
- Base.show_backtrace(STDOUT, catch_backtrace())
+ Base.show_backtrace(stdout, catch_backtrace())
println(e)
rethrow(e)
end
@@ -334,11 +337,15 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
# Create manager object
mgr = MPIManager(np=size-1, mode=mode)
mgr.comm = comm
+ # Needed because of Julia commit https://github.com/JuliaLang/julia/commit/299300a409c35153a1fa235a05c3929726716600
+ if isdefined(Distributed, :init_multi)
+ Distributed.init_multi()
+ end
# Send connection information to all workers
# TODO: Use Bcast
for j in 1:size-1
cookie = VERSION >= v"0.5.0-dev+4047" ? Distributed.cluster_cookie() : nothing
- MPI.send((getipaddr().host, mgr.port, cookie), j, 0, comm)
+ MPI.send((mgr.ip, mgr.port, cookie), j, 0, comm)
end
# Tell Base about the workers
addprocs(mgr)
@@ -363,6 +370,9 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
# Send the cookie over. Introduced in v"0.5.0-dev+4047". Irrelevant under MPI
# transport, but need it to satisfy the changed protocol.
+ if isdefined(Distributed, :init_multi)
+ Distributed.init_multi()
+ end
MPI.bcast(Distributed.cluster_cookie(), 0, comm)
# Start event loop for the workers
@async receive_event_loop(mgr)
@@ -376,7 +386,7 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
mgr.comm = comm
# Recv the cookie
cookie = MPI.bcast(nothing, 0, comm)
- Base.init_worker(cookie, mgr)
+ Distributed.init_worker(cookie, mgr)
# Start a worker event loop
receive_event_loop(mgr)
MPI.Finalize()
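For illustration only (not part of this commit): roughly how start_main_loop is driven in MPI_TRANSPORT_ALL mode, following the usual MPI.jl cluster-manager example of this era; the script name, process count, and the stop_main_loop call come from that documented workflow rather than from this diff.

# run as: mpirun -np 4 julia example.jl   (file name and -np value are placeholders)
using MPI, Distributed

mgr = MPI.start_main_loop(MPI_TRANSPORT_ALL)   # rank 0 returns here as the Julia master; the other ranks serve as workers
@mpi_do mgr println("rank $(MPI.Comm_rank(MPI.COMM_WORLD)) checking in")
MPI.stop_main_loop(mgr)                        # tears down the worker event loops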
@@ -394,7 +404,7 @@ function receive_event_loop(mgr::MPIManager)
(hasdata, stat) = MPI.Iprobe(MPI.ANY_SOURCE, 0, mgr.comm)
if hasdata
count = Get_count(stat, UInt8)
- buf = Array{UInt8}(count)
+ buf = Array{UInt8}(undef, count)
from_rank = Get_source(stat)
MPI.Recv!(buf, from_rank, 0, mgr.comm)
@@ -403,7 +413,7 @@ function receive_event_loop(mgr::MPIManager)
# This is the first time we communicate with this rank.
# Set up a new connection.
(r_s, w_s) = start_send_event_loop(mgr, from_rank)
- Base.process_messages(r_s, w_s)
+ Distributed.process_messages(r_s, w_s)
num_send_loops += 1
else
(r_s, w_s) = streams
@@ -459,7 +469,7 @@
function mpi_do(mgr::MPIManager, expr)
!mgr.initialized && wait(mgr.cond_initialized)
jpids = keys(mgr.j2mpi)
- refs = Array{Any}(length(jpids))
+ refs = Array{Any}(undef, length(jpids))
for (i,p) in enumerate(Iterators.filter(x -> x != myid(), jpids))
refs[i] = remotecall(expr, p)
end
@@ -490,7 +500,7 @@
macro mpi_do(mgr, expr)
quote
# Evaluate expression in Main module
- thunk = () -> (eval(Main, $(Expr(:quote, expr))); nothing)
+ thunk = () -> (Core.eval(Main, $(Expr(:quote, expr))); nothing)
mpi_do($(esc(mgr)), thunk)
end
end
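For illustration only (not part of this commit): the default MPI_ON_WORKERS workflow after these changes; np=4 is an arbitrary example value.

using MPI, Distributed

mgr = MPI.MPIManager(np=4)   # np now defaults to Sys.CPU_THREADS instead of Sys.CPU_CORES
addprocs(mgr)                # runs `mpiexec -n 4 julia -e 'using MPI; MPI.setup_worker(...)'` and registers the workers
@mpi_do mgr println("rank $(MPI.Comm_rank(MPI.COMM_WORLD)) of $(MPI.Comm_size(MPI.COMM_WORLD))")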