
Commit 62f8307

Merge pull request #463 from JuliaParallel/jps/stream2

Add streaming API

2 parents 4c51d84 + 3656030

30 files changed: +1838 −102 lines

.buildkite/pipeline.yml

Lines changed: 2 additions & 0 deletions

```diff
@@ -6,6 +6,7 @@
     os: linux
     arch: x86_64
   command: "julia --project -e 'using Pkg; Pkg.develop(;path=\"lib/TimespanLogging\")'"
+
 .bench: &bench
   if: build.message =~ /\[run benchmarks\]/
   agents:
@@ -14,6 +15,7 @@
     os: linux
     arch: x86_64
     num_cpus: 16
+
 steps:
   - label: Julia 1.9
     timeout_in_minutes: 90
```

docs/make.jl

Lines changed: 1 addition & 0 deletions

```diff
@@ -22,6 +22,7 @@ makedocs(;
         "Task Spawning" => "task-spawning.md",
         "Data Management" => "data-management.md",
         "Distributed Arrays" => "darray.md",
+        "Streaming Tasks" => "streaming.md",
         "Scopes" => "scopes.md",
         "Processors" => "processors.md",
         "Task Queues" => "task-queues.md",
```

docs/src/index.md

Lines changed: 35 additions & 0 deletions

@@ -394,3 +394,38 @@ Dagger.@spawn copyto!(C, X)

In contrast to the previous example, here, the tasks are executed without argument annotations. As a result, there is a possibility of the `copyto!` task being executed before the `sort!` task, leading to unexpected results in the output array `C`.

## Quickstart: Streaming

Dagger.jl provides a streaming API that allows you to process data as it becomes available, rather than waiting for the entire dataset to be loaded into memory.

For more details, see [Streaming](@ref).

### Syntax

The `Dagger.spawn_streaming()` function creates a streaming region, where tasks are executed continuously, processing data as it becomes available:

```julia
# Open a file to write to on this worker
f = Dagger.@mutable open("output.txt", "w")
t = Dagger.spawn_streaming() do
    # Generate random numbers continuously
    val = Dagger.@spawn rand()
    # Write each random number to a file
    Dagger.@spawn (f, val) -> begin
        if val < 0.01
            # Finish streaming when the random number is less than 0.01
            Dagger.finish_stream()
        end
        println(f, val)
    end
end
# Wait for all values to be generated and written
wait(t)
```

The above example demonstrates a streaming region that continuously generates random numbers and writes each one to a file. The streaming region terminates when a random number less than 0.01 is generated, at which point `Dagger.finish_stream()` is called (this terminates the current task, and will also terminate all streaming tasks launched by `spawn_streaming`).

docs/src/streaming.md

Lines changed: 105 additions & 0 deletions

# Streaming

Dagger tasks have a limited lifetime - they are created, execute, finish, and are eventually destroyed when they're no longer needed. Thus, if one wants to run the same kind of computations over and over, one might re-create a similar set of tasks for each unit of data that needs processing.

This might be fine for computations which take a long time to run (thus dwarfing the cost of task creation, which is quite small), or when working with a limited set of data, but this approach is not great for doing lots of small computations on a large (or endless) amount of data. For example: processing image frames from a webcam, reacting to messages from a message bus, or reading samples from a software radio. All of these workloads are better suited to a "streaming" model of data processing, where data is simply piped into a continuously-running task (or DAG of tasks) forever, or until the data runs out.

Thankfully, if you have a problem which is best modeled as a streaming system of tasks, Dagger has you covered! Building on its support for [Task Queues](@ref), Dagger provides a means to convert an entire DAG of tasks into a streaming DAG, where data flows into and out of each task asynchronously, using the `spawn_streaming` function:

```julia
Dagger.spawn_streaming() do # enters a streaming region
    vals = Dagger.@spawn rand()
    print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run in a streaming manner - instead of calling `rand()` once and returning its result, it will re-run `rand()` endlessly, continuously producing new random values. In typical Dagger style, `print_vals` is a Dagger task which depends on `vals`, but in streaming form - it will continuously `println` the random values produced from `vals`. Both tasks will run forever, and will run efficiently, only doing the work necessary to generate, transfer, and consume values.

As the comments point out, `spawn_streaming` creates a streaming region, during which `vals` and `print_vals` are created and configured. Both tasks are halted until `spawn_streaming` returns, allowing large DAGs to be built all at once, without any task losing a single value. If desired, streaming regions can be connected, although some values might be lost while tasks are being connected:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully set up and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be easily constructed, without doing anything different. For example, we can generate multiple streams of random numbers, write them all to their own files, and print the combined results:

```julia
Dagger.spawn_streaming() do
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    Dagger.spawn(all_vals_written...) do all_vals_written...
        vals_sum = sum(all_vals_written)
        println(vals_sum)
    end
end
```

If you want to stop the streaming DAG and tear it all down, you can call `Dagger.cancel!(all_vals[1])` (or pass any other task in the streaming DAG) to terminate all streaming tasks.

Alternatively, tasks can stop themselves from the inside with `finish_stream`, optionally returning a value that can be `fetch`'d. Let's do this when our randomly-drawn number falls within some arbitrary range:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_stream("Finished!")
        end
        return x
    end
end
fetch(vals)
```

In this example, the call to `fetch` will hang (while random numbers continue to be drawn), until a drawn number is less than 0.001; at that point, `fetch` will return with `"Finished!"`, and the task `vals` will have terminated.
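The streaming model described in this file can be approximated in plain Julia with a `Channel`, which may help build intuition for what `spawn_streaming` automates. The sketch below is illustrative only and uses no Dagger API (`stream_random` is a hypothetical helper name): a producer task draws random values until a stop condition is met, and a consumer processes each value as it arrives.

```julia
# Plain-Julia analog of a two-task streaming DAG: the Channel's feeder task
# plays the role of the producing streaming task, and the for-loop plays the
# role of the consuming streaming task.
function stream_random(threshold)
    return Channel{Float64}(32) do ch
        while true
            x = rand()
            if x < threshold
                break  # analogous to calling Dagger.finish_stream()
            end
            put!(ch, x)  # analogous to a value flowing to a downstream task
        end
    end
end

# Consume values as they are produced; the loop ends when the producer stops.
consumed = Float64[]
for x in stream_random(0.001)
    push!(consumed, x)
end
println("consumed ", length(consumed), " values")
```

Unlike Dagger's streaming tasks, this analog runs on a single process and offers no scheduling or cross-worker transfer; it only mirrors the producer/consumer data flow.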

src/Dagger.jl

Lines changed: 22 additions & 2 deletions

```diff
@@ -21,6 +21,7 @@ if !isdefined(Base, :ScopedValues)
 else
     import Base.ScopedValues: ScopedValue, with
 end
+import TaskLocalValues: TaskLocalValue
 
 if !isdefined(Base, :get_extension)
     import Requires: @require
@@ -55,16 +56,16 @@ include("processor.jl")
 include("threadproc.jl")
 include("context.jl")
 include("utils/processors.jl")
+include("dtask.jl")
+include("cancellation.jl")
 include("task-tls.jl")
 include("scopes.jl")
 include("utils/scopes.jl")
-include("dtask.jl")
 include("queue.jl")
 include("thunk.jl")
 include("submission.jl")
 include("chunks.jl")
 include("memory-spaces.jl")
-include("cancellation.jl")
 
 # Task scheduling
 include("compute.jl")
@@ -76,6 +77,11 @@ include("sch/Sch.jl"); using .Sch
 # Data dependency task queue
 include("datadeps.jl")
 
+# Streaming
+include("stream.jl")
+include("stream-buffers.jl")
+include("stream-transfer.jl")
+
 # Array computations
 include("array/darray.jl")
 include("array/alloc.jl")
@@ -169,6 +175,20 @@ function __init__()
             ThreadProc(myid(), tid)
         end
     end
+
+    # Set up @dagdebug categories, if specified
+    try
+        if haskey(ENV, "JULIA_DAGGER_DEBUG")
+            empty!(DAGDEBUG_CATEGORIES)
+            for category in split(ENV["JULIA_DAGGER_DEBUG"], ",")
+                if category != ""
+                    push!(DAGDEBUG_CATEGORIES, Symbol(category))
+                end
+            end
+        end
+    catch err
+        @warn "Error parsing JULIA_DAGGER_DEBUG" exception=err
+    end
 end
 
 end # module
```
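The `JULIA_DAGGER_DEBUG` handling added to `__init__` above splits a comma-separated list of `@dagdebug` categories and skips empty entries. A standalone sketch of that parsing logic (`parse_debug_categories` is a hypothetical helper name, not part of Dagger's API):

```julia
# Parse a comma-separated category spec such as "cancel,stream" into Symbols,
# ignoring empty entries (e.g. from a trailing comma), mirroring the loop in
# Dagger's __init__.
function parse_debug_categories(spec::AbstractString)
    categories = Symbol[]
    for category in split(spec, ",")
        if category != ""
            push!(categories, Symbol(category))
        end
    end
    return categories
end

parse_debug_categories("cancel,stream,")  # == [:cancel, :stream]
```

To enable these categories in a real session, one would set `ENV["JULIA_DAGGER_DEBUG"] = "cancel,stream"` before `using Dagger`.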

src/array/indexing.jl

Lines changed: 0 additions & 2 deletions

```diff
@@ -1,5 +1,3 @@
-import TaskLocalValues: TaskLocalValue
-
 ### getindex
 
 struct GetIndex{T,N} <: ArrayOp{T,N}
```

src/cancellation.jl

Lines changed: 64 additions & 13 deletions

```diff
@@ -1,11 +1,61 @@
+# DTask-level cancellation
+
+mutable struct CancelToken
+    @atomic cancelled::Bool
+    @atomic graceful::Bool
+    event::Base.Event
+end
+CancelToken() = CancelToken(false, false, Base.Event())
+function cancel!(token::CancelToken; graceful::Bool=true)
+    if !graceful
+        @atomic token.graceful = false
+    end
+    @atomic token.cancelled = true
+    notify(token.event)
+    return
+end
+function is_cancelled(token::CancelToken; must_force::Bool=false)
+    if token.cancelled[]
+        if must_force && token.graceful[]
+            # If we're only responding to forced cancellation, ignore graceful cancellations
+            return false
+        end
+        return true
+    end
+    return false
+end
+Base.wait(token::CancelToken) = wait(token.event)
+# TODO: Enable this for safety
+#Serialization.serialize(io::AbstractSerializer, ::CancelToken) =
+#    throw(ConcurrencyViolationError("Cannot serialize a CancelToken"))
+
+const DTASK_CANCEL_TOKEN = TaskLocalValue{Union{CancelToken,Nothing}}(()->nothing)
+
+function clone_cancel_token_remote(orig_token::CancelToken, wid::Integer)
+    remote_token = remotecall_fetch(wid) do
+        return poolset(CancelToken())
+    end
+    errormonitor_tracked("remote cancel_token communicator", Threads.@spawn begin
+        wait(orig_token)
+        @dagdebug nothing :cancel "Cancelling remote token on worker $wid"
+        MemPool.access_ref(remote_token) do remote_token
+            cancel!(remote_token)
+        end
+    end)
+end
+
+# Global-level cancellation
+
 """
-    cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
+    cancel!(task::DTask; force::Bool=false, graceful::Bool=true, halt_sch::Bool=false)
 
 Cancels `task` at any point in its lifecycle, causing the scheduler to abandon
-it. If `force` is `true`, the task will be interrupted with an
-`InterruptException` (not recommended, this is unsafe). If `halt_sch` is
-`true`, the scheduler will be halted after the task is cancelled (it will
-restart automatically upon the next `@spawn`/`spawn` call).
+it.
+
+# Keyword arguments
+- `force`: If `true`, the task will be interrupted with an `InterruptException` (not recommended, this is unsafe).
+- `graceful`: If `true`, the task will be allowed to finish its current execution before being cancelled; otherwise, it will be cancelled as soon as possible.
+- `halt_sch`: If `true`, the scheduler will be halted after the task is cancelled (it will restart automatically upon the next `@spawn`/`spawn` call).
 
 As an example, the following code will cancel task `t` before it finishes
 executing:
@@ -21,24 +71,24 @@ tasks which are waiting to run. Using `cancel!` is generally a much safer
 alternative to Ctrl+C, as it cooperates with the scheduler and runtime and
 avoids unintended side effects.
 """
-function cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
+function cancel!(task::DTask; force::Bool=false, graceful::Bool=true, halt_sch::Bool=false)
     tid = lock(Dagger.Sch.EAGER_ID_MAP) do id_map
         id_map[task.uid]
     end
-    cancel!(tid; force, halt_sch)
+    cancel!(tid; force, graceful, halt_sch)
 end
 function cancel!(tid::Union{Int,Nothing}=nothing;
-                 force::Bool=false, halt_sch::Bool=false)
+                 force::Bool=false, graceful::Bool=true, halt_sch::Bool=false)
     remotecall_fetch(1, tid, force, halt_sch) do tid, force, halt_sch
         state = Sch.EAGER_STATE[]
 
         # Check that the scheduler isn't stopping or has already stopped
         if !isnothing(state) && !state.halt.set
-            @lock state.lock _cancel!(state, tid, force, halt_sch)
+            @lock state.lock _cancel!(state, tid, force, graceful, halt_sch)
         end
     end
 end
-function _cancel!(state, tid, force, halt_sch)
+function _cancel!(state, tid, force, graceful, halt_sch)
     @assert islocked(state.lock)
 
     # Get the scheduler uid
@@ -48,7 +98,7 @@ function _cancel!(state, tid, force, halt_sch)
     for task in state.ready
         tid !== nothing && task.id != tid && continue
         @dagdebug tid :cancel "Cancelling ready task"
-        state.cache[task] = InterruptException()
+        state.cache[task] = DTaskFailedException(task, task, InterruptException())
         state.errored[task] = true
         Sch.set_failed!(state, task)
     end
@@ -58,7 +108,7 @@
     for task in keys(state.waiting)
         tid !== nothing && task.id != tid && continue
         @dagdebug tid :cancel "Cancelling waiting task"
-        state.cache[task] = InterruptException()
+        state.cache[task] = DTaskFailedException(task, task, InterruptException())
         state.errored[task] = true
         Sch.set_failed!(state, task)
     end
@@ -80,11 +130,11 @@ function _cancel!(state, tid, force, halt_sch)
         Tf === typeof(Sch.eager_thunk) && continue
         istaskdone(task) && continue
        any_cancelled = true
-        @dagdebug tid :cancel "Cancelling running task ($Tf)"
         if force
             @dagdebug tid :cancel "Interrupting running task ($Tf)"
             Threads.@spawn Base.throwto(task, InterruptException())
         else
+            @dagdebug tid :cancel "Cancelling running task ($Tf)"
             # Tell the processor to just drop this task
             task_occupancy = task_spec[4]
             time_util = task_spec[2]
@@ -93,6 +143,7 @@ function _cancel!(state, tid, force, halt_sch)
             push!(istate.cancelled, tid)
             to_proc = istate.proc
             put!(istate.return_queue, (myid(), to_proc, tid, (InterruptException(), nothing)))
+            cancel!(istate.cancel_tokens[tid]; graceful)
         end
     end
 end
```
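The `CancelToken` added above combines two atomic flags with a `Base.Event`, so cancellation can be requested once and observed by any number of waiters, and graceful and forced cancellation can be told apart. A self-contained sketch of the same pattern in plain Julia (the `Token`/`request_cancel!` names are illustrative, not Dagger's API):

```julia
# Minimal cancellation-token pattern: atomic flags record the request and its
# kind, and an Event (which stays set once notified) wakes any waiters.
mutable struct Token
    @atomic cancelled::Bool
    @atomic graceful::Bool
    event::Base.Event
end
Token() = Token(false, false, Base.Event())

function request_cancel!(t::Token; graceful::Bool=true)
    if !graceful
        @atomic t.graceful = false
    end
    @atomic t.cancelled = true
    notify(t.event)
    return
end

# A waiter blocks on the event, then checks how cancellation was requested.
t = Token()
waiter = Threads.@spawn begin
    wait(t.event)
    (@atomic t.graceful) ? :graceful : :forced
end
request_cancel!(t; graceful=false)
println(fetch(waiter))  # prints "forced"
```

Because `Base.Event` remains set after `notify`, a waiter that starts after the cancellation request still returns immediately, which is what makes this pattern safe for the late-attaching remote communicators set up by `clone_cancel_token_remote`.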

src/compute.jl

Lines changed: 0 additions & 6 deletions

```diff
@@ -36,12 +36,6 @@ end
 Base.@deprecate gather(ctx, x) collect(ctx, x)
 Base.@deprecate gather(x) collect(x)
 
-cleanup() = cleanup(Context(global_context()))
-function cleanup(ctx::Context)
-    Sch.cleanup(ctx)
-    nothing
-end
-
 function get_type(s::String)
     local T
     for t in split(s, ".")
```
