Skip to content

Commit 33543a7

Browse files
authored
I/O: add shared file pointer support (#397)
1 parent 203e653 commit 33543a7

File tree

8 files changed

+283
-13
lines changed

8 files changed

+283
-13
lines changed

deps/consts_microsoftmpi.jl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,21 @@ const MPI_MODE_UNIQUE_OPEN = reinterpret(Cint, 0x00000020)
9696
const MPI_MODE_SEQUENTIAL = reinterpret(Cint, 0x00000100)
9797
const MPI_MODE_APPEND = reinterpret(Cint, 0x00000080)
9898

99-
const MPI_BOTTOM = reinterpret(SentinelPtr, 0)
100-
const MPI_IN_PLACE = reinterpret(SentinelPtr, -1)
101-
102-
const MPI_STATUS_IGNORE = reinterpret(SentinelPtr, 1)
103-
const MPI_STATUSES_IGNORE = reinterpret(SentinelPtr, 1)
99+
const MPI_SEEK_SET = Cint(600)
100+
const MPI_SEEK_CUR = Cint(602)
101+
const MPI_SEEK_END = Cint(604)
104102

105103
const MPI_IDENT = Cint(0)
106104
const MPI_CONGRUENT = Cint(1)
107105
const MPI_SIMILAR = Cint(2)
108106
const MPI_UNEQUAL = Cint(3)
109107

108+
const MPI_BOTTOM = reinterpret(SentinelPtr, 0)
109+
const MPI_IN_PLACE = reinterpret(SentinelPtr, -1)
110+
111+
const MPI_STATUS_IGNORE = reinterpret(SentinelPtr, 1)
112+
const MPI_STATUSES_IGNORE = reinterpret(SentinelPtr, 1)
113+
110114
struct Status
111115
_pad1::Cint
112116
_pad2::Cint

deps/consts_mpich.jl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,16 @@ const MPI_MODE_UNIQUE_OPEN = Cint(32)
101101
const MPI_MODE_SEQUENTIAL = Cint(256)
102102
const MPI_MODE_APPEND = Cint(128)
103103

104-
const MPI_BOTTOM = reinterpret(SentinelPtr, 0)
105-
const MPI_IN_PLACE = reinterpret(SentinelPtr, -1)
106-
const MPI_STATUS_IGNORE = reinterpret(SentinelPtr, 1)
107-
const MPI_STATUSES_IGNORE = reinterpret(SentinelPtr, 1)
104+
const MPI_SEEK_SET = Cint(600)
105+
const MPI_SEEK_CUR = Cint(602)
106+
const MPI_SEEK_END = Cint(604)
108107

109108
const MPI_IDENT = Cint(0)
110109
const MPI_CONGRUENT = Cint(1)
111110
const MPI_SIMILAR = Cint(2)
112111
const MPI_UNEQUAL = Cint(3)
112+
113+
const MPI_BOTTOM = reinterpret(SentinelPtr, 0)
114+
const MPI_IN_PLACE = reinterpret(SentinelPtr, -1)
115+
const MPI_STATUS_IGNORE = reinterpret(SentinelPtr, 1)
116+
const MPI_STATUSES_IGNORE = reinterpret(SentinelPtr, 1)

deps/consts_openmpi.jl

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,17 @@ const MPI_MODE_UNIQUE_OPEN = Cint(32)
107107
const MPI_MODE_SEQUENTIAL = Cint(256)
108108
const MPI_MODE_APPEND = Cint(128)
109109

110-
const MPI_BOTTOM = reinterpret(SentinelPtr, 0)
111-
const MPI_IN_PLACE = reinterpret(SentinelPtr, 1)
112-
const MPI_STATUS_IGNORE = reinterpret(SentinelPtr, 0)
113-
const MPI_STATUSES_IGNORE = reinterpret(SentinelPtr, 0)
110+
const MPI_SEEK_SET = Cint(600)
111+
const MPI_SEEK_CUR = Cint(602)
112+
const MPI_SEEK_END = Cint(604)
114113

115114
const MPI_IDENT = Cint(0)
116115
const MPI_CONGRUENT = Cint(1)
117116
const MPI_SIMILAR = Cint(2)
118117
const MPI_UNEQUAL = Cint(3)
118+
119+
const MPI_BOTTOM = reinterpret(SentinelPtr, 0)
120+
const MPI_IN_PLACE = reinterpret(SentinelPtr, 1)
121+
const MPI_STATUS_IGNORE = reinterpret(SentinelPtr, 0)
122+
const MPI_STATUSES_IGNORE = reinterpret(SentinelPtr, 0)
123+

deps/gen_consts.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ MPI_Cints = [
135135
:MPI_MODE_UNIQUE_OPEN,
136136
:MPI_MODE_SEQUENTIAL,
137137
:MPI_MODE_APPEND,
138+
:MPI_SEEK_SET,
139+
:MPI_SEEK_CUR,
140+
:MPI_SEEK_END,
138141
:MPI_IDENT,
139142
:MPI_CONGRUENT,
140143
:MPI_SIMILAR,

docs/src/io.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ MPI.File.open
1010

1111
```@docs
1212
MPI.File.set_view!
13+
MPI.File.get_byte_offset
1314
```
1415

1516
## Consistency
@@ -28,3 +29,14 @@ MPI.File.read_at_all!
2829
MPI.File.write_at
2930
MPI.File.write_at_all
3031
```
32+
33+
### Shared pointer
34+
35+
```@docs
36+
MPI.File.read_shared!
37+
MPI.File.write_shared
38+
MPI.File.read_ordered!
39+
MPI.File.write_ordered
40+
MPI.File.seek_shared
41+
MPI.File.get_position_shared
42+
```

src/buffers.jl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ function Base.unsafe_convert(::Type{MPIPtr}, x::MPIBuffertype{T}) where T
1818
ptr = Base.unsafe_convert(Ptr{T}, x)
1919
reinterpret(MPIPtr, ptr)
2020
end
21+
22+
23+
function Base.cconvert(::Type{MPIPtr}, x::String)
24+
x
25+
end
26+
function Base.unsafe_convert(::Type{MPIPtr}, x::String)
27+
reinterpret(MPIPtr, pointer(x))
28+
end
29+
2130
function Base.cconvert(::Type{MPIPtr}, ::Nothing)
2231
reinterpret(MPIPtr, C_NULL)
2332
end
@@ -123,5 +132,8 @@ Construct a [`Buffer`](@ref) object for a send operation from `data`, allowing c
123132
`isbits(data)`.
124133
"""
125134
Buffer_send(data) = isbits(data) ? Buffer(Ref(data)) : Buffer(data)
135+
Buffer_send(str::String) = Buffer(str, sizeof(str), MPI.CHAR)
136+
137+
126138

127139
const BUFFER_NULL = Buffer(C_NULL, 0, DATATYPE_NULL)

src/io.jl

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,26 @@ function set_view!(file::FileHandle, disp::Integer, etype::Datatype, filetype::D
9797
set_view!(file, disp, etype, filetype, datarep, Info(infokwargs...))
9898
end
9999

100+
"""
101+
MPI.File.get_byte_offset(file::FileHandle, offset::Integer)
102+
103+
Converts a view-relative offset into an absolute byte position. Returns the absolute byte
104+
position (from the beginning of the file) of `offset` relative to the current view of
105+
`file`.
106+
107+
# External links
108+
$(_doc_external("MPI_File_get_byte_offset"))
109+
"""
110+
function get_byte_offset(file::FileHandle, offset::Integer)
111+
r_displ = Ref{MPI_Offset}()
112+
# int MPI_File_get_byte_offset(MPI_File fh, MPI_Offset offset,
113+
# MPI_Offset *disp)
114+
@mpichk ccall((:MPI_File_get_byte_offset, libmpi), Cint,
115+
(MPI_File, MPI_Offset, Ptr{MPI_Offset}),
116+
file, offset, r_displ)
117+
r_displ[]
118+
end
119+
100120
"""
101121
MPI.File.sync(fh::FileHandle)
102122
@@ -212,6 +232,148 @@ end
212232
write_at_all(file::FileHandle, offset::Integer, data) = write_at_all(file, offset, Buffer_send(data))
213233

214234

235+
# Shared file pointer
236+
"""
237+
MPI.File.read_shared!(file::FileHandle, data)
238+
239+
Reads from `file` using the shared file pointer into `data`. `data` can be a
240+
[`Buffer`](@ref), or any object for which `Buffer(data)` is defined.
241+
242+
# See also
243+
- [`MPI.File.read_ordered!`](@ref) for the collective operation
244+
245+
# External links
246+
$(_doc_external("MPI_File_read_shared"))
247+
"""
248+
function read_shared!(file::FileHandle, buf::Buffer)
249+
stat_ref = Ref{Status}(MPI.STATUS_EMPTY)
250+
# int MPI_File_read_shared(MPI_File fh, void *buf, int count,
251+
# MPI_Datatype datatype, MPI_Status *status)
252+
@mpichk ccall((:MPI_File_read_shared, libmpi), Cint,
253+
(MPI_File, MPIPtr, Cint, MPI_Datatype, Ptr{Status}),
254+
file, buf.data, buf.count, buf.datatype, stat_ref)
255+
return stat_ref[]
256+
end
257+
read_shared!(file::FileHandle, data) = read_shared!(file, Buffer(data))
258+
259+
"""
260+
MPI.File.write_shared(file::FileHandle, data)
261+
262+
Writes to `file` using the shared file pointer from `data`. `data` can be a
263+
[`Buffer`](@ref), or any object for which `Buffer(data)` is defined.
264+
265+
# See also
266+
- [`MPI.File.write_ordered`](@ref) for the noncollective operation
267+
268+
# External links
269+
$(_doc_external("MPI_File_write_shared"))
270+
"""
271+
function write_shared(file::FileHandle, buf::Buffer)
272+
stat_ref = Ref{Status}(MPI.STATUS_EMPTY)
273+
# int MPI_File_write_shared(MPI_File fh, const void *buf, int count,
274+
# MPI_Datatype datatype, MPI_Status *status)
275+
@mpichk ccall((:MPI_File_write_shared, libmpi), Cint,
276+
(MPI_File, MPIPtr, Cint, MPI_Datatype, Ptr{Status}),
277+
file, buf.data, buf.count, buf.datatype, stat_ref)
278+
return stat_ref[]
279+
end
280+
write_shared(file::FileHandle, buf) = write_shared(file, Buffer_send(buf))
281+
282+
# Shared collective operations
283+
"""
284+
MPI.File.read_ordered!(file::FileHandle, data)
285+
286+
Collectively reads in rank order from `file` using the shared file pointer into `data`.
287+
`data` can be a [`Buffer`](@ref), or any object for which `Buffer(data)` is defined. This
288+
is a collective operation, so must be called on all ranks in the communicator on which
289+
`file` was opened.
290+
291+
# See also
292+
- [`MPI.File.read_shared!`](@ref) for the non-collective operation
293+
294+
# External links
295+
$(_doc_external("MPI_File_read_ordered"))
296+
"""
297+
function read_ordered!(file::FileHandle, buf::Buffer)
298+
stat_ref = Ref{Status}(MPI.STATUS_EMPTY)
299+
# int MPI_File_read_ordered(MPI_File fh, void *buf, int count,
300+
# MPI_Datatype datatype, MPI_Status *status)
301+
@mpichk ccall((:MPI_File_read_ordered, libmpi), Cint,
302+
(MPI_File, MPIPtr, Cint, MPI_Datatype, Ptr{Status}),
303+
file, buf.data, buf.count, buf.datatype, stat_ref)
304+
return stat_ref[]
305+
end
306+
read_ordered!(file::FileHandle, data) = read_ordered!(file, Buffer(data))
307+
308+
"""
309+
MPI.File.write_ordered(file::FileHandle, data)
310+
311+
Collectively writes in rank order to `file` using the shared file pointer from `data`.
312+
`data` can be a [`Buffer`](@ref), or any object for which `Buffer(data)` is defined. This
313+
is a collective operation, so must be called on all ranks in the communicator on which
314+
`file` was opened.
315+
316+
# See also
317+
- [`MPI.File.write_shared`](@ref) for the noncollective operation
318+
319+
# External links
320+
$(_doc_external("MPI_File_write_ordered"))
321+
"""
322+
function write_ordered(file::FileHandle, buf::Buffer)
323+
stat_ref = Ref{Status}(MPI.STATUS_EMPTY)
324+
# int MPI_File_write_ordered(MPI_File fh, const void *buf, int count,
325+
# MPI_Datatype datatype, MPI_Status *status)
326+
@mpichk ccall((:MPI_File_write_ordered, libmpi), Cint,
327+
(MPI_File, MPIPtr, Cint, MPI_Datatype, Ptr{Status}),
328+
file, buf.data, buf.count, buf.datatype, stat_ref)
329+
return stat_ref[]
330+
end
331+
write_ordered(file::FileHandle, buf) = write_ordered(file, Buffer_send(buf))
215332

333+
# seek
334+
@enum Seek begin
335+
SEEK_SET = MPI.MPI_SEEK_SET
336+
SEEK_CUR = MPI.MPI_SEEK_CUR
337+
SEEK_END = MPI.MPI_SEEK_END
338+
end
339+
340+
"""
341+
MPI.File.seek_shared(file::FileHandle, offset::Integer, whence::Seek=SEEK_SET)
342+
343+
Updates the shared file pointer according to `whence`, which has the following possible
344+
values:
345+
346+
- `MPI.File.SEEK_SET` (default): the pointer is set to `offset`
347+
- `MPI.File.SEEK_CUR`: the pointer is set to the current pointer position plus `offset`
348+
- `MPI.File.SEEK_END`: the pointer is set to the end of file plus `offset`
349+
350+
This is a collective operation, and must be called with the same value on all processes in
351+
the communicator.
352+
353+
# External links
354+
$(_doc_external("MPI_File_seek_shared"))
355+
"""
356+
function seek_shared(file::FileHandle, offset::Integer, whence::Seek=SEEK_SET)
357+
# int MPI_File_seek_shared(MPI_File fh, MPI_Offset offset, int whence)
358+
@mpichk ccall((:MPI_File_seek_shared, libmpi), Cint,
359+
(MPI_File, MPI_Offset, Cint),
360+
file, offset, whence)
361+
end
362+
363+
"""
364+
MPI.File.get_position_shared(file::FileHandle)
365+
366+
The current position of the shared file pointer (in `etype` units) relative to the current view.
367+
368+
# External links
369+
$(_doc_external("MPI_File_get_position_shared"))
370+
"""
371+
function get_position_shared(file::FileHandle)
372+
r = Ref{MPI_Offset}()
373+
# int MPI_File_get_position_shared(MPI_File fh, MPI_Offset *offset)
374+
@mpichk ccall((:MPI_File_get_position_shared, libmpi), Cint,
375+
(MPI_File, Ptr{MPI_Offset}), file, r)
376+
return r[]
377+
end
216378

217379
end # module

test/test_io_shared.jl

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
using Test
2+
using MPI
3+
using Random
4+
5+
if get(ENV,"JULIA_MPI_TEST_ARRAYTYPE","") == "CuArray"
6+
using CuArrays
7+
ArrayType = CuArray
8+
else
9+
ArrayType = Array
10+
end
11+
12+
MPI.Init()
13+
comm = MPI.COMM_WORLD
14+
rank = MPI.Comm_rank(comm)
15+
sz = MPI.Comm_size(comm)
16+
filename = MPI.bcast(tempname(), 0, comm)
17+
18+
MPI.Barrier(comm)
19+
20+
# Collective write
21+
fh = MPI.File.open(comm, filename, read=true, write=true, create=true)
22+
@test MPI.File.get_position_shared(fh) == 0
23+
24+
header = "my header"
25+
26+
if rank == 0
27+
MPI.File.write_shared(fh, header)
28+
end
29+
30+
# TODO: is there a better way to synchronise shared pointers?
31+
MPI.Barrier(comm)
32+
33+
offset = MPI.File.get_position_shared(fh)
34+
@test offset == sizeof(header)
35+
byte_offset = MPI.File.get_byte_offset(fh, offset)
36+
@test byte_offset == offset
37+
38+
MPI.File.set_view!(fh, byte_offset, MPI.Datatype(Int64), MPI.Datatype(Int64))
39+
@test MPI.File.get_position_shared(fh) == 0
40+
41+
MPI.File.write_ordered(fh, fill(Int64(rank), rank+1))
42+
@test MPI.File.get_position_shared(fh) == sum(1:sz)
43+
44+
MPI.File.seek_shared(fh, 0)
45+
@test MPI.File.get_position_shared(fh) == 0
46+
47+
buf = zeros(Int64, rank+1)
48+
MPI.File.read_ordered!(fh, buf)
49+
@test buf == fill(Int64(rank), rank+1)
50+
51+
MPI.Barrier(comm)
52+
@test MPI.File.get_position_shared(fh) == sum(1:sz)
53+
54+
MPI.File.set_view!(fh, 0, MPI.Datatype(UInt8), MPI.Datatype(UInt8))
55+
MPI.File.seek_shared(fh, 0)
56+
@test MPI.File.get_position_shared(fh) == 0
57+
58+
if rank == sz-1
59+
buf = Array{UInt8}(undef, sizeof(header))
60+
MPI.File.read_shared!(fh, buf)
61+
@test String(buf) == header
62+
end
63+
64+
MPI.Barrier(comm)
65+
66+
@test MPI.File.get_position_shared(fh) == sizeof(header)
67+
68+
close(fh)

0 commit comments

Comments
 (0)