Commit 2741428

Add a ClusterManager using one-sided communication
1 parent a2e938c commit 2741428

8 files changed: +473 -21 lines

README.md

Lines changed: 7 additions & 2 deletions
@@ -166,14 +166,14 @@ Fields `j2mpi` and `mpi2j` of `MPIManager` are associative collections mapping j
 
 This launches a total of 5 processes, mpi rank 0 is the julia pid 1. mpi rank 1 is julia pid 2 and so on.
 
-The program must call `MPI.start(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
+The program must call `MPI.start_main_loop(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
 On mpi rank 0, it returns a `manager` which can be used with `@mpi_do`
 On other processes (i.e., the workers) the function does not return
 
 
 ### MPIManager
 ### (MPI transport - all processes execute MPI code)
-`MPI.start` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
+`MPI.start_main_loop` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
 `mpirun -np 5 julia 06-cman-transport.jl MPI` will run the example using MPI as transport.
 
 ## Julia MPI-only interface
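
The hunk above describes the `start_main_loop` workflow only in prose. The following minimal driver is a sketch modeled on the `06-cman-transport.jl` example it mentions; the argument handling and the `MPI.stop_main_loop` call are assumptions based on that example, not part of this commit.

```julia
# Run with: mpirun -np 5 julia 06-cman-transport.jl [MPI]
using MPI

# TCP_TRANSPORT_ALL and MPI_TRANSPORT_ALL are exported by MPI (see src/cman.jl)
mode = (length(ARGS) > 0 && ARGS[1] == "MPI") ? MPI_TRANSPORT_ALL : TCP_TRANSPORT_ALL

manager = MPI.start_main_loop(mode)   # returns only on MPI rank 0 (Julia pid 1)

@mpi_do manager begin
    comm = MPI.COMM_WORLD
    println("rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")
end

MPI.stop_main_loop(manager)           # assumed helper, as used in the bundled examples
```
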
@@ -189,6 +189,11 @@ juliacomm = MPI.COMM_WORLD
 ccomm = MPI.CComm(juliacomm)
 ```
 
+### MPIWindowIOManager
+This manager is started using the `MPI_WINDOW_IO` or `MPI_WINDOW_NOWAIT` transports. It uses asynchronous IO
+based on MPI windows. The `MPI_WINDOW_NOWAIT` transport only uses the cluster manager for code preceded by the `@cluster`
+macro. See `test_windowcman.jl` and `test_windowcman_nowait.jl` for examples.
+
 ### Currently wrapped MPI functions
 Convention: `MPI_Fun => MPI.Fun`
 
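Since the new README paragraph only names the `MPI_WINDOW_NOWAIT` workflow, here is a minimal sketch of how it is meant to be used. The script name is illustrative; everything else uses only names introduced by this commit plus standard Distributed calls.

```julia
# Launch with e.g.: mpirun -np 4 julia my_nowait_script.jl   (file name illustrative)
using MPI
using Compat.Distributed        # workers(), @fetchfrom, ... (as in src/cman.jl)

manager = MPI.start_main_loop(MPI_WINDOW_NOWAIT)   # returns on every rank

# Outside @cluster, all ranks execute this script as regular MPI code (SPMD)
rank = MPI.Comm_rank(MPI.COMM_WORLD)

# Inside @cluster, rank 0 (Julia pid 1) runs the block while the workers
# temporarily service their message loops
@cluster begin
    println("worker ids: ", [(@fetchfrom w myid()) for w in workers()])
end

MPI.stop_main_loop(manager)
```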

src/MPI.jl

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ include(depfile)
 include("mpi-base.jl")
 include("cman.jl")
 include("window-io.jl")
+include("window-cman.jl")
 
 const mpitype_dict = Dict{DataType, Cint}()
 const mpitype_dict_inverse = Dict{Cint, DataType}()

src/cman.jl

Lines changed: 28 additions & 17 deletions
@@ -1,31 +1,37 @@
 import Base: kill
 export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
-export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
+export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL, MPI_WINDOW_IO, MPI_WINDOW_NOWAIT
 using Compat
 using Compat.Distributed
 import Compat.Sockets: connect, listenany, accept, IPv4, getsockname
 
 
 
-################################################################################
-# MPI Cluster Manager
-# Note: The cluster manager object lives only in the manager process,
-# except for MPI_TRANSPORT_ALL
+"""
+MPI Cluster Manager
+Note: The cluster manager object lives only in the manager process,
+except for MPI_TRANSPORT_ALL and MPI_WINDOW_IO
+
+There are four different transport modes:
 
-# There are three different transport modes:
+MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
+allows interactive use from a Julia shell, using the familiar `addprocs`
+interface.
 
-# MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
-# allows interactive use from a Julia shell, using the familiar `addprocs`
-# interface.
+MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
+process. This corresponds to the "usual" way in which MPI is used in a
+headless mode, e.g. submitted as a script to a queueing system.
 
-# MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
-# process. This corresponds to the "usual" way in which MPI is used in a
-# headless mode, e.g. submitted as a script to a queueing system.
+TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
+communication between processes. MPI can still be used by the user.
 
-# TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
-# communication between processes. MPI can still be used by the user.
+MPI_WINDOW_IO: Uses the MPI shared memory model with passive communication on all processes.
+The program must be started with mpirun or equivalent.
 
-@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL
+MPI_WINDOW_NOWAIT: Sets up a cluster manager, but only uses it for code enclosed in the @cluster
+macro. All other code runs as regular MPI code (single program, multiple data).
+"""
+@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL MPI_WINDOW_IO MPI_WINDOW_NOWAIT
 
 mutable struct MPIManager <: ClusterManager
     np::Int # number of worker processes (excluding the manager process)
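
The docstring above describes `MPI_ON_WORKERS` only in words; a minimal interactive sketch in the spirit of the package README is shown below. The `MPIManager(np=4)` keyword constructor is taken from the existing README usage, not from this diff.

```julia
# In an interactive Julia session (the manager process itself does not use MPI):
using MPI
using Compat.Distributed     # addprocs, as in src/cman.jl

manager = MPIManager(np=4)   # MPI_ON_WORKERS is the default transport mode
addprocs(manager)            # spawns 4 MPI-connected workers

@mpi_do manager begin
    comm = MPI.COMM_WORLD
    println("rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")
end
```
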
@@ -319,8 +325,9 @@ end
 ################################################################################
 # Alternative startup model: All Julia processes are started via an external
 # mpirun, and the user does not call addprocs.
-
-# Enter the MPI cluster manager's main loop (does not return on the workers)
+"""
+Enter the MPI cluster manager's main loop (does not return on the workers)
+"""
 function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
                          comm::MPI.Comm=MPI.COMM_WORLD)
     !MPI.Initialized() && MPI.Init()
@@ -392,6 +399,10 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
             MPI.Finalize()
             exit()
         end
+    elseif mode == MPI_WINDOW_IO
+        start_window_worker(comm, true)
+    elseif mode == MPI_WINDOW_NOWAIT
+        start_window_worker(comm, false)
     else
         error("Unknown mode $mode")
     end

src/window-cman.jl

Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
+import Base: launch, kill, manage, connect
+export MPIWindowIOManager, launch, kill, manage, connect, @cluster
+
+"""
+Stores the buffers needed for communication, in one instance per rank. Loop stops when the stop_condition is triggered
+"""
+mutable struct MPIWindowIOManager <: ClusterManager
+    comm::MPI.Comm
+    connection_windows::Vector{WindowIO}
+    stdio_windows::Vector{WindowIO}
+    workers_wait::Bool
+
+    function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
+        nb_procs = MPI.Comm_size(comm)
+        connection_windows = Vector{WindowIO}(nb_procs)
+        stdio_windows = Vector{WindowIO}(nb_procs)
+
+        for i in 1:nb_procs
+            connection_windows[i] = WindowIO(comm)
+            stdio_windows[i] = WindowIO(comm)
+        end
+
+        # Make sure all windows are created before continuing
+        MPI.Barrier(comm)
+
+        return new(comm, connection_windows, stdio_windows, workers_wait)
+    end
+end
+
+# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
+function closeall(manager::MPIWindowIOManager)
+    for w in manager.connection_windows
+        close(w)
+    end
+    for w in manager.stdio_windows
+        close(w)
+    end
+end
+
+function launch(mgr::MPIWindowIOManager, params::Dict,
+                instances::Array, cond::Condition)
+    try
+        nprocs = MPI.Comm_size(mgr.comm)
+        for cnt in 1:(nprocs-1)
+            push!(instances, WorkerConfig())
+        end
+        notify(cond)
+    catch e
+        println("Error in MPI launch $e")
+        rethrow(e)
+    end
+end
+
+function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
+    @spawnat pid notify(_stop_requested)
+    Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
+end
+
+function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end
+
+function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
+    myrank = MPI.Comm_rank(mgr.comm)
+    if myrank == 0
+        proc_stdio = mgr.stdio_windows[pid]
+        @schedule while !eof(proc_stdio)
+            try
+                println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
+            catch e
+            end
+        end
+    end
+    return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
+end
+
+function redirect_to_mpi(s::WindowWriter)
+    (rd, wr) = redirect_stdout()
+    @schedule while !eof(rd) && isopen(s.winio)
+        av = readline(rd)
+        if isopen(s.winio)
+            println(s,av)
+            flush(s)
+        end
+    end
+end
+
+function checkworkers()
+    for w in workers()
+        if w != (@fetchfrom w myid())
+            error("worker $w is not waiting")
+        end
+    end
+end
+
+function notify_workers()
+    for w in workers()
+        @spawnat(w, notify(_stop_requested))
+    end
+end
+
+function wait_for_events()
+    global _stop_requested
+    wait(_stop_requested)
+end
+
+"""
+Initialize the current process as a Julia parallel worker. Must be called on all ranks.
+If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used.
+"""
+function start_window_worker(comm::Comm, workers_wait)
+    rank = MPI.Comm_rank(comm)
+    N = MPI.Comm_size(comm)
+
+    manager = MPIWindowIOManager(comm, workers_wait)
+    cookie = string(comm)
+    if length(cookie) > Base.Distributed.HDR_COOKIE_LEN
+        cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN]
+    end
+
+    try
+        if rank == 0
+            Base.cluster_cookie(cookie)
+            MPI.Barrier(comm)
+            addprocs(manager)
+            @assert nprocs() == N
+            @assert nworkers() == (N == 1 ? 1 : N-1)
+
+            if !workers_wait
+                checkworkers()
+                notify_workers()
+            end
+        else
+            init_worker(cookie, manager)
+            MPI.Barrier(comm)
+            redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
+            for i in vcat([1], (rank+2):N)
+                # Receiving end of connections to all higher workers and master
+                Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
+            end
+
+            global _stop_requested = Condition()
+            wait_for_events()
+        end
+    catch e
+        Base.display_error(STDERR,"exception $e on rank $rank",backtrace())
+    end
+
+    if workers_wait && rank != 0
+        closeall(manager)
+        MPI.Finalize()
+        exit(0)
+    end
+
+    return manager
+end
+
+"""
+Stop the manager. This closes all windows and calls MPI.Finalize on all workers
+"""
+function stop_main_loop(manager::MPIWindowIOManager)
+    if myid() != 1
+        wait_for_events()
+    else
+        checkworkers()
+        if nprocs() > 1
+            rmprocs(workers())
+        end
+    end
+    closeall(manager)
+    MPI.Finalize()
+end
+
+"""
+Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT,
+since this will temporarily activate the worker event loops to listen for messages.
+"""
+macro cluster(expr)
+    quote
+        if myid() != 1
+            wait_for_events()
+        else
+            $(esc(expr))
+            notify_workers()
+        end
+    end
+end
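
To show how the pieces above interact in the `MPI_WINDOW_IO` mode (workers blocking inside `start_window_worker` while rank 0 drives them), here is a minimal sketch; the script name is illustrative and only functions defined in this commit plus standard Distributed calls are used.

```julia
# Launch with e.g.: mpirun -np 4 julia my_window_io_script.jl   (file name illustrative)
using MPI
using Compat.Distributed   # workers(), remotecall_fetch, ... (as in src/cman.jl)

manager = MPI.start_main_loop(MPI_WINDOW_IO)   # the workers never return from this call

# Only MPI rank 0 (Julia pid 1) reaches this point; it talks to the other ranks
# over the window-backed Distributed connections set up above.
for w in workers()
    println("worker $w reports pid ", remotecall_fetch(myid, w))
end

MPI.stop_main_loop(manager)   # notifies the workers, closes the windows, finalizes MPI
```
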

src/window-io.jl

Lines changed: 4 additions & 2 deletions
@@ -97,16 +97,16 @@ function has_data_available(w::WindowIO)
     end
 
     # Check if we need to grow the buffer
+    MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
     MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate
     if w.header.needed_length > w.header.length
-        MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
         MPI.Win_detach(w.win, w.buffer)
         resize!(w.buffer, w.header.needed_length)
         MPI.Win_attach(w.win, w.buffer)
         w.header.address = MPI.Get_address(w.buffer)
         w.header.length = w.header.needed_length
-        MPI.Win_unlock(w.myrank, w.header_win)
     end
+    MPI.Win_unlock(w.myrank, w.header_win)
 
     return w.header.count > w.ptr
 end
@@ -128,7 +128,9 @@ end
 function wait_nb_available(w, nb)
     nb_found = wait_nb_available(w)
     while nb_found < nb && w.is_open
+        MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win)
         MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates
+        MPI.Win_unlock(w.myrank, w.header_win)
         nb_found = wait_nb_available(w)
     end
     return nb_found
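
Both hunks apply the same MPI passive-target rule: `Win_sync` must be issued inside a lock/unlock epoch on the window. A minimal sketch of that pattern, using only the wrapper calls that appear in this diff (`win` and `myrank` are assumed to come from an existing `WindowIO`-style setup):

```julia
# Minimal sketch of the passive-target synchronization pattern used above.
function refresh_window_view(win, myrank)
    MPI.Win_lock(MPI.LOCK_SHARED, myrank, 0, win)  # open an access epoch on our own rank
    MPI.Win_sync(win)                              # make remote passive-target updates visible
    MPI.Win_unlock(myrank, win)                    # close the epoch
end
```
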

test/test_reduce.jl

Lines changed: 31 additions & 0 deletions
@@ -27,4 +27,35 @@ sum_mesg = MPI.Reduce(mesg, MPI.SUM, root, comm)
 sum_mesg = rank == root ? sum_mesg : size*mesg
 @test isapprox(norm(sum_mesg-size*mesg), 0.0)
 
+# For comparison with the clustermanager version
+const ARRSIZE = 1024^2*100
+@test ARRSIZE % size == 0
+const my_arr = fill(1*(rank+1),ARRSIZE ÷ size)
+
+function mpi_sum(arr)::Int
+    mysum = 0
+    for x in arr
+        mysum += x
+    end
+    totalsum = MPI.Reduce(mysum, +, 0, comm)
+    return rank == 0 ? totalsum[1] : 0
+end
+
+const sumresult = mpi_sum(my_arr)
+const expected = sum((ARRSIZE ÷ size) * (1:size))
+if rank == 0
+    @test sumresult == expected
+end
+if rank == 0
+    println("Timings for MPI reduce:")
+    @time expected == mpi_sum(my_arr)
+    @time expected == mpi_sum(my_arr)
+    @time expected == mpi_sum(my_arr)
+else
+    mpi_sum(my_arr)
+    mpi_sum(my_arr)
+    mpi_sum(my_arr)
+end
+
+
 MPI.Finalize()
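
The comment "For comparison with the clustermanager version" refers to the window-manager tests (`test_windowcman*.jl`), which are not shown in this view. As a rough illustration only, a Distributed-based counterpart of the timing block could look like the sketch below; `my_chunk` and `chunk_sum` are hypothetical names, not part of this commit.

```julia
# Hypothetical Distributed counterpart of the MPI.Reduce timing above, for use once a
# cluster manager (e.g. the window-based one from this commit) is active.
using Compat.Distributed

@everywhere const my_chunk = fill(myid(), 1024)   # per-process data (illustrative size)
@everywhere chunk_sum() = sum(my_chunk)           # local reduction on each process

println("Timing for Distributed reduce:")
@time sum(remotecall_fetch(chunk_sum, p) for p in procs())
```
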
