Skip to content

Commit 08e1f0c

Browse files
committed
fixup! Add Task Monitor
1 parent 49edf91 commit 08e1f0c

File tree

4 files changed

+39
-29
lines changed

4 files changed

+39
-29
lines changed

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ SharedArrays = "1a1011a3-84de-559e-8e89-a11a2f7dc383"
2121
SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
2222
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
2323
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
24+
TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"
2425
TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63"
2526
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2627

@@ -46,6 +47,7 @@ Requires = "1"
4647
ScopedValues = "1.1"
4748
Statistics = "1"
4849
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
50+
TaskLocalValues = "0.1"
4951
TimespanLogging = "0.1"
5052
julia = "1.8"
5153

src/queue.jl

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,10 @@ enqueue!(::EagerTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk}) =
1414
enqueue!(::EagerTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
1515
eager_launch!(specs)
1616

17-
function enqueue!(spec::Pair{EagerTaskSpec,EagerThunk})
18-
println("Hi There")
19-
enqueue!(MONITOR_QUEUE,spec)
20-
end
21-
17+
enqueue!(spec::Pair{EagerTaskSpec,EagerThunk}) =
18+
enqueue!(get_options(:task_queue, MONITOR_QUEUE[]), spec)
2219
enqueue!(specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
23-
enqueue!(get_options(:task_queue, EagerTaskQueue()), specs)
20+
enqueue!(get_options(:task_queue, MONITOR_QUEUE[]), specs)
2421

2522
struct LazyTaskQueue <: AbstractTaskQueue
2623
tasks::Vector{Pair{EagerTaskSpec,EagerThunk}}

src/task_monitor.jl

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,47 @@
11
using ProgressMeter
2+
using TaskLocalValues
23

34
struct MonitorTaskQueue <: AbstractTaskQueue
4-
runningTasks::Vector{Pair{EagerTaskSpec,EagerThunk}}
5-
finishedTasks::Vector{Pair{EagerTaskSpec,EagerThunk}}
6-
MonitorTaskQueue() = new(Pair{EagerTaskSpec,EagerThunk}[], Pair{EagerTaskSpec,EagerThunk}[])
5+
running_tasks::Vector{WeakRef}
6+
MonitorTaskQueue() = new(WeakRef[])
77
end
88
function enqueue!(queue::MonitorTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
9-
push!(queue.runningTasks, spec)
9+
push!(queue.running_tasks, WeakRef(spec[2]))
1010
upper = get_options(:task_queue, EagerTaskQueue())
11-
enqueue!(upper,spec)
11+
enqueue!(upper, spec)
1212
end
1313

1414
function enqueue!(queue::MonitorTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
15-
append!(queue.runningTasks, specs)
15+
for (_, task) in specs
16+
push!(queue.running_tasks, WeakRef(task))
17+
end
18+
upper = get_options(:task_queue, EagerTaskQueue())
19+
enqueue!(upper, specs)
1620
end
1721

18-
const MONITOR_QUEUE = MonitorTaskQueue()
22+
const MONITOR_QUEUE = TaskLocalValue{MonitorTaskQueue}(MonitorTaskQueue)
1923

24+
"Monitors and displays the progress of any still-executing tasks."
2025
function monitor()
21-
errormonitor(Threads.@spawn begin
22-
runningTasks = MONITOR_QUEUE.runningTasks
23-
finishedTasks = MONITOR_QUEUE.finishedTasks
24-
meter = Progress(length(runningTasks))
25-
while !isempty(runningTasks)
26-
for (i,task) in reverse(collect(enumerate(runningTasks)))
27-
if isready(task[2])
28-
next!(meter)
29-
push!(finishedTasks,task)
30-
deleteat!(runningTasks,i)
31-
end
32-
end
33-
sleep(0.1)
26+
queue = MONITOR_QUEUE[]
27+
running_tasks = queue.running_tasks
28+
isempty(running_tasks) && return
29+
30+
ntasks = length(running_tasks)
31+
meter = Progress(ntasks;
32+
desc="Waiting for $ntasks tasks...",
33+
dt=0.01, showspeed=true)
34+
while !isempty(running_tasks)
35+
for (i, task_weak) in reverse(collect(enumerate(running_tasks)))
36+
task = task_weak.value
37+
if task === nothing || isready(task)
38+
next!(meter)
39+
deleteat!(running_tasks, i)
3440
end
35-
end)
36-
end
41+
end
42+
sleep(0.01)
43+
end
44+
finish!(meter)
45+
46+
return
47+
end

src/thunk.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ function spawn(f, args...; kwargs...)
309309
args_kwargs = args_kwargs_to_pairs(args, kwargs)
310310

311311
# Get task queue, and don't let it propagate
312-
task_queue = get_options(:task_queue, MONITOR_QUEUE)
312+
task_queue = get_options(:task_queue, MONITOR_QUEUE[])
313313
options = NamedTuple(filter(opt->opt[1] != :task_queue, Base.pairs(options)))
314314
propagates = filter(prop->prop != :task_queue, propagates)
315315
options = merge(options, (;propagates))

0 commit comments

Comments
 (0)