
Commit e772af0: Add streaming API

1 parent 8d04144

File tree: 13 files changed, +1494 −8 lines

Project.toml

Lines changed: 0 additions & 1 deletion

```diff
@@ -10,7 +10,6 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
 MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
-Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
 OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
 PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
 Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
```

docs/make.jl

Lines changed: 1 addition & 0 deletions

```diff
@@ -22,6 +22,7 @@ makedocs(;
         "Task Spawning" => "task-spawning.md",
         "Data Management" => "data-management.md",
         "Distributed Arrays" => "darray.md",
+        "Streaming Tasks" => "streaming.md",
         "Scopes" => "scopes.md",
         "Processors" => "processors.md",
         "Task Queues" => "task-queues.md",
```

docs/src/streaming.md

Lines changed: 105 additions & 0 deletions

# Streaming Tasks

Dagger tasks have a limited lifetime - they are created, execute, finish, and
are eventually destroyed when they're no longer needed. Thus, if one wants to
run the same kind of computations over and over, one might re-create a similar
set of tasks for each unit of data that needs processing.

This might be fine for computations which take a long time to run (thus
dwarfing the cost of task creation, which is quite small), or when working with
a limited set of data, but this approach is not great for doing lots of small
computations on a large (or endless) amount of data: for example, processing
image frames from a webcam, reacting to messages from a message bus, or reading
samples from a software radio. All of these workloads are better suited to a
"streaming" model of data processing, where data is simply piped into a
continuously-running task (or DAG of tasks) forever, or until the data runs
out.

Thankfully, if you have a problem which is best modeled as a streaming system
of tasks, Dagger has you covered! Building on its support for
[Task Queues](@ref), Dagger provides a means to convert an entire DAG of tasks
into a streaming DAG, where data flows into and out of each task
asynchronously, using the `spawn_streaming` function:

```julia
Dagger.spawn_streaming() do # enters a streaming region
    vals = Dagger.@spawn rand()
    print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run
in a streaming manner - instead of just calling `rand()` once and returning its
result, it will re-run `rand()` endlessly, continuously producing new random
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
`vals`, but in streaming form - it will continuously `println` the random
values produced by `vals`. Both tasks will run forever, and will run
efficiently, only doing the work necessary to generate, transfer, and consume
values.

As the comments point out, `spawn_streaming` creates a streaming region, during
which `vals` and `print_vals` are created and configured. Both tasks are halted
until `spawn_streaming` returns, allowing large DAGs to be built all at once,
without any task losing a single value. If desired, streaming regions can be
connected, although some values might be lost while tasks are being connected:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully set up and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be easily constructed, without doing
anything different. For example, we can generate multiple streams of random
numbers, write them all to their own files, and print the combined results:

```julia
Dagger.spawn_streaming() do
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    Dagger.spawn(all_vals_written...) do all_vals_written...
        vals_sum = sum(all_vals_written)
        println(vals_sum)
    end
end
```

If you want to stop the streaming DAG and tear it all down, you can call
`Dagger.kill!(all_vals[1])` (or `Dagger.kill!(all_vals_written[2])`, etc.); the
kill propagates throughout the DAG.

Alternatively, tasks can stop themselves from the inside with
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
do this when our randomly-drawn number falls within some arbitrary range:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_streaming("Finished!")
        end
        return x
    end
end
fetch(vals)
```

In this example, the call to `fetch` will hang (while random numbers continue
to be drawn) until a drawn number is less than 0.001; at that point, `fetch`
will return with "Finished!", and the task `vals` will have terminated.

src/Dagger.jl

Lines changed: 5 additions & 1 deletion

```diff
@@ -23,7 +23,6 @@ if !isdefined(Base, :ScopedValues)
 else
 import Base.ScopedValues: ScopedValue, with
 end
-
 import TaskLocalValues: TaskLocalValue

 if !isdefined(Base, :get_extension)
@@ -69,6 +68,11 @@ include("sch/Sch.jl"); using .Sch
 # Data dependency task queue
 include("datadeps.jl")

+# Streaming
+include("stream.jl")
+include("stream-buffers.jl")
+include("stream-transfer.jl")
+
 # Array computations
 include("array/darray.jl")
 include("array/alloc.jl")
```

src/sch/Sch.jl

Lines changed: 5 additions & 3 deletions

```diff
@@ -253,9 +253,11 @@ end
 Combine `SchedulerOptions` and `ThunkOptions` into a new `ThunkOptions`.
 """
 function Base.merge(sopts::SchedulerOptions, topts::ThunkOptions)
-    single = topts.single !== nothing ? topts.single : sopts.single
-    allow_errors = topts.allow_errors !== nothing ? topts.allow_errors : sopts.allow_errors
-    proclist = topts.proclist !== nothing ? topts.proclist : sopts.proclist
+    select_option = (sopt, topt) -> isnothing(topt) ? sopt : topt
+
+    single = select_option(sopts.single, topts.single)
+    allow_errors = select_option(sopts.allow_errors, topts.allow_errors)
+    proclist = select_option(sopts.proclist, topts.proclist)
     ThunkOptions(single,
                  proclist,
                  topts.time_util,
```
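The refactor above replaces three repeated ternary chains with one `select_option` helper that prefers a task-level option and falls back to the scheduler-wide default. A minimal standalone sketch of the same fallback pattern (`SchedOpts`/`TaskOpts` are hypothetical stand-ins for Dagger's `SchedulerOptions`/`ThunkOptions`, where `nothing` means "not set here"):

```julia
# Hypothetical option structs; `nothing` marks an unset field.
struct SchedOpts
    single::Union{Int,Nothing}
    allow_errors::Union{Bool,Nothing}
end
struct TaskOpts
    single::Union{Int,Nothing}
    allow_errors::Union{Bool,Nothing}
end

# Prefer the task-level setting when present, else the scheduler default.
select_option(sopt, topt) = isnothing(topt) ? sopt : topt

function merge_opts(s::SchedOpts, t::TaskOpts)
    return (single = select_option(s.single, t.single),
            allow_errors = select_option(s.allow_errors, t.allow_errors))
end

merged = merge_opts(SchedOpts(1, nothing), TaskOpts(nothing, true))
# merged.single == 1 (scheduler default), merged.allow_errors == true (task override)
```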

src/sch/eager.jl

Lines changed: 7 additions & 0 deletions

```diff
@@ -124,6 +124,13 @@ function eager_cleanup(state, uid)
         # N.B. cache and errored expire automatically
         delete!(state.thunk_dict, tid)
     end
+    remotecall_wait(1, uid) do uid
+        lock(Dagger.EAGER_THUNK_STREAMS) do global_streams
+            if haskey(global_streams, uid)
+                delete!(global_streams, uid)
+            end
+        end
+    end
 end

 function _find_thunk(e::Dagger.DTask)
```
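The cleanup hunk above removes a task's stream entry on worker 1 under a lock. The lock-guarded-delete pattern in isolation can be sketched like this (a hypothetical `STREAMS` dictionary and `ReentrantLock`, not `Dagger.EAGER_THUNK_STREAMS` itself, and without the `remotecall_wait` hop):

```julia
# Sketch: a shared Dict protected by a lock, with deletion keyed by task UID.
const STREAMS_LOCK = ReentrantLock()
const STREAMS = Dict{UInt,Symbol}()

function cleanup_stream!(uid::UInt)
    lock(STREAMS_LOCK) do
        if haskey(STREAMS, uid)
            delete!(STREAMS, uid)
        end
    end
end

STREAMS[UInt(7)] = :running
cleanup_stream!(UInt(7))
haskey(STREAMS, UInt(7))  # -> false
```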

src/stream-buffers.jl

Lines changed: 98 additions & 0 deletions

```julia
"""
A buffer that drops all elements put into it.
"""
mutable struct DropBuffer{T}
    open::Bool
    DropBuffer{T}() where T = new{T}(true)
end
DropBuffer{T}(_) where T = DropBuffer{T}()
Base.isempty(::DropBuffer) = true
isfull(::DropBuffer) = false
capacity(::DropBuffer) = typemax(Int)
Base.length(::DropBuffer) = 0
Base.isopen(buf::DropBuffer) = buf.open
function Base.close(buf::DropBuffer)
    buf.open = false
end
function Base.put!(buf::DropBuffer, _)
    if !isopen(buf)
        throw(InvalidStateException("DropBuffer is closed", :closed))
    end
    task_may_cancel!(; must_force=true)
    yield()
    return
end
function Base.take!(buf::DropBuffer)
    while true
        if !isopen(buf)
            throw(InvalidStateException("DropBuffer is closed", :closed))
        end
        task_may_cancel!(; must_force=true)
        yield()
    end
end

"A process-local ring buffer."
mutable struct ProcessRingBuffer{T}
    read_idx::Int
    write_idx::Int
    @atomic count::Int
    buffer::Vector{T}
    @atomic open::Bool
    function ProcessRingBuffer{T}(len::Int=1024) where T
        buffer = Vector{T}(undef, len)
        return new{T}(1, 1, 0, buffer, true)
    end
end
Base.isempty(rb::ProcessRingBuffer) = (@atomic rb.count) == 0
isfull(rb::ProcessRingBuffer) = (@atomic rb.count) == length(rb.buffer)
capacity(rb::ProcessRingBuffer) = length(rb.buffer)
Base.length(rb::ProcessRingBuffer) = @atomic rb.count
Base.isopen(rb::ProcessRingBuffer) = @atomic rb.open
function Base.close(rb::ProcessRingBuffer)
    @atomic rb.open = false
end
function Base.put!(rb::ProcessRingBuffer{T}, x) where T
    while isfull(rb)
        yield()
        if !isopen(rb)
            throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
        end
        task_may_cancel!(; must_force=true)
    end
    to_write_idx = mod1(rb.write_idx, length(rb.buffer))
    rb.buffer[to_write_idx] = convert(T, x)
    rb.write_idx += 1
    @atomic rb.count += 1
end
function Base.take!(rb::ProcessRingBuffer)
    while isempty(rb)
        yield()
        if !isopen(rb) && isempty(rb)
            throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
        end
        if task_cancelled() && isempty(rb)
            # We respect a graceful cancellation only if the buffer is empty.
            # Otherwise, we may have values to continue communicating.
            task_may_cancel!()
        end
        task_may_cancel!(; must_force=true)
    end
    to_read_idx = rb.read_idx
    rb.read_idx += 1
    @atomic rb.count -= 1
    to_read_idx = mod1(to_read_idx, length(rb.buffer))
    return rb.buffer[to_read_idx]
end

"""
`take!()` all the elements from a buffer and put them in a `Vector`.
"""
function collect!(rb::ProcessRingBuffer{T}) where T
    output = Vector{T}(undef, rb.count)
    for i in 1:rb.count
        output[i] = take!(rb)
    end

    return output
end
```
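`ProcessRingBuffer` above keeps monotonically increasing read/write indices and maps them into the 1-based backing vector with `mod1`. A minimal, single-threaded sketch of just that indexing scheme, with `MiniRing`/`put_val!`/`take_val!` as hypothetical names and none of Dagger's cancellation or atomics:

```julia
# Non-concurrent sketch of mod1-based ring indexing: indices grow without
# bound, and mod1 wraps them into the fixed-size backing vector.
mutable struct MiniRing{T}
    read_idx::Int
    write_idx::Int
    count::Int
    buffer::Vector{T}
    MiniRing{T}(len::Int) where T = new{T}(1, 1, 0, Vector{T}(undef, len))
end

function put_val!(rb::MiniRing{T}, x) where T
    rb.count < length(rb.buffer) || error("full")
    rb.buffer[mod1(rb.write_idx, length(rb.buffer))] = convert(T, x)
    rb.write_idx += 1
    rb.count += 1
    return rb
end

function take_val!(rb::MiniRing)
    rb.count > 0 || error("empty")
    idx = mod1(rb.read_idx, length(rb.buffer))
    rb.read_idx += 1
    rb.count -= 1
    return rb.buffer[idx]
end

rb = MiniRing{Int}(2)
put_val!(rb, 1); put_val!(rb, 2)
take_val!(rb)                   # -> 1
put_val!(rb, 3)                 # wraps around to slot 1
[take_val!(rb), take_val!(rb)]  # -> [2, 3]
```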

src/stream-transfer.jl

Lines changed: 128 additions & 0 deletions

```julia
struct RemoteChannelFetcher
    chan::RemoteChannel
    RemoteChannelFetcher() = new(RemoteChannel())
end
const _THEIR_TID = TaskLocalValue{Int}(()->0)
function stream_push_values!(fetcher::RemoteChannelFetcher, T, our_store::StreamStore, their_stream::Stream, buffer)
    our_tid = STREAM_THUNK_ID[]
    our_uid = our_store.uid
    their_uid = their_stream.uid
    if _THEIR_TID[] == 0
        _THEIR_TID[] = remotecall_fetch(1) do
            lock(Sch.EAGER_ID_MAP) do id_map
                id_map[their_uid]
            end
        end
    end
    their_tid = _THEIR_TID[]
    @dagdebug our_tid :stream_push "taking output value: $our_tid -> $their_tid"
    value = try
        take!(buffer)
    catch
        close(fetcher.chan)
        rethrow()
    end
    @lock our_store.lock notify(our_store.lock)
    @dagdebug our_tid :stream_push "pushing output value: $our_tid -> $their_tid"
    try
        put!(fetcher.chan, value)
    catch err
        if err isa InvalidStateException && !isopen(fetcher.chan)
            @dagdebug our_tid :stream_push "channel closed: $our_tid -> $their_tid"
            throw(InterruptException())
        end
        rethrow(err)
    end
    @dagdebug our_tid :stream_push "finished pushing output value: $our_tid -> $their_tid"
end
function stream_pull_values!(fetcher::RemoteChannelFetcher, T, our_store::StreamStore, their_stream::Stream, buffer)
    our_tid = STREAM_THUNK_ID[]
    our_uid = our_store.uid
    their_uid = their_stream.uid
    if _THEIR_TID[] == 0
        _THEIR_TID[] = remotecall_fetch(1) do
            lock(Sch.EAGER_ID_MAP) do id_map
                id_map[their_uid]
            end
        end
    end
    their_tid = _THEIR_TID[]
    @dagdebug our_tid :stream_pull "pulling input value: $their_tid -> $our_tid"
    value = try
        take!(fetcher.chan)
    catch err
        if err isa InvalidStateException && !isopen(fetcher.chan)
            @dagdebug our_tid :stream_pull "channel closed: $their_tid -> $our_tid"
            throw(InterruptException())
        end
        rethrow(err)
    end
    @dagdebug our_tid :stream_pull "putting input value: $their_tid -> $our_tid"
    try
        put!(buffer, value)
    catch
        close(fetcher.chan)
        rethrow()
    end
    @lock our_store.lock notify(our_store.lock)
    @dagdebug our_tid :stream_pull "finished putting input value: $their_tid -> $our_tid"
end

#= TODO: Remove me
# This is a bad implementation because it wants to sleep on the remote side to
# wait for values, but this isn't semantically valid when done with MemPool.access_ref
struct RemoteFetcher end
function stream_push_values!(::Type{RemoteFetcher}, T, our_store::StreamStore, their_stream::Stream, buffer)
    sleep(1)
end
function stream_pull_values!(::Type{RemoteFetcher}, T, our_store::StreamStore, their_stream::Stream, buffer)
    id = our_store.uid
    thunk_id = STREAM_THUNK_ID[]
    @dagdebug thunk_id :stream "fetching values"

    free_space = capacity(buffer) - length(buffer)
    if free_space == 0
        @dagdebug thunk_id :stream "waiting for drain of full input buffer"
        yield()
        task_may_cancel!()
        wait_for_nonfull_input(our_store, their_stream.uid)
        return
    end

    values = T[]
    while isempty(values)
        values, closed = MemPool.access_ref(their_stream.store_ref.handle, id, T, thunk_id, free_space) do their_store, id, T, thunk_id, free_space
            @dagdebug thunk_id :stream "trying to fetch values at worker $(myid())"
            STREAM_THUNK_ID[] = thunk_id
            values = T[]
            @dagdebug thunk_id :stream "trying to fetch with free_space: $free_space"
            wait_for_nonempty_output(their_store, id)
            if isempty(their_store, id) && !isopen(their_store, id)
                @dagdebug thunk_id :stream "remote stream is closed, returning"
                return values, true
            end
            while !isempty(their_store, id) && length(values) < free_space
                value = take!(their_store, id)::T
                @dagdebug thunk_id :stream "fetched $value"
                push!(values, value)
            end
            return values, false
        end::Tuple{Vector{T},Bool}
        if closed
            throw(InterruptException())
        end

        # We explicitly yield in the loop to allow other tasks to run. This
        # matters on single-threaded instances because MemPool.access_ref()
        # might not yield when accessing data locally, which can cause this loop
        # to spin forever.
        yield()
        task_may_cancel!()
    end

    @dagdebug thunk_id :stream "fetched $(length(values)) values"
    for value in values
        put!(buffer, value)
    end
end
=#
```
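`RemoteChannelFetcher` above moves one value per call between a task's local buffer and a `RemoteChannel`: the push side takes from its output buffer and puts onto the channel; the pull side takes from the channel and puts into its input buffer. The same handshake can be sketched locally with plain `Channel`s (a standalone illustration with hypothetical names, not Dagger's actual transfer path, and without the error handling or debug logging):

```julia
# Sketch of the one-value-per-call push/pull handshake, using a local
# Channel in place of a RemoteChannel.
chan = Channel{Int}(8)

# "Push" side: take a value from an upstream buffer, put it on the channel.
push_values!(chan, buffer) = put!(chan, take!(buffer))

# "Pull" side: take a value off the channel, put it in a downstream buffer.
pull_values!(chan, buffer) = put!(buffer, take!(chan))

upstream = Channel{Int}(8)    # stands in for the producer's output buffer
downstream = Channel{Int}(8)  # stands in for the consumer's input buffer

put!(upstream, 42)
push_values!(chan, upstream)
pull_values!(chan, downstream)
result = take!(downstream)  # -> 42
```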
