Skip to content

Commit 96bad48

Browse files
jpsamarooJamesWrigley
authored andcommitted
streaming: Get tests passing
Switch from RemoteFetcher to RemoteChannelFetcher Pass object rather than type to `stream_{push,pull}_values!` ProcessRingBuffer: Don't exit on graceful interrupt when non-empty
1 parent 43bd51f commit 96bad48

File tree

5 files changed

+259
-240
lines changed

5 files changed

+259
-240
lines changed

src/Dagger.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ include("sch/Sch.jl"); using .Sch
7171
include("datadeps.jl")
7272

7373
# Streaming
74+
include("stream.jl")
7475
include("stream-buffers.jl")
7576
include("stream-transfer.jl")
76-
include("stream.jl")
7777

7878
# Array computations
7979
include("array/darray.jl")

src/stream-buffers.jl

Lines changed: 44 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,35 @@
11
"""
2-
A buffer that drops all elements put into it. Only to be used as the output
3-
buffer for a task - will throw if attached as an input.
2+
A buffer that drops all elements put into it.
43
"""
5-
struct DropBuffer{T} end
4+
mutable struct DropBuffer{T}
5+
open::Bool
6+
DropBuffer{T}() where T = new{T}(true)
7+
end
68
DropBuffer{T}(_) where T = DropBuffer{T}()
79
Base.isempty(::DropBuffer) = true
810
isfull(::DropBuffer) = false
9-
Base.put!(::DropBuffer, _) = nothing
10-
Base.take!(::DropBuffer) = error("Cannot `take!` from a DropBuffer")
11-
12-
"A process-local buffer backed by a `Channel{T}`."
13-
struct ChannelBuffer{T}
14-
channel::Channel{T}
15-
len::Int
16-
count::Threads.Atomic{Int}
17-
ChannelBuffer{T}(len::Int=1024) where T =
18-
new{T}(Channel{T}(len), len, Threads.Atomic{Int}(0))
19-
end
20-
Base.isempty(cb::ChannelBuffer) = isempty(cb.channel)
21-
isfull(cb::ChannelBuffer) = cb.count[] == cb.len
22-
function Base.put!(cb::ChannelBuffer{T}, x) where T
23-
put!(cb.channel, convert(T, x))
24-
Threads.atomic_add!(cb.count, 1)
25-
end
26-
function Base.take!(cb::ChannelBuffer)
27-
take!(cb.channel)
28-
Threads.atomic_sub!(cb.count, 1)
29-
end
30-
31-
"A cross-worker buffer backed by a `RemoteChannel{T}`."
32-
struct RemoteChannelBuffer{T}
33-
channel::RemoteChannel{Channel{T}}
34-
len::Int
35-
count::Threads.Atomic{Int}
36-
RemoteChannelBuffer{T}(len::Int=1024) where T =
37-
new{T}(RemoteChannel(()->Channel{T}(len)), len, Threads.Atomic{Int}(0))
38-
end
39-
Base.isempty(cb::RemoteChannelBuffer) = isempty(cb.channel)
40-
isfull(cb::RemoteChannelBuffer) = cb.count[] == cb.len
41-
function Base.put!(cb::RemoteChannelBuffer{T}, x) where T
42-
put!(cb.channel, convert(T, x))
43-
Threads.atomic_add!(cb.count, 1)
44-
end
45-
function Base.take!(cb::RemoteChannelBuffer)
46-
take!(cb.channel)
47-
Threads.atomic_sub!(cb.count, 1)
11+
capacity(::DropBuffer) = typemax(Int)
12+
Base.length(::DropBuffer) = 0
13+
Base.isopen(buf::DropBuffer) = buf.open
14+
function Base.close(buf::DropBuffer)
15+
buf.open = false
16+
end
17+
function Base.put!(buf::DropBuffer, _)
18+
if !isopen(buf)
19+
throw(InvalidStateException("DropBuffer is closed", :closed))
20+
end
21+
task_may_cancel!(; must_force=true)
22+
yield()
23+
return
24+
end
25+
function Base.take!(buf::DropBuffer)
26+
while true
27+
if !isopen(buf)
28+
throw(InvalidStateException("DropBuffer is closed", :closed))
29+
end
30+
task_may_cancel!(; must_force=true)
31+
yield()
32+
end
4833
end
4934

5035
"A process-local ring buffer."
@@ -53,40 +38,45 @@ mutable struct ProcessRingBuffer{T}
5338
write_idx::Int
5439
@atomic count::Int
5540
buffer::Vector{T}
56-
open::Bool
41+
@atomic open::Bool
5742
function ProcessRingBuffer{T}(len::Int=1024) where T
5843
buffer = Vector{T}(undef, len)
5944
return new{T}(1, 1, 0, buffer, true)
6045
end
6146
end
6247
Base.isempty(rb::ProcessRingBuffer) = (@atomic rb.count) == 0
6348
isfull(rb::ProcessRingBuffer) = (@atomic rb.count) == length(rb.buffer)
49+
capacity(rb::ProcessRingBuffer) = length(rb.buffer)
6450
Base.length(rb::ProcessRingBuffer) = @atomic rb.count
65-
Base.isopen(rb::ProcessRingBuffer) = rb.open
51+
Base.isopen(rb::ProcessRingBuffer) = @atomic rb.open
6652
function Base.close(rb::ProcessRingBuffer)
67-
rb.open = false
53+
@atomic rb.open = false
6854
end
6955
function Base.put!(rb::ProcessRingBuffer{T}, x) where T
70-
len = length(rb.buffer)
71-
while (@atomic rb.count) == len
56+
while isfull(rb)
7257
yield()
7358
if !isopen(rb)
74-
throw(InvalidStateException("Stream is closed", :closed))
59+
throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
7560
end
76-
task_may_cancel!()
61+
task_may_cancel!(; must_force=true)
7762
end
78-
to_write_idx = mod1(rb.write_idx, len)
63+
to_write_idx = mod1(rb.write_idx, length(rb.buffer))
7964
rb.buffer[to_write_idx] = convert(T, x)
8065
rb.write_idx += 1
8166
@atomic rb.count += 1
8267
end
8368
function Base.take!(rb::ProcessRingBuffer)
84-
while (@atomic rb.count) == 0
69+
while isempty(rb)
8570
yield()
86-
if !isopen(rb)
87-
throw(InvalidStateException("Stream is closed", :closed))
71+
if !isopen(rb) && isempty(rb)
72+
throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
8873
end
89-
task_may_cancel!()
74+
if task_cancelled() && isempty(rb)
75+
# We respect a graceful cancellation only if the buffer is empty.
76+
# Otherwise, we may have values to continue communicating.
77+
task_may_cancel!()
78+
end
79+
task_may_cancel!(; must_force=true)
9080
end
9181
to_read_idx = rb.read_idx
9282
rb.read_idx += 1
@@ -106,123 +96,3 @@ function collect!(rb::ProcessRingBuffer{T}) where T
10696

10797
return output
10898
end
109-
110-
#= TODO
111-
"A server-local ring buffer backed by shared-memory."
112-
mutable struct ServerRingBuffer{T}
113-
read_idx::Int
114-
write_idx::Int
115-
@atomic count::Int
116-
buffer::Vector{T}
117-
function ServerRingBuffer{T}(len::Int=1024) where T
118-
buffer = Vector{T}(undef, len)
119-
return new{T}(1, 1, 0, buffer)
120-
end
121-
end
122-
Base.isempty(rb::ServerRingBuffer) = (@atomic rb.count) == 0
123-
function Base.put!(rb::ServerRingBuffer{T}, x) where T
124-
len = length(rb.buffer)
125-
while (@atomic rb.count) == len
126-
yield()
127-
end
128-
to_write_idx = mod1(rb.write_idx, len)
129-
rb.buffer[to_write_idx] = convert(T, x)
130-
rb.write_idx += 1
131-
@atomic rb.count += 1
132-
end
133-
function Base.take!(rb::ServerRingBuffer)
134-
while (@atomic rb.count) == 0
135-
yield()
136-
end
137-
to_read_idx = rb.read_idx
138-
rb.read_idx += 1
139-
@atomic rb.count -= 1
140-
to_read_idx = mod1(to_read_idx, length(rb.buffer))
141-
return rb.buffer[to_read_idx]
142-
end
143-
=#
144-
145-
#=
146-
"A TCP-based ring buffer."
147-
mutable struct TCPRingBuffer{T}
148-
read_idx::Int
149-
write_idx::Int
150-
@atomic count::Int
151-
buffer::Vector{T}
152-
function TCPRingBuffer{T}(len::Int=1024) where T
153-
buffer = Vector{T}(undef, len)
154-
return new{T}(1, 1, 0, buffer)
155-
end
156-
end
157-
Base.isempty(rb::TCPRingBuffer) = (@atomic rb.count) == 0
158-
function Base.put!(rb::TCPRingBuffer{T}, x) where T
159-
len = length(rb.buffer)
160-
while (@atomic rb.count) == len
161-
yield()
162-
end
163-
to_write_idx = mod1(rb.write_idx, len)
164-
rb.buffer[to_write_idx] = convert(T, x)
165-
rb.write_idx += 1
166-
@atomic rb.count += 1
167-
end
168-
function Base.take!(rb::TCPRingBuffer)
169-
while (@atomic rb.count) == 0
170-
yield()
171-
end
172-
to_read_idx = rb.read_idx
173-
rb.read_idx += 1
174-
@atomic rb.count -= 1
175-
to_read_idx = mod1(to_read_idx, length(rb.buffer))
176-
return rb.buffer[to_read_idx]
177-
end
178-
=#
179-
180-
#=
181-
"""
182-
A flexible puller which switches to the most efficient buffer type based
183-
on the sender and receiver locations.
184-
"""
185-
mutable struct UniBuffer{T}
186-
buffer::Union{ProcessRingBuffer{T}, Nothing}
187-
end
188-
function initialize_stream_buffer!(::Type{UniBuffer{T}}, T, send_proc, recv_proc, buffer_amount) where T
189-
if buffer_amount == 0
190-
error("Return NullBuffer")
191-
end
192-
send_osproc = get_parent(send_proc)
193-
recv_osproc = get_parent(recv_proc)
194-
if send_osproc.pid == recv_osproc.pid
195-
inner = RingBuffer{T}(buffer_amount)
196-
elseif system_uuid(send_osproc.pid) == system_uuid(recv_osproc.pid)
197-
inner = ProcessBuffer{T}(buffer_amount)
198-
else
199-
inner = RemoteBuffer{T}(buffer_amount)
200-
end
201-
return UniBuffer{T}(buffer_amount)
202-
end
203-
204-
struct LocalPuller{T,B}
205-
buffer::B{T}
206-
id::UInt
207-
function LocalPuller{T,B}(id::UInt, buffer_amount::Integer) where {T,B}
208-
buffer = initialize_stream_buffer!(B, T, buffer_amount)
209-
return new{T,B}(buffer, id)
210-
end
211-
end
212-
function Base.take!(pull::LocalPuller{T,B}) where {T,B}
213-
if pull.buffer === nothing
214-
pull.buffer =
215-
error("Return NullBuffer")
216-
end
217-
value = take!(pull.buffer)
218-
end
219-
function initialize_input_stream!(stream::Stream{T,B}, id::UInt, send_proc::Processor, recv_proc::Processor, buffer_amount::Integer) where {T,B}
220-
local_buffer = remotecall_fetch(stream.ref.handle.owner, stream.ref.handle, id) do ref, id
221-
local_buffer, remote_buffer = initialize_stream_buffer!(B, T, send_proc, recv_proc, buffer_amount)
222-
ref.buffers[id] = remote_buffer
223-
return local_buffer
224-
end
225-
stream.buffer = local_buffer
226-
return stream
227-
end
228-
=#

0 commit comments

Comments
 (0)