-import Base: launch, manage, kill, procs, connect
+import Base: kill
 export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
 export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
+using Compat
+using Compat.Distributed
+import Compat.Sockets: connect, listenany, accept, getipaddr, IPv4
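The `import Base:` line shrinks because the cluster-manager hooks are no longer pulled into scope from `Base`; the later hunks define them as qualified methods on `Distributed`, with `Compat` making the `Distributed`, `Sockets`, and `Serialization` module paths resolve on both Julia 0.6 and 0.7. A minimal sketch of that pattern, using a hypothetical `ToyManager` that is not part of this package:

```julia
# Sketch only: extending Distributed's interface with qualified method definitions.
using Compat
using Compat.Distributed   # `Distributed` stdlib on 0.7+, the Base.Distributed names on 0.6

struct ToyManager <: ClusterManager end   # hypothetical manager, for illustration

# No `import Base: launch, manage` needed; the dotted name adds the method directly.
function Distributed.launch(mgr::ToyManager, params::Dict,
                            launched::Array, c::Condition)
    notify(c)   # launch no workers, just tell addprocs we are done
end

Distributed.manage(mgr::ToyManager, id::Integer, config::WorkerConfig, op::Symbol) = nothing
```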
@@ -47,9 +50,9 @@ mutable struct MPIManager <: ClusterManager

    # MPI_TRANSPORT_ALL
    comm::MPI.Comm
-    initiate_shutdown::Channel{Void}
-    sending_done::Channel{Void}
-    receiving_done::Channel{Void}
+    initiate_shutdown::Channel{Nothing}
+    sending_done::Channel{Nothing}
+    receiving_done::Channel{Nothing}

    function MPIManager(; np::Integer = Sys.CPU_CORES,
                          mpirun_cmd::Cmd = `mpiexec -n $np`,
@@ -83,7 +86,7 @@ mutable struct MPIManager <: ClusterManager
        if mode != MPI_TRANSPORT_ALL
            # Start a listener for capturing stdout from the workers
            port, server = listenany(11000)
-            @schedule begin
+            @async begin
                while true
                    sock = accept(server)
                    push!(mgr.stdout_ios, sock)
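`@schedule` was deprecated in favor of `@async` on Julia 0.7. The listener it wraps follows a common pattern: bind the first free port with `listenany`, then accept worker connections in a background task. A self-contained sketch of that pattern, written against the plain `Sockets` stdlib for brevity and independent of `MPIManager`:

```julia
# Sketch of a background accept loop; not the package's actual listener.
using Sockets

port, server = listenany(11000)   # first free port at or above 11000
@async begin
    while true
        sock = accept(server)         # blocks this task until a client connects
        @async for line in eachline(sock)
            println("worker> ", line) # forward whatever the client prints
        end
    end
end
println("listening on port $port")
```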
@@ -98,12 +101,12 @@ mutable struct MPIManager <: ClusterManager
        end

        if mode == MPI_TRANSPORT_ALL
-            mgr.initiate_shutdown = Channel{Void}(1)
-            mgr.sending_done = Channel{Void}(np)
-            mgr.receiving_done = Channel{Void}(1)
+            mgr.initiate_shutdown = Channel{Nothing}(1)
+            mgr.sending_done = Channel{Nothing}(np)
+            mgr.receiving_done = Channel{Nothing}(1)
            global initiate_shutdown = mgr.initiate_shutdown
        end
-        mgr.initiate_shutdown = Channel{Void}(1)
+        mgr.initiate_shutdown = Channel{Nothing}(1)
        global initiate_shutdown = mgr.initiate_shutdown

        return mgr
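`Void` became `Nothing` in Julia 0.7, so the signal channels change their element type but not their role: a `Channel{Nothing}` carries no payload, only the event that something was `put!` into it, and the event loops later in this file poll it with `isready`. A standalone sketch of that signaling idiom, with illustrative names:

```julia
# Illustrative shutdown signal, mirroring initiate_shutdown above.
shutdown = Channel{Nothing}(1)

worker = @async begin
    n = 0
    while !isready(shutdown)   # keep working until shutdown is requested
        n += 1
        sleep(0.01)
    end
    n
end

sleep(0.1)
put!(shutdown, nothing)        # request shutdown; the value itself is meaningless
println("worker finished after $(fetch(worker)) iterations")
```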
@@ -119,7 +122,7 @@
# MPI_ON_WORKERS case

# Launch a new worker, called from Base.addprocs
-function launch(mgr::MPIManager, params::Dict,
+function Distributed.launch(mgr::MPIManager, params::Dict,
                instances::Array, cond::Condition)
    try
        if mgr.mode == MPI_ON_WORKERS
@@ -129,7 +132,7 @@ function launch(mgr::MPIManager, params::Dict,
                println("Try again with a different instance of MPIManager.")
                throw(ErrorException("Reuse of MPIManager is not allowed."))
            end
-            cookie = string(":cookie_", Base.cluster_cookie())
+            cookie = string(":cookie_", Distributed.cluster_cookie())
            setup_cmds = `using MPI\; MPI.setup_worker'('$(getipaddr().host),$(mgr.port),$cookie')'`
            mpi_cmd = `$(mgr.mpirun_cmd) $(params[:exename]) -e $(Base.shell_escape(setup_cmds))`
            open(detach(mpi_cmd))
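For context, this `launch` method is what runs when an `MPIManager` in `MPI_ON_WORKERS` mode is handed to `addprocs`: it shells out to `mpiexec`, and each spawned rank connects back through `MPI.setup_worker`. A sketch of the manager-side session, assuming four ranks:

```julia
# Manager-side usage sketch for MPI_ON_WORKERS mode.
using MPI

mgr = MPIManager(np=4)   # wraps `mpiexec -n 4` by default
addprocs(mgr)            # triggers the Distributed.launch / Distributed.manage methods here

# Run MPI code on every managed rank.
@mpi_do mgr begin
    comm = MPI.COMM_WORLD
    println("rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")
end
```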
@@ -156,7 +159,7 @@ function launch(mgr::MPIManager, params::Dict,
            config.io = io
            # Add config to the correct slot so that MPI ranks and
            # Julia pids are in the same order
-            rank = Base.deserialize(io)
+            rank = Compat.Serialization.deserialize(io)
            idx = mgr.mode == MPI_ON_WORKERS ? rank+1 : rank
            configs[idx] = config
        end
@@ -192,7 +195,7 @@ function setup_worker(host, port, cookie)

    # Send our MPI rank to the manager
    rank = MPI.Comm_rank(MPI.COMM_WORLD)
-    Base.serialize(io, rank)
+    Compat.Serialization.serialize(io, rank)

    # Hand over control to Base
    if cookie == nothing
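The rank handshake in the two hunks above is plain serialization over the TCP socket: the worker serializes its MPI rank, and the manager deserializes it to slot the `WorkerConfig` into rank order. `Compat.Serialization` resolves to the `Serialization` stdlib on 0.7+ and to the corresponding `Base` functions on 0.6. A self-contained round trip of the same calls, using an in-memory buffer instead of a socket:

```julia
# Round-trip sketch of the rank handshake.
using Serialization

io = IOBuffer()
serialize(io, 3)         # worker side: send the MPI rank
seekstart(io)
rank = deserialize(io)   # manager side: read it back
@assert rank == 3
```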
@@ -206,7 +209,7 @@ function setup_worker(host, port, cookie)
end

# Manage a worker (e.g. register / deregister it)
-function manage(mgr::MPIManager, id::Integer, config::WorkerConfig, op::Symbol)
+function Distributed.manage(mgr::MPIManager, id::Integer, config::WorkerConfig, op::Symbol)
    if op == :register
        # Retrieve MPI rank from worker
        # TODO: Why is this necessary? The workers already sent their rank.
@@ -284,7 +287,7 @@ function start_send_event_loop(mgr::MPIManager, rank::Int)
    # quite expensive when there are many workers. Design something better.
    # For example, instead of maintaining two streams per worker, provide
    # only abstract functions to write to / read from these streams.
-    @schedule begin
+    @async begin
        rr = MPI.Comm_rank(mgr.comm)
        reqs = MPI.Request[]
        while !isready(mgr.initiate_shutdown)
@@ -334,7 +337,7 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
        # Send connection information to all workers
        # TODO: Use Bcast
        for j in 1:size-1
-            cookie = VERSION >= v"0.5.0-dev+4047" ? Base.cluster_cookie() : nothing
+            cookie = VERSION >= v"0.5.0-dev+4047" ? Distributed.cluster_cookie() : nothing
            MPI.send((getipaddr().host, mgr.port, cookie), j, 0, comm)
        end
        # Tell Base about the workers
@@ -360,9 +363,9 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;

        # Send the cookie over. Introduced in v"0.5.0-dev+4047". Irrelevant under MPI
        # transport, but need it to satisfy the changed protocol.
-        MPI.bcast(Base.cluster_cookie(), 0, comm)
+        MPI.bcast(Distributed.cluster_cookie(), 0, comm)
        # Start event loop for the workers
-        @schedule receive_event_loop(mgr)
+        @async receive_event_loop(mgr)
        # Tell Base about the workers
        addprocs(mgr)
        return mgr
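This branch is the `MPI_TRANSPORT_ALL` path, where all communication between the Julia processes is carried over MPI rather than TCP; rank 0 returns as the manager and the remaining ranks block as workers. The script below is a sketch of how such a program is driven, assuming the `start_main_loop` API shown here and a `stop_main_loop` counterpart that this excerpt does not show:

```julia
# Sketch of an MPI_TRANSPORT_ALL driver, run as e.g. `mpiexec -n 4 julia script.jl`.
# `MPI.stop_main_loop` is assumed; it is not part of the excerpt above.
using MPI

MPI.Init()
mgr = MPI.start_main_loop(MPI_TRANSPORT_ALL)   # rank 0 returns; other ranks become workers

@mpi_do mgr println("rank $(MPI.Comm_rank(MPI.COMM_WORLD))")

MPI.stop_main_loop(mgr)                        # shut the workers down
```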
@@ -493,7 +496,7 @@ macro mpi_do(mgr, expr)
end

# All managed Julia processes
-procs(mgr::MPIManager) = sort(keys(mgr.j2mpi))
+Distributed.procs(mgr::MPIManager) = sort(keys(mgr.j2mpi))

# All managed MPI ranks
mpiprocs(mgr::MPIManager) = sort(keys(mgr.mpi2j))
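`procs` and `mpiprocs` expose the two mapping dictionaries the manager keeps, `j2mpi` (Julia pid to MPI rank) and its inverse `mpi2j`. A small illustrative query against an already-populated manager:

```julia
# Illustrative only; assumes `mgr` is an MPIManager with workers attached.
for (pid, rank) in sort(collect(mgr.j2mpi); by=first)
    println("Julia pid $pid  <->  MPI rank $rank")
end
```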