Add streaming throughput monitor #494

Draft · wants to merge 16 commits into `master`

50 changes: 25 additions & 25 deletions Manifest.toml
```diff
@@ -2,7 +2,7 @@
 
 julia_version = "1.8.5"
 manifest_format = "2.0"
-project_hash = "5333a6c200b6e6add81c46547527f66ddc0dc16c"
+project_hash = "1e12d6aa088ae431916872c11d09544380c7a130"
 
 [[deps.Artifacts]]
 uuid = "56f22d72-fd6d-98f1-02f0-08ddc0907c33"
@@ -12,9 +12,9 @@ uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
 
 [[deps.ChainRulesCore]]
 deps = ["Compat", "LinearAlgebra", "SparseArrays"]
-git-tree-sha1 = "b66b8f8e3db5d7835fb8cbe2589ffd1cd456e491"
+git-tree-sha1 = "575cd02e080939a33b6df6c5853d14924c08e35b"
 uuid = "d360d2e6-b24c-11e9-a2a3-2a2ae2dbcce4"
-version = "1.17.0"
+version = "1.23.0"
 
 [[deps.ChangesOfVariables]]
 deps = ["InverseFunctions", "LinearAlgebra", "Test"]
@@ -23,26 +23,26 @@ uuid = "9e997f8a-9a97-42d5-a9f1-ce6bfc15e2c0"
 version = "0.1.8"
 
 [[deps.Compat]]
-deps = ["Dates", "LinearAlgebra", "UUIDs"]
-git-tree-sha1 = "8a62af3e248a8c4bad6b32cbbe663ae02275e32c"
+deps = ["Dates", "LinearAlgebra", "TOML", "UUIDs"]
+git-tree-sha1 = "c955881e3c981181362ae4088b35995446298b80"
 uuid = "34da2185-b29b-5c13-b0c7-acf172513d20"
-version = "4.10.0"
+version = "4.14.0"
 
 [[deps.CompilerSupportLibraries_jll]]
 deps = ["Artifacts", "Libdl"]
 uuid = "e66e0078-7015-5450-92f7-15fbd957f2ae"
 version = "1.0.1+0"
 
 [[deps.DataAPI]]
-git-tree-sha1 = "8da84edb865b0b5b0100c0666a9bc9a0b71c553c"
+git-tree-sha1 = "abe83f3a2f1b857aac70ef8b269080af17764bbe"
 uuid = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
-version = "1.15.0"
+version = "1.16.0"
 
 [[deps.DataStructures]]
 deps = ["Compat", "InteractiveUtils", "OrderedCollections"]
-git-tree-sha1 = "3dbd312d370723b6bb43ba9d02fc36abade4518d"
+git-tree-sha1 = "0f4b5d62a88d8f59003e43c25a8a90de9eb76317"
 uuid = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
-version = "0.18.15"
+version = "0.18.18"
 
 [[deps.Dates]]
 deps = ["Printf"]
@@ -91,28 +91,28 @@ uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 
 [[deps.LogExpFunctions]]
 deps = ["ChainRulesCore", "ChangesOfVariables", "DocStringExtensions", "InverseFunctions", "IrrationalConstants", "LinearAlgebra"]
-git-tree-sha1 = "7d6dd4e9212aebaeed356de34ccf262a3cd415aa"
+git-tree-sha1 = "18144f3e9cbe9b15b070288eef858f71b291ce37"
 uuid = "2ab3a3ac-af41-5b50-aa03-7779005ae688"
-version = "0.3.26"
+version = "0.3.27"
 
 [[deps.Logging]]
 uuid = "56ddb016-857b-54e1-b83d-db4d58db5568"
 
 [[deps.MacroTools]]
 deps = ["Markdown", "Random"]
-git-tree-sha1 = "9ee1618cbf5240e6d4e0371d6f24065083f60c48"
+git-tree-sha1 = "2fa9ee3e63fd3a4f7a9a4f4744a52f4856de82df"
 uuid = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
-version = "0.5.11"
+version = "0.5.13"
 
 [[deps.Markdown]]
 deps = ["Base64"]
 uuid = "d6f4376e-aef5-505a-96c1-9c027394607a"
 
 [[deps.MemPool]]
-deps = ["DataStructures", "Distributed", "Mmap", "Random", "Serialization", "Sockets"]
-git-tree-sha1 = "b9c1a032c3c1310a857c061ce487c632eaa1faa4"
+deps = ["DataStructures", "Distributed", "Mmap", "Random", "ScopedValues", "Serialization", "Sockets"]
+git-tree-sha1 = "60dd4ac427d39e0b3f15b193845324523ee71c03"
 uuid = "f9f48841-c794-520a-933b-121f7ba6ed94"
-version = "0.4.4"
+version = "0.4.6"
 
 [[deps.Missings]]
 deps = ["DataAPI"]
@@ -133,9 +133,9 @@ uuid = "4536629a-c528-5b80-bd46-f80d51c5b363"
 version = "0.3.20+0"
 
 [[deps.OrderedCollections]]
-git-tree-sha1 = "2e73fe17cac3c62ad1aebe70d44c963c3cfdc3e3"
+git-tree-sha1 = "dfdf5519f235516220579f949664f1bf44e741c5"
 uuid = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
-version = "1.6.2"
+version = "1.6.3"
 
 [[deps.PrecompileTools]]
 deps = ["Preferences"]
@@ -145,9 +145,9 @@ version = "1.2.0"
 
 [[deps.Preferences]]
 deps = ["TOML"]
-git-tree-sha1 = "00805cd429dcb4870060ff49ef443486c262e38e"
+git-tree-sha1 = "9306f6085165d270f7e3db02af26a400d580f5c6"
 uuid = "21216c6a-2e73-6563-6e65-726566657250"
-version = "1.4.1"
+version = "1.4.3"
 
 [[deps.Printf]]
 deps = ["Unicode"]
@@ -173,9 +173,9 @@ version = "0.7.0"
 
 [[deps.ScopedValues]]
 deps = ["HashArrayMappedTries", "Logging"]
-git-tree-sha1 = "e3b5e4ccb1702db2ae9ac2a660d4b6b2a8595742"
+git-tree-sha1 = "c27d546a4749c81f70d1fabd604da6aa5054e3d2"
 uuid = "7e506255-f358-4e82-b7e4-beb19740aa63"
-version = "1.1.0"
+version = "1.2.0"
 
 [[deps.Serialization]]
 uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
@@ -189,9 +189,9 @@ uuid = "6462fe0b-24de-5631-8697-dd941f90decc"
 
 [[deps.SortingAlgorithms]]
 deps = ["DataStructures"]
-git-tree-sha1 = "c60ec5c62180f27efea3ba2908480f8055e17cee"
+git-tree-sha1 = "66e0a8e672a0bdfca2c3f5937efb8538b9ddc085"
 uuid = "a2af1166-a08f-5f64-846c-94a0d3cef48c"
-version = "1.1.1"
+version = "1.2.1"
 
 [[deps.SparseArrays]]
 deps = ["LinearAlgebra", "Random"]
```
1 change: 1 addition & 0 deletions docs/make.jl
```diff
@@ -19,6 +19,7 @@ makedocs(;
         "Task Spawning" => "task-spawning.md",
         "Data Management" => "data-management.md",
         "Distributed Arrays" => "darray.md",
+        "Streaming Tasks" => "streaming.md",
         "Scopes" => "scopes.md",
         "Processors" => "processors.md",
         "Task Queues" => "task-queues.md",
```
105 changes: 105 additions & 0 deletions docs/src/streaming.md
# Streaming Tasks

Dagger tasks have a limited lifetime - they are created, execute, finish, and
are eventually destroyed when they're no longer needed. Thus, if you want to
run the same kind of computation over and over, you might re-create a similar
set of tasks for each unit of data that needs processing.

This might be fine for computations which take a long time to run (thus
dwarfing the cost of task creation, which is quite small), or when working with
a limited set of data, but this approach is not great for doing lots of small
computations on a large (or endless) amount of data - for example, processing
image frames from a webcam, reacting to messages from a message bus, or
reading samples from a software radio. All of these workloads are better
suited to a "streaming" model of data processing, where data is simply piped
into a continuously-running task (or DAG of tasks) forever, or until the data
runs out.

Thankfully, if you have a problem which is best modeled as a streaming system
of tasks, Dagger has you covered! Building on its support for
["Task Queues"](@ref), Dagger provides the `spawn_streaming` function, which
converts an entire DAG of tasks into a streaming DAG, where data flows into
and out of each task asynchronously:

```julia
Dagger.spawn_streaming() do # enters a streaming region
    vals = Dagger.@spawn rand()
    print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run
in a streaming manner - instead of just calling `rand()` once and returning its
result, it will re-run `rand()` endlessly, continuously producing new random
values. In typical Dagger style, `print_vals` is a Dagger task which depends on
`vals`, but in streaming form - it will continuously `println` the random
values produced from `vals`. Both tasks will run forever, and will run
efficiently, only doing the work necessary to generate, transfer, and consume
values.

As the comments point out, `spawn_streaming` creates a streaming region,
during which `vals` and `print_vals` are created and configured. Neither task
runs until `spawn_streaming` returns, allowing large DAGs to be built all at
once, without any task losing a single value. If desired, streaming regions
can be connected, although some values might be lost while the tasks are being
connected:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully set up and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be constructed just as easily, without
doing anything differently. For example, we can generate multiple streams of
random numbers, write each to its own file, and print the sum of each round of
values:

```julia
Dagger.spawn_streaming() do
    # Four independent streams of random values
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    # Log each stream's values to its own file, passing each value downstream
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    # Sum each round of values and print the result
    Dagger.spawn(all_vals_written...) do vals...
        vals_sum = sum(vals)
        println(vals_sum)
    end
end
```

If you want to stop the streaming DAG and tear it all down, you can call
`Dagger.kill!` on any task in the DAG (e.g. `Dagger.kill!(all_vals[1])` or
`Dagger.kill!(all_vals_written[2])`); the kill propagates throughout the DAG.
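
A minimal sketch of tearing a stream down, reusing the pattern from the first
example (here `kill!` is called on the task returned by `spawn_streaming`, and
the kill propagates to any connected tasks):

```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# ... let the stream run for a while ...

# Kill any task in the DAG; every connected task is torn down with it
Dagger.kill!(vals)
```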

Alternatively, tasks can stop themselves from the inside with
`finish_streaming`, optionally returning a value that can be `fetch`'d. Let's
do this when our randomly-drawn number falls below some arbitrary threshold:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_streaming("Finished!")
        end
        return x
    end
end
fetch(vals)
```

In this example, the call to `fetch` will block (while random numbers continue
to be drawn) until a drawn number is less than 0.001; at that point, `fetch`
will return `"Finished!"`, and the task `vals` will have terminated.
6 changes: 6 additions & 0 deletions src/Dagger.jl
```diff
@@ -42,6 +42,12 @@ include("utils/system_uuid.jl")
 include("utils/caching.jl")
 include("sch/Sch.jl"); using .Sch
 
+# Streaming
+include("stream-buffers.jl")
+include("stream-fetchers.jl")
+include("stream-utils.jl")
+include("stream.jl")
+
 # Array computations
 include("array/darray.jl")
 include("array/alloc.jl")
```
14 changes: 13 additions & 1 deletion src/eager_thunk.jl
```diff
@@ -29,6 +29,16 @@ end
 Options(;options...) = Options((;options...))
 Options(options...) = Options((;options...))
 
+"""
+    EagerThunkMetadata
+
+Represents some useful metadata pertaining to an `EagerThunk`:
+- `return_type::Type` - The inferred return type of the task
+"""
+mutable struct EagerThunkMetadata
+    return_type::Type
+end
+
 """
     EagerThunk
 
@@ -39,9 +49,11 @@ be `fetch`'d or `wait`'d on at any time.
 mutable struct EagerThunk
     uid::UInt
     future::ThunkFuture
+    metadata::EagerThunkMetadata
     finalizer_ref::DRef
     thunk_ref::DRef
-    EagerThunk(uid, future, finalizer_ref) = new(uid, future, finalizer_ref)
+    EagerThunk(uid, future, metadata, finalizer_ref) =
+        new(uid, future, metadata, finalizer_ref)
 end
 
 Base.isready(t::EagerThunk) = isready(t.future)
```
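
To illustrate the new field, a speculative sketch of inspecting a task's
inferred return type; direct `t.metadata.return_type` field access is an
assumption based on the struct layout above, not a documented API:

```julia
t = Dagger.@spawn sum(rand(100))
# Assumption: the scheduler populates `return_type` via type inference at
# spawn time, so it can be read without fetching the task's result.
@show t.metadata.return_type  # expected: Float64 for this call
```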
7 changes: 7 additions & 0 deletions src/sch/eager.jl
```diff
@@ -116,6 +116,13 @@ function eager_cleanup(state, uid)
         # N.B. cache and errored expire automatically
         delete!(state.thunk_dict, tid)
     end
+    remotecall_wait(1, uid) do uid
+        lock(Dagger.EAGER_THUNK_STREAMS) do global_streams
+            if haskey(global_streams, uid)
+                delete!(global_streams, uid)
+            end
+        end
+    end
 end
 
 function _find_thunk(e::Dagger.EagerThunk)
```
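
For context, a hypothetical sketch of the lock-protected registry shape this
cleanup assumes; the real `EAGER_THUNK_STREAMS` definition lives in the new
`stream*.jl` files, which this diff does not show:

```julia
# Hypothetical shape: a Dict guarded by a lock, where `lock(f, registry)`
# runs `f` on the Dict while holding the lock, matching the usage above.
struct LockedRegistry
    lock::ReentrantLock
    streams::Dict{UInt,Any}
end
Base.lock(f, r::LockedRegistry) = lock(() -> f(r.streams), r.lock)

const EAGER_THUNK_STREAMS = LockedRegistry(ReentrantLock(), Dict{UInt,Any}())
```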
19 changes: 13 additions & 6 deletions src/sch/util.jl
```diff
@@ -362,12 +362,19 @@ function has_capacity(state, p, gp, time_util, alloc_util, occupancy, sig)
     else
         get(state.signature_alloc_cost, sig, UInt64(0))
     end::UInt64
-    est_occupancy = if occupancy !== nothing && haskey(occupancy, T)
-        # Clamp to 0-1, and scale between 0 and `typemax(UInt32)`
-        Base.unsafe_trunc(UInt32, clamp(occupancy[T], 0, 1) * typemax(UInt32))
-    else
-        typemax(UInt32)
-    end::UInt32
+    est_occupancy::UInt32 = typemax(UInt32)
+    if occupancy !== nothing
+        occ = nothing
+        if haskey(occupancy, T)
+            occ = occupancy[T]
+        elseif haskey(occupancy, Any)
+            occ = occupancy[Any]
+        end
+        if occ !== nothing
+            # Clamp to 0-1, and scale between 0 and `typemax(UInt32)`
+            est_occupancy = Base.unsafe_trunc(UInt32, clamp(occ, 0, 1) * typemax(UInt32))
+        end
+    end
     #= FIXME: Estimate if cached data can be swapped to storage
     storage = storage_resource(p)
     real_alloc_util = state.worker_storage_pressure[gp][storage]
```
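
Beyond restructuring, this change adds a fallback: if a task's occupancy map
has no entry for the exact processor type `T`, an `Any`-keyed entry now
applies instead. The fixed-point scaling itself is plain arithmetic; a
standalone sketch (not a Dagger API):

```julia
# Map an occupancy fraction in [0, 1] to a UInt32 in [0, typemax(UInt32)]
scale_occupancy(occ) = Base.unsafe_trunc(UInt32, clamp(occ, 0, 1) * typemax(UInt32))

scale_occupancy(0.25)  # == typemax(UInt32) ÷ 4
scale_occupancy(1.5)   # out-of-range values clamp to typemax(UInt32)
```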