Skip to content

Commit 49edf91

Browse files
committed
Add Task Monitor
1 parent 9bd2961 commit 49edf91

File tree

5 files changed

+55
-14
lines changed

5 files changed

+55
-14
lines changed

Project.toml

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
1212
OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
1313
PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
1414
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
15+
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
1516
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1617
Requires = "ae029012-a4dd-5104-9daa-d747884805df"
1718
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
@@ -23,6 +24,17 @@ StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
2324
TimespanLogging = "a526e669-04d3-4846-9525-c66122c55f63"
2425
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2526

27+
[weakdeps]
28+
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
29+
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
30+
GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
31+
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
32+
33+
[extensions]
34+
GraphVizExt = "GraphViz"
35+
GraphVizSimpleExt = "Colors"
36+
PlotsExt = ["DataFrames", "Plots"]
37+
2638
[compat]
2739
DataStructures = "0.18"
2840
Graphs = "1"
@@ -37,19 +49,8 @@ StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33, 0.34"
3749
TimespanLogging = "0.1"
3850
julia = "1.8"
3951

40-
[extensions]
41-
GraphVizSimpleExt = "Colors"
42-
GraphVizExt = "GraphViz"
43-
PlotsExt = ["DataFrames", "Plots"]
44-
4552
[extras]
4653
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
4754
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
4855
GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
4956
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
50-
51-
[weakdeps]
52-
Colors = "5ae59095-9a9b-59fe-a467-6f913c188581"
53-
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
54-
GraphViz = "f526b714-d49f-11e8-06ff-31ed36ee7ee0"
55-
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"

src/Dagger.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ include("scopes.jl")
4141
include("utils/scopes.jl")
4242
include("eager_thunk.jl")
4343
include("queue.jl")
44+
include("task_monitor.jl")
4445
include("thunk.jl")
4546
include("submission.jl")
4647
include("chunks.jl")

src/queue.jl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ enqueue!(::EagerTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk}) =
1414
enqueue!(::EagerTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
1515
eager_launch!(specs)
1616

17-
enqueue!(spec::Pair{EagerTaskSpec,EagerThunk}) =
18-
enqueue!(get_options(:task_queue, EagerTaskQueue()), spec)
17+
function enqueue!(spec::Pair{EagerTaskSpec,EagerThunk})
18+
println("Hi There")
19+
enqueue!(MONITOR_QUEUE,spec)
20+
end
21+
1922
enqueue!(specs::Vector{Pair{EagerTaskSpec,EagerThunk}}) =
2023
enqueue!(get_options(:task_queue, EagerTaskQueue()), specs)
2124

src/task_monitor.jl

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using ProgressMeter
2+
3+
struct MonitorTaskQueue <: AbstractTaskQueue
4+
runningTasks::Vector{Pair{EagerTaskSpec,EagerThunk}}
5+
finishedTasks::Vector{Pair{EagerTaskSpec,EagerThunk}}
6+
MonitorTaskQueue() = new(Pair{EagerTaskSpec,EagerThunk}[], Pair{EagerTaskSpec,EagerThunk}[])
7+
end
8+
function enqueue!(queue::MonitorTaskQueue, spec::Pair{EagerTaskSpec,EagerThunk})
9+
push!(queue.runningTasks, spec)
10+
upper = get_options(:task_queue, EagerTaskQueue())
11+
enqueue!(upper,spec)
12+
end
13+
14+
function enqueue!(queue::MonitorTaskQueue, specs::Vector{Pair{EagerTaskSpec,EagerThunk}})
15+
append!(queue.runningTasks, specs)
16+
end
17+
18+
const MONITOR_QUEUE = MonitorTaskQueue()
19+
20+
function monitor()
21+
errormonitor(Threads.@spawn begin
22+
runningTasks = MONITOR_QUEUE.runningTasks
23+
finishedTasks = MONITOR_QUEUE.finishedTasks
24+
meter = Progress(length(runningTasks))
25+
while !isempty(runningTasks)
26+
for (i,task) in reverse(collect(enumerate(runningTasks)))
27+
if isready(task[2])
28+
next!(meter)
29+
push!(finishedTasks,task)
30+
deleteat!(runningTasks,i)
31+
end
32+
end
33+
sleep(0.1)
34+
end
35+
end)
36+
end

src/thunk.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ function spawn(f, args...; kwargs...)
309309
args_kwargs = args_kwargs_to_pairs(args, kwargs)
310310

311311
# Get task queue, and don't let it propagate
312-
task_queue = get_options(:task_queue, EagerTaskQueue())
312+
task_queue = get_options(:task_queue, MONITOR_QUEUE)
313313
options = NamedTuple(filter(opt->opt[1] != :task_queue, Base.pairs(options)))
314314
propagates = filter(prop->prop != :task_queue, propagates)
315315
options = merge(options, (;propagates))

0 commit comments

Comments
 (0)