@@ -29,7 +29,6 @@ mutable struct WindowIO <: IO
29
29
header:: BufferHeader
30
30
remote_header:: BufferHeader
31
31
header_win:: Win
32
- header_cwin:: CWin
33
32
is_open:: Bool
34
33
# Current read position
35
34
ptr:: WinCountT
@@ -56,7 +55,6 @@ mutable struct WindowIO <: IO
56
55
header,
57
56
remote_header,
58
57
header_win,
59
- CWin (header_win),
60
58
true ,
61
59
0 ,
62
60
Condition (),
@@ -92,23 +90,23 @@ function has_data_available(w::WindowIO)
92
90
return false
93
91
end
94
92
95
- if w. header. count > w. ptr && w. header. needed_length == w. header. length # fast check without window sync
96
- return true
97
- end
98
-
99
- # Check if we need to grow the buffer
100
- MPI. Win_lock (MPI. LOCK_EXCLUSIVE, w. myrank, 0 , w. header_win)
101
- MPI. Win_sync (w. header_cwin) # CWin version doesn't allocate
102
- if w. header. needed_length > w. header. length
93
+ MPI. Win_lock (MPI. LOCK_SHARED, w. myrank, 0 , w. header_win)
94
+ have_data = w. header. count > w. ptr
95
+ need_grow = w. header. needed_length > w. header. length
96
+ MPI. Win_unlock (w. myrank, w. header_win)
97
+
98
+ # Grow buffer if needed
99
+ if need_grow
103
100
MPI. Win_detach (w. win, w. buffer)
104
101
resize! (w. buffer, w. header. needed_length)
105
102
MPI. Win_attach (w. win, w. buffer)
103
+ MPI. Win_lock (MPI. LOCK_EXCLUSIVE, w. myrank, 0 , w. header_win)
106
104
w. header. address = MPI. Get_address (w. buffer)
107
105
w. header. length = w. header. needed_length
106
+ MPI. Win_unlock (w. myrank, w. header_win)
108
107
end
109
- MPI. Win_unlock (w. myrank, w. header_win)
110
108
111
- return w . header . count > w . ptr
109
+ return have_data
112
110
end
113
111
114
112
function Base. wait (w:: WindowIO )
126
124
127
125
# wait until the specified number of bytes is available or the stream is closed
128
126
function wait_nb_available (w, nb)
129
- nb_found = wait_nb_available (w)
127
+ nb_found = 0
130
128
while nb_found < nb && w. is_open
131
- MPI. Win_lock (MPI. LOCK_SHARED, w. myrank, 0 , w. header_win)
132
- MPI. Win_sync (w. header_cwin) # sync every loop, to make sure we get updates
133
- MPI. Win_unlock (w. myrank, w. header_win)
134
129
nb_found = wait_nb_available (w)
135
130
end
136
131
return nb_found
0 commit comments