Skip to content

Commit 76355e6

Browse files
johnmaxrinjpsamaroo
andcommitted
Add Task Monitor
Co-authored-by: Julian Samaroo <jpsamaroo@gmail.com>
1 parent 9bd2961 commit 76355e6

File tree

5 files changed

+65
-14
lines changed

5 files changed

+65
-14
lines changed

Project.toml

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
1212
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
1313
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
1414
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
15+
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
1516
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1617
Requires = "ae029012-a4dd-5104-9daa-d747884805df"
1718
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
@@ -20,9 +21,21 @@ SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383"
2021
SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
2122
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
2223
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
24+
TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"
2325
TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63"
2426
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2527

28+
[weakdeps]
29+
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
30+
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
31+
GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
32+
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
33+
34+
[extensions]
35+
GraphVizExt = "GraphViz"
36+
GraphVizSimpleExt = "Colors"
37+
PlotsExt = ["DataFrames", "Plots"]
38+
2639
[compat]
2740
DataStructures = "0.18"
2841
Graphs = "1"
@@ -34,22 +47,12 @@ Requires = "1"
3447
ScopedValues = "1.1"
3548
Statistics = "1"
3649
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
50+
TaskLocalValues = "0.1"
3751
TimespanLogging = "0.1"
3852
julia = "1.8"
3953

40-
[extensions]
41-
GraphVizSimpleExt = "Colors"
42-
GraphVizExt = "GraphViz"
43-
PlotsExt = ["DataFrames", "Plots"]
44-
4554
[extras]
4655
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
4756
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
4857
GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
4958
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
50-
51-
[weakdeps]
52-
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
53-
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
54-
GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
55-
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"

src/Dagger.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ include("scopes.jl")
4141
include("utils/scopes.jl")
4242
include("eager_thunk.jl")
4343
include("queue.jl")
44+
include("task_monitor.jl")
4445
include("thunk.jl")
4546
include("submission.jl")
4647
include("chunks.jl")

src/queue.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ enqueue!(::EagerTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
1515
eager_launch!(specs)
1616

1717
enqueue!(spec::Pair{EagerTaskSpec,EagerThunk}) =
18-
enqueue!(get_options(:task_queue, EagerTaskQueue()), spec)
18+
enqueue!(get_options(:task_queue, MONITOR_QUEUE[]), spec)
1919
enqueue!(specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
20-
enqueue!(get_options(:task_queue, EagerTaskQueue()), specs)
20+
enqueue!(get_options(:task_queue, MONITOR_QUEUE[]), specs)
2121

2222
struct LazyTaskQueue <: AbstractTaskQueue
2323
tasks::Vector{Pair{EagerTaskSpec,EagerThunk}}

src/task_monitor.jl

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using ProgressMeter
2+
using TaskLocalValues
3+
4+
struct MonitorTaskQueue <: AbstractTaskQueue
5+
running_tasks::Vector{WeakRef}
6+
MonitorTaskQueue() = new(WeakRef[])
7+
end
8+
function enqueue!(queue::MonitorTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
9+
push!(queue.running_tasks, WeakRef(spec[2]))
10+
upper = get_options(:task_queue, EagerTaskQueue())
11+
enqueue!(upper, spec)
12+
end
13+
14+
function enqueue!(queue::MonitorTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
15+
for (_, task) in specs
16+
push!(queue.running_tasks, WeakRef(task))
17+
end
18+
upper = get_options(:task_queue, EagerTaskQueue())
19+
enqueue!(upper, specs)
20+
end
21+
22+
const MONITOR_QUEUE = TaskLocalValue{MonitorTaskQueue}(MonitorTaskQueue)
23+
24+
"Monitors and displays the progress of any still-executing tasks."
25+
function monitor()
26+
queue = MONITOR_QUEUE[]
27+
running_tasks = queue.running_tasks
28+
isempty(running_tasks) && return
29+
30+
ntasks = length(running_tasks)
31+
meter = Progress(ntasks;
32+
desc="Waiting for $ntasks tasks...",
33+
dt=0.01, showspeed=true)
34+
while !isempty(running_tasks)
35+
for (i, task_weak) in reverse(collect(enumerate(running_tasks)))
36+
task = task_weak.value
37+
if task === nothing || isready(task)
38+
next!(meter)
39+
deleteat!(running_tasks, i)
40+
end
41+
end
42+
sleep(0.01)
43+
end
44+
finish!(meter)
45+
46+
return
47+
end

src/thunk.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ function spawn(f, args...; kwargs...)
309309
args_kwargs = args_kwargs_to_pairs(args, kwargs)
310310

311311
# Get task queue, and don't let it propagate
312-
task_queue = get_options(:task_queue, EagerTaskQueue())
312+
task_queue = get_options(:task_queue, MONITOR_QUEUE[])
313313
options = NamedTuple(filter(opt->opt[1] != :task_queue, Base.pairs(options)))
314314
propagates = filter(prop->prop != :task_queue, propagates)
315315
options = merge(options, (;propagates))

0 commit comments

Comments
 (0)