1
1
"""
2
- A buffer that drops all elements put into it. Only to be used as the output
3
- buffer for a task - will throw if attached as an input.
2
+ A buffer that drops all elements put into it.
4
3
"""
5
- struct DropBuffer{T} end
4
+ mutable struct DropBuffer{T}
5
+ open:: Bool
6
+ DropBuffer {T} () where T = new {T} (true )
7
+ end
6
8
DropBuffer {T} (_) where T = DropBuffer {T} ()
7
9
Base. isempty (:: DropBuffer ) = true
8
10
isfull (:: DropBuffer ) = false
9
- Base. put! (:: DropBuffer , _) = nothing
10
- Base. take! (:: DropBuffer ) = error (" Cannot `take!` from a DropBuffer" )
11
-
12
- " A process-local buffer backed by a `Channel{T}`."
13
- struct ChannelBuffer{T}
14
- channel:: Channel{T}
15
- len:: Int
16
- count:: Threads.Atomic{Int}
17
- ChannelBuffer {T} (len:: Int = 1024 ) where T =
18
- new {T} (Channel {T} (len), len, Threads. Atomic {Int} (0 ))
19
- end
20
- Base. isempty (cb:: ChannelBuffer ) = isempty (cb. channel)
21
- isfull (cb:: ChannelBuffer ) = cb. count[] == cb. len
22
- function Base. put! (cb:: ChannelBuffer{T} , x) where T
23
- put! (cb. channel, convert (T, x))
24
- Threads. atomic_add! (cb. count, 1 )
25
- end
26
- function Base. take! (cb:: ChannelBuffer )
27
- take! (cb. channel)
28
- Threads. atomic_sub! (cb. count, 1 )
29
- end
30
-
31
- " A cross-worker buffer backed by a `RemoteChannel{T}`."
32
- struct RemoteChannelBuffer{T}
33
- channel:: RemoteChannel{Channel{T}}
34
- len:: Int
35
- count:: Threads.Atomic{Int}
36
- RemoteChannelBuffer {T} (len:: Int = 1024 ) where T =
37
- new {T} (RemoteChannel (()-> Channel {T} (len)), len, Threads. Atomic {Int} (0 ))
38
- end
39
- Base. isempty (cb:: RemoteChannelBuffer ) = isempty (cb. channel)
40
- isfull (cb:: RemoteChannelBuffer ) = cb. count[] == cb. len
41
- function Base. put! (cb:: RemoteChannelBuffer{T} , x) where T
42
- put! (cb. channel, convert (T, x))
43
- Threads. atomic_add! (cb. count, 1 )
44
- end
45
- function Base. take! (cb:: RemoteChannelBuffer )
46
- take! (cb. channel)
47
- Threads. atomic_sub! (cb. count, 1 )
11
+ capacity (:: DropBuffer ) = typemax (Int)
12
+ Base. length (:: DropBuffer ) = 0
13
+ Base. isopen (buf:: DropBuffer ) = buf. open
14
+ function Base. close (buf:: DropBuffer )
15
+ buf. open = false
16
+ end
17
+ function Base. put! (buf:: DropBuffer , _)
18
+ if ! isopen (buf)
19
+ throw (InvalidStateException (" DropBuffer is closed" , :closed ))
20
+ end
21
+ task_may_cancel! (; must_force= true )
22
+ yield ()
23
+ return
24
+ end
25
+ function Base. take! (buf:: DropBuffer )
26
+ while true
27
+ if ! isopen (buf)
28
+ throw (InvalidStateException (" DropBuffer is closed" , :closed ))
29
+ end
30
+ task_may_cancel! (; must_force= true )
31
+ yield ()
32
+ end
48
33
end
49
34
50
35
" A process-local ring buffer."
@@ -53,40 +38,45 @@ mutable struct ProcessRingBuffer{T}
53
38
write_idx:: Int
54
39
@atomic count:: Int
55
40
buffer:: Vector{T}
56
- open:: Bool
41
+ @atomic open:: Bool
57
42
function ProcessRingBuffer {T} (len:: Int = 1024 ) where T
58
43
buffer = Vector {T} (undef, len)
59
44
return new {T} (1 , 1 , 0 , buffer, true )
60
45
end
61
46
end
62
47
Base. isempty (rb:: ProcessRingBuffer ) = (@atomic rb. count) == 0
63
48
isfull (rb:: ProcessRingBuffer ) = (@atomic rb. count) == length (rb. buffer)
49
+ capacity (rb:: ProcessRingBuffer ) = length (rb. buffer)
64
50
Base. length (rb:: ProcessRingBuffer ) = @atomic rb. count
65
- Base. isopen (rb:: ProcessRingBuffer ) = rb. open
51
+ Base. isopen (rb:: ProcessRingBuffer ) = @atomic rb. open
66
52
function Base. close (rb:: ProcessRingBuffer )
67
- rb. open = false
53
+ @atomic rb. open = false
68
54
end
69
55
function Base. put! (rb:: ProcessRingBuffer{T} , x) where T
70
- len = length (rb. buffer)
71
- while (@atomic rb. count) == len
56
+ while isfull (rb)
72
57
yield ()
73
58
if ! isopen (rb)
74
- throw (InvalidStateException (" Stream is closed" , :closed ))
59
+ throw (InvalidStateException (" ProcessRingBuffer is closed" , :closed ))
75
60
end
76
- task_may_cancel! ()
61
+ task_may_cancel! (; must_force = true )
77
62
end
78
- to_write_idx = mod1 (rb. write_idx, len )
63
+ to_write_idx = mod1 (rb. write_idx, length (rb . buffer) )
79
64
rb. buffer[to_write_idx] = convert (T, x)
80
65
rb. write_idx += 1
81
66
@atomic rb. count += 1
82
67
end
83
68
function Base. take! (rb:: ProcessRingBuffer )
84
- while ( @atomic rb . count) == 0
69
+ while isempty (rb)
85
70
yield ()
86
- if ! isopen (rb)
87
- throw (InvalidStateException (" Stream is closed" , :closed ))
71
+ if ! isopen (rb) && isempty (rb)
72
+ throw (InvalidStateException (" ProcessRingBuffer is closed" , :closed ))
88
73
end
89
- task_may_cancel! ()
74
+ if task_cancelled () && isempty (rb)
75
+ # We respect a graceful cancellation only if the buffer is empty.
76
+ # Otherwise, we may have values to continue communicating.
77
+ task_may_cancel! ()
78
+ end
79
+ task_may_cancel! (; must_force= true )
90
80
end
91
81
to_read_idx = rb. read_idx
92
82
rb. read_idx += 1
@@ -106,123 +96,3 @@ function collect!(rb::ProcessRingBuffer{T}) where T
106
96
107
97
return output
108
98
end
109
-
110
- #= TODO
111
- "A server-local ring buffer backed by shared-memory."
112
- mutable struct ServerRingBuffer{T}
113
- read_idx::Int
114
- write_idx::Int
115
- @atomic count::Int
116
- buffer::Vector{T}
117
- function ServerRingBuffer{T}(len::Int=1024) where T
118
- buffer = Vector{T}(undef, len)
119
- return new{T}(1, 1, 0, buffer)
120
- end
121
- end
122
- Base.isempty(rb::ServerRingBuffer) = (@atomic rb.count) == 0
123
- function Base.put!(rb::ServerRingBuffer{T}, x) where T
124
- len = length(rb.buffer)
125
- while (@atomic rb.count) == len
126
- yield()
127
- end
128
- to_write_idx = mod1(rb.write_idx, len)
129
- rb.buffer[to_write_idx] = convert(T, x)
130
- rb.write_idx += 1
131
- @atomic rb.count += 1
132
- end
133
- function Base.take!(rb::ServerRingBuffer)
134
- while (@atomic rb.count) == 0
135
- yield()
136
- end
137
- to_read_idx = rb.read_idx
138
- rb.read_idx += 1
139
- @atomic rb.count -= 1
140
- to_read_idx = mod1(to_read_idx, length(rb.buffer))
141
- return rb.buffer[to_read_idx]
142
- end
143
- =#
144
-
145
- #=
146
- "A TCP-based ring buffer."
147
- mutable struct TCPRingBuffer{T}
148
- read_idx::Int
149
- write_idx::Int
150
- @atomic count::Int
151
- buffer::Vector{T}
152
- function TCPRingBuffer{T}(len::Int=1024) where T
153
- buffer = Vector{T}(undef, len)
154
- return new{T}(1, 1, 0, buffer)
155
- end
156
- end
157
- Base.isempty(rb::TCPRingBuffer) = (@atomic rb.count) == 0
158
- function Base.put!(rb::TCPRingBuffer{T}, x) where T
159
- len = length(rb.buffer)
160
- while (@atomic rb.count) == len
161
- yield()
162
- end
163
- to_write_idx = mod1(rb.write_idx, len)
164
- rb.buffer[to_write_idx] = convert(T, x)
165
- rb.write_idx += 1
166
- @atomic rb.count += 1
167
- end
168
- function Base.take!(rb::TCPRingBuffer)
169
- while (@atomic rb.count) == 0
170
- yield()
171
- end
172
- to_read_idx = rb.read_idx
173
- rb.read_idx += 1
174
- @atomic rb.count -= 1
175
- to_read_idx = mod1(to_read_idx, length(rb.buffer))
176
- return rb.buffer[to_read_idx]
177
- end
178
- =#
179
-
180
- #=
181
- """
182
- A flexible puller which switches to the most efficient buffer type based
183
- on the sender and receiver locations.
184
- """
185
- mutable struct UniBuffer{T}
186
- buffer::Union{ProcessRingBuffer{T}, Nothing}
187
- end
188
- function initialize_stream_buffer!(::Type{UniBuffer{T}}, T, send_proc, recv_proc, buffer_amount) where T
189
- if buffer_amount == 0
190
- error("Return NullBuffer")
191
- end
192
- send_osproc = get_parent(send_proc)
193
- recv_osproc = get_parent(recv_proc)
194
- if send_osproc.pid == recv_osproc.pid
195
- inner = RingBuffer{T}(buffer_amount)
196
- elseif system_uuid(send_osproc.pid) == system_uuid(recv_osproc.pid)
197
- inner = ProcessBuffer{T}(buffer_amount)
198
- else
199
- inner = RemoteBuffer{T}(buffer_amount)
200
- end
201
- return UniBuffer{T}(buffer_amount)
202
- end
203
-
204
- struct LocalPuller{T,B}
205
- buffer::B{T}
206
- id::UInt
207
- function LocalPuller{T,B}(id::UInt, buffer_amount::Integer) where {T,B}
208
- buffer = initialize_stream_buffer!(B, T, buffer_amount)
209
- return new{T,B}(buffer, id)
210
- end
211
- end
212
- function Base.take!(pull::LocalPuller{T,B}) where {T,B}
213
- if pull.buffer === nothing
214
- pull.buffer =
215
- error("Return NullBuffer")
216
- end
217
- value = take!(pull.buffer)
218
- end
219
- function initialize_input_stream!(stream::Stream{T,B}, id::UInt, send_proc::Processor, recv_proc::Processor, buffer_amount::Integer) where {T,B}
220
- local_buffer = remotecall_fetch(stream.ref.handle.owner, stream.ref.handle, id) do ref, id
221
- local_buffer, remote_buffer = initialize_stream_buffer!(B, T, send_proc, recv_proc, buffer_amount)
222
- ref.buffers[id] = remote_buffer
223
- return local_buffer
224
- end
225
- stream.buffer = local_buffer
226
- return stream
227
- end
228
- =#
0 commit comments