@@ -13,6 +13,23 @@ include("util.jl")
 include("fault-handler.jl")
 include("dynamic.jl")
 
+mutable struct ProcessorCacheEntry
+    gproc::OSProc
+    proc::Processor
+    next::ProcessorCacheEntry
+
+    ProcessorCacheEntry(gproc::OSProc, proc::Processor) = new(gproc, proc)
+end
+function Base.show(io::IO, entry::ProcessorCacheEntry)
+    entries = 1
+    next = entry.next
+    while next !== entry
+        entries += 1
+        next = next.next
+    end
+    print(io, "ProcessorCacheEntry(pid $(entry.gproc.pid), $(entry.proc), $entries entries)")
+end
+
 """
     ComputeState
 
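The cache is a circular singly-linked list: the last entry points back to the first, so the scheduler can keep walking it round-robin without bounds checks. A minimal sketch of its behavior, assuming Dagger's usual `OSProc`/`ThreadProc` processor types and that this file defines the `Dagger.Sch` module:

```julia
# Sketch only: the `Dagger.Sch` module path and `ThreadProc` are assumptions
# based on Dagger's conventions, not part of this diff.
using Dagger: OSProc, ThreadProc
using Dagger.Sch: ProcessorCacheEntry

a = ProcessorCacheEntry(OSProc(1), ThreadProc(1, 1))
b = ProcessorCacheEntry(OSProc(1), ThreadProc(1, 2))
a.next = b
b.next = a   # close the ring

# `Base.show` above walks the ring until it returns to its starting entry:
println(a)   # ProcessorCacheEntry(pid 1, ThreadProc(1, 1), 2 entries)
```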
@@ -33,7 +50,8 @@ Fields:
 - worker_pressure::Dict{Int,Dict{Type,Float64}} - Cache of worker pressure
 - worker_capacity::Dict{Int,Dict{Type,Float64}} - Maps from worker ID to capacity
 - worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}} - Communication channels between the scheduler and each worker
-- halt::Ref{Bool} - Flag indicating, when set, that the scheduler should halt immediately
+- procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}} - Cached linked list of processors ready to be used
+- halt::Base.RefValue{Bool} - Flag indicating, when set, that the scheduler should halt immediately
 - lock::ReentrantLock() - Lock around operations which modify the state
 - futures::Dict{Thunk, Vector{ThunkFuture}} - Futures registered for waiting on the result of a thunk.
 - errored::Set{Thunk} - Thunks that threw an error
@@ -54,13 +72,54 @@ struct ComputeState
     worker_pressure::Dict{Int,Dict{Type,Float64}}
     worker_capacity::Dict{Int,Dict{Type,Float64}}
     worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
-    halt::Ref{Bool}
+    procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
+    halt::Base.RefValue{Bool}
     lock::ReentrantLock
     futures::Dict{Thunk, Vector{ThunkFuture}}
     errored::Set{Thunk}
     chan::Channel{Any}
 end
 
+function start_state(deps::Dict, node_order, chan)
+    state = ComputeState(rand(UInt64),
+                         deps,
+                         Set{Thunk}(),
+                         OneToMany(),
+                         OneToMany(),
+                         Vector{Thunk}(undef, 0),
+                         Dict{Thunk, Any}(),
+                         Set{Thunk}(),
+                         Dict{Int, Thunk}(),
+                         node_order,
+                         Dict{Int,OSProc}(),
+                         Dict{Int,Dict{Type,Float64}}(),
+                         Dict{Int,Dict{Type,Float64}}(),
+                         Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
+                         Ref{Union{ProcessorCacheEntry,Nothing}}(nothing),
+                         Ref{Bool}(false),
+                         ReentrantLock(),
+                         Dict{Thunk, Vector{ThunkFuture}}(),
+                         Set{Thunk}(),
+                         chan)
+
+    nodes = sort(collect(keys(deps)), by=node_order)
+    # N.B. Using merge! here instead would modify deps
+    for (key,val) in deps
+        state.waiting_data[key] = copy(val)
+    end
+    for k in nodes
+        if istask(k)
+            waiting = Set{Thunk}(Iterators.filter(istask, inputs(k)))
+            if isempty(waiting)
+                push!(state.ready, k)
+            else
+                state.waiting[k] = waiting
+            end
+        end
+    end
+    state
+end
+
 """
     SchedulerOptions
 
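`start_state` seeds the scheduler: any task whose inputs contain no other tasks goes straight to `ready`, while everything else records the set of tasks it still waits on. A toy illustration of that partition, using stand-in types rather than Dagger's `Thunk`:

```julia
# Toy stand-ins; Dagger's real `istask`/`inputs` operate on `Thunk`s.
struct FakeTask
    id::Int
    inputs::Vector{Any}
end
istask(x) = x isa FakeTask

t1 = FakeTask(1, [10, 20])   # only data inputs -> ready immediately
t2 = FakeTask(2, [t1, 30])   # depends on t1 -> must wait

ready = FakeTask[]
waiting = Dict{FakeTask,Set{FakeTask}}()
for t in (t1, t2)
    deps = Set{FakeTask}(Iterators.filter(istask, t.inputs))
    isempty(deps) ? push!(ready, t) : (waiting[t] = deps)
end
@assert only(ready) === t1
@assert waiting[t2] == Set([t1])
```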
@@ -86,6 +145,8 @@ skipped, the error will be displayed, and the scheduler will execute as usual.
 - `network=nothing`: Which network should be used to transfer bulk data over.
   If `nothing`, defaults to Distributed's cluster network
   (`MemPool.DistributedNetwork()`).
+- `round_robin::Bool=false`: Whether to schedule in round-robin mode, which
+  spreads load instead of the default behavior of filling processors to capacity.
 """
 Base.@kwdef struct SchedulerOptions
     single::Int = 0
@@ -94,6 +155,7 @@ Base.@kwdef struct SchedulerOptions
     checkpoint = nothing
     restore = nothing
     network = nothing
+    round_robin::Bool = false
 end
 
 """
@@ -353,55 +415,132 @@ function schedule!(ctx, state, procs=procs_to_use(ctx))
     lock(state.lock) do
         safepoint(state)
         @assert length(procs) > 0
-        proc_keys = map(x->x.pid, procs)
-        proc_set = Set{Any}()
-        for p in proc_keys
-            for proc in get_processors(OSProc(p))
-                push!(proc_set, p=>proc)
+
+        # Populate the cache if empty
+        if state.procs_cache_list[] === nothing
+            current = nothing
+            for p in map(x->x.pid, procs)
+                for proc in get_processors(OSProc(p))
+                    next = ProcessorCacheEntry(OSProc(p), proc)
+                    if current === nothing
+                        current = next
+                        current.next = current
+                        state.procs_cache_list[] = current
+                    else
+                        current.next = next
+                        current = next
+                        current.next = state.procs_cache_list[]
+                    end
+                end
+            end
+            # FIXME: Sort by lowest absolute utilization
+        end
+
+        function can_use_proc(task, gproc, proc, opts)
+            # Check against proclist
+            if opts.proclist === nothing
+                if !default_enabled(proc)
+                    return false
+                end
+            elseif opts.proclist isa Function
+                if !opts.proclist(proc)
+                    return false
+                end
+            elseif opts.proclist isa Vector
+                if !(proc in opts.proclist)
+                    return false
+                end
+            else
+                error("proclist must be a Function, Vector, or nothing")
+            end
+
+            # Check against single
+            if opts.single != 0
+                if gproc.pid != opts.single
+                    return false
+                end
+            end
+
+            return true
+        end
+        function has_capacity(p, gp, procutil)
+            T = typeof(p)
+            extra_util = get(procutil, T, 1)
+            real_util = state.worker_pressure[gp][T]
+            cap = state.worker_capacity[gp][T]
+            if ((extra_util isa MaxUtilization) && (real_util > 0)) ||
+               ((extra_util isa Real) && (extra_util + real_util > cap))
+                return false, cap, extra_util
             end
+            return true, cap, extra_util
         end
+
+        # Schedule tasks
         failed_scheduling = Thunk[]
         while !isempty(state.ready)
+            # Select a new task and get its options
             task = pop!(state.ready)
             opts = merge(ctx.options, task.options)
-            proclist = opts.proclist
-            proc_set_useable = if proclist === nothing
-                filter(x->default_enabled(x[2]), proc_set)
-            elseif proclist isa Function
-                filter(x->proclist(x[2]), proc_set)
+
+            # Try to select a processor
+            selected_entry = nothing
+            entry = state.procs_cache_list[]
+            cap, extra_util = nothing, nothing
+            procs_found = false
+            # N.B. if we only have one processor, we need to select it now
+            if can_use_proc(task, entry.gproc, entry.proc, opts)
+                has_cap, cap, extra_util = has_capacity(entry.proc, entry.gproc.pid, opts.procutil)
+                if has_cap
+                    selected_entry = entry
+                else
+                    procs_found = true
+                    entry = entry.next
+                end
             else
-                filter(x->typeof(x[2]) in proclist, proc_set)
-            end
-            if opts.single != 0
-                proc_set_useable = filter(x->x[1]==opts.single, proc_set_useable)
+                entry = entry.next
             end
-            @assert !isempty(proc_set_useable) "No processors available, try making proclist more liberal"
-            procutil = opts.procutil
-            gproc = nothing
-            proc = nothing
-            extra_util = nothing
-            cap = nothing
-            # FIXME: Sort by lowest utilization
-            for (gp,p) in proc_set_useable
-                T = typeof(p)
-                extra_util = get(procutil, T, 1)
-                real_util = state.worker_pressure[gp][T]
-                cap = state.worker_capacity[gp][T]
-                if ((extra_util isa MaxUtilization) && (real_util > 0)) ||
-                   ((extra_util isa Real) && (extra_util + real_util > cap))
-                    continue
+            while selected_entry === nothing
+                if entry === state.procs_cache_list[]
+                    if procs_found
+                        push!(failed_scheduling, task)
+                        break
+                    else
+                        error("No processors available, try making proclist more liberal")
+                    end
+                end
+
+                if can_use_proc(task, entry.gproc, entry.proc, opts)
+                    has_cap, cap, extra_util = has_capacity(entry.proc, entry.gproc.pid, opts.procutil)
+                    if has_cap
+                        # Select this processor
+                        selected_entry = entry
+                    else
+                        # We could have selected it otherwise
+                        procs_found = true
+                        entry = entry.next
+                    end
                 else
-                    gproc = OSProc(gp)
-                    proc = p
-                    break
+                    # Try next processor
+                    entry = entry.next
                 end
             end
-            if proc !== nothing
-                extra_util = extra_util isa MaxUtilization ? cap : extra_util
-                fire_task!(ctx, task, (gproc, proc), state; util=extra_util)
+            selected_entry === nothing && continue
+
+            # Schedule task onto proc
+            gproc, proc = selected_entry.gproc, selected_entry.proc
+            extra_util = extra_util isa MaxUtilization ? cap : extra_util
+            fire_task!(ctx, task, (gproc, proc), state; util=extra_util)
+
+            # Progress through list
+            if ctx.options.round_robin
+                # Proceed to next entry to spread work
+                state.procs_cache_list[] = state.procs_cache_list[].next
                 continue
-            else
-                push!(failed_scheduling, task)
+            end
+            util = state.worker_pressure[gproc.pid][typeof(proc)]
+            if util >= cap
+                # Proceed to next entry due to over-pressure
+                state.procs_cache_list[] = state.procs_cache_list[].next
             end
         end
         append!(state.ready, failed_scheduling)
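The selection logic above is a guarded walk around the ring: check the entry at the cache head first, advance until something fits, and give up once the walk wraps back to where it started. A standalone sketch of just that termination structure (capacity bookkeeping omitted):

```julia
# Simplified stand-in for ProcessorCacheEntry; no Dagger types required.
mutable struct Entry
    id::Int
    usable::Bool
    next::Entry
    Entry(id, usable) = new(id, usable)
end

# Walk the ring starting at `head`; return the first usable entry, or
# `nothing` once we have wrapped around without finding one.
function select_entry(head::Entry)
    entry = head
    while true
        entry.usable && return entry
        entry = entry.next
        entry === head && return nothing   # full loop: nothing fits
    end
end

a = Entry(1, false); b = Entry(2, true); c = Entry(3, false)
a.next = b; b.next = c; c.next = a        # close the ring
@assert select_entry(a).id == 2
@assert select_entry(c).id == 2           # any starting point works

solo = Entry(4, false); solo.next = solo
@assert select_entry(solo) === nothing
```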
@@ -568,45 +707,6 @@ function finish_task!(state, node, thunk_failed; free=true)
     end
 end
 
-function start_state(deps::Dict, node_order, chan)
-    state = ComputeState(rand(UInt64),
-                         deps,
-                         Set{Thunk}(),
-                         OneToMany(),
-                         OneToMany(),
-                         Vector{Thunk}(undef, 0),
-                         Dict{Thunk, Any}(),
-                         Set{Thunk}(),
-                         Dict{Int, Thunk}(),
-                         node_order,
-                         Dict{Int,OSProc}(),
-                         Dict{Int,Dict{Type,Float64}}(),
-                         Dict{Int,Dict{Type,Float64}}(),
-                         Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
-                         Ref{Bool}(false),
-                         ReentrantLock(),
-                         Dict{Thunk, Vector{ThunkFuture}}(),
-                         Set{Thunk}(),
-                         chan)
-
-    nodes = sort(collect(keys(deps)), by=node_order)
-    # N.B. Using merge! here instead would modify deps
-    for (key,val) in deps
-        state.waiting_data[key] = copy(val)
-    end
-    for k in nodes
-        if istask(k)
-            waiting = Set{Thunk}(Iterators.filter(istask, inputs(k)))
-            if isempty(waiting)
-                push!(state.ready, k)
-            else
-                state.waiting[k] = waiting
-            end
-        end
-    end
-    state
-end
-
 function fetch_report(task)
     try
         fetch(task)