Skip to content

Commit 2888fa4

Browse files
committed
Sch: Bifurcate signature metrics on processor
1 parent e33216c commit 2888fa4

File tree

2 files changed

+33
-17
lines changed

2 files changed

+33
-17
lines changed

src/sch/Sch.jl

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ Fields:
7373
- `worker_loadavg::Dict{Int,NTuple{3,Float64}}` - Worker load average
7474
- `worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}` - Communication channels between the scheduler and each worker
7575
- `procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}` - Cached linked list of processors ready to be used
76-
- `signature_time_cost::Dict{Signature,UInt64}` - Cache of estimated CPU time (in nanoseconds) required to compute calls with the given signature
77-
- `signature_alloc_cost::Dict{Signature,UInt64}` - Cache of estimated CPU RAM (in bytes) required to compute calls with the given signature
76+
- `signature_time_cost::Dict{Signature,Dict{Processor,UInt64}}` - Cache of estimated CPU time (in nanoseconds) required to compute calls with the given signature on a given processor
77+
- `signature_alloc_cost::Dict{Signature,Dict{Processor,UInt64}}` - Cache of estimated CPU RAM (in bytes) required to compute calls with the given signature on a given processor
7878
- `transfer_rate::Ref{UInt64}` - Estimate of the network transfer rate in bytes per second
7979
- `halt::Base.Event` - Event indicating that the scheduler is halting
8080
- `lock::ReentrantLock` - Lock around operations which modify the state
@@ -100,8 +100,8 @@ struct ComputeState
100100
worker_loadavg::Dict{Int,NTuple{3,Float64}}
101101
worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
102102
procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
103-
signature_time_cost::Dict{Signature,UInt64}
104-
signature_alloc_cost::Dict{Signature,UInt64}
103+
signature_time_cost::Dict{Signature,Dict{Processor,UInt64}}
104+
signature_alloc_cost::Dict{Signature,Dict{Processor,UInt64}}
105105
transfer_rate::Ref{UInt64}
106106
halt::Base.Event
107107
lock::ReentrantLock
@@ -130,8 +130,8 @@ function start_state(deps::Dict, node_order, chan)
130130
Dict{Int,NTuple{3,Float64}}(),
131131
Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
132132
Ref{Union{ProcessorCacheEntry,Nothing}}(nothing),
133-
Dict{Signature,UInt64}(),
134-
Dict{Signature,UInt64}(),
133+
Dict{Signature,Dict{Processor,UInt64}}(),
134+
Dict{Signature,Dict{Processor,UInt64}}(),
135135
Ref{UInt64}(1_000_000),
136136
Base.Event(),
137137
ReentrantLock(),
@@ -590,16 +590,23 @@ function scheduler_run(ctx, state::ComputeState, d::Thunk, options)
590590
end
591591
node = unwrap_weak_checked(state.thunk_dict[thunk_id])
592592
if metadata !== nothing
593+
# Update metrics
593594
state.worker_time_pressure[pid][proc] = metadata.time_pressure
594595
#to_storage = fetch(node.options.storage)
595596
#state.worker_storage_pressure[pid][to_storage] = metadata.storage_pressure
596597
#state.worker_storage_capacity[pid][to_storage] = metadata.storage_capacity
597598
state.worker_loadavg[pid] = metadata.loadavg
599+
598600
sig = signature(state, node)
599-
state.signature_time_cost[sig] = (metadata.threadtime + get(state.signature_time_cost, sig, 0)) ÷ 2
600-
state.signature_alloc_cost[sig] = (metadata.gc_allocd + get(state.signature_alloc_cost, sig, 0)) ÷ 2
601+
time_costs_proc = get!(Dict{Processor,UInt64}, state.signature_time_cost, sig)
602+
time_cost = get(time_costs_proc, proc, UInt64(0))
603+
time_costs_proc[proc] = (metadata.threadtime + time_cost) ÷ UInt64(2)
604+
alloc_costs_proc = get!(Dict{Processor,UInt64}, state.signature_alloc_cost, sig)
605+
alloc_cost = get(alloc_costs_proc, proc, UInt64(0))
606+
alloc_costs_proc[proc] = (metadata.gc_allocd + alloc_cost) ÷ UInt64(2)
607+
601608
if metadata.transfer_rate !== nothing
602-
state.transfer_rate[] = (state.transfer_rate[] + metadata.transfer_rate) ÷ 2
609+
state.transfer_rate[] = (state.transfer_rate[] + metadata.transfer_rate) ÷ UInt64(2)
603610
end
604611
end
605612
state.cache[node] = res

src/sch/util.jl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -400,17 +400,24 @@ end
400400

401401
function has_capacity(state, p, gp, time_util, alloc_util, occupancy, sig)
402402
T = typeof(p)
403-
# FIXME: MaxUtilization
404-
est_time_util = round(UInt64, if time_util !== nothing && haskey(time_util, T)
405-
time_util[T] * 1000^3
403+
404+
@warn "Use special lookup to use other proc estimates" maxlog=1
405+
est_time_util = if time_util !== nothing && haskey(time_util, T)
406+
round(UInt64, time_util[T] * 1000^3)
407+
elseif haskey(state.signature_time_cost, sig) && haskey(state.signature_time_cost[sig], p)
408+
state.signature_time_cost[sig][p]
406409
else
407-
get(state.signature_time_cost, sig, 1000^3)
408-
end)
410+
UInt64(1000^3)
411+
end
412+
409413
est_alloc_util = if alloc_util !== nothing && haskey(alloc_util, T)
410-
alloc_util[T]
414+
alloc_util[T]::UInt64
415+
elseif haskey(state.signature_alloc_cost, sig) && haskey(state.signature_alloc_cost[sig], p)
416+
state.signature_alloc_cost[sig][p]
411417
else
412-
get(state.signature_alloc_cost, sig, UInt64(0))
413-
end::UInt64
418+
UInt64(0)
419+
end
420+
414421
est_occupancy::UInt32 = typemax(UInt32)
415422
if occupancy !== nothing
416423
occ = nothing
@@ -424,6 +431,7 @@ function has_capacity(state, p, gp, time_util, alloc_util, occupancy, sig)
424431
est_occupancy = Base.unsafe_trunc(UInt32, clamp(occ, 0, 1) * typemax(UInt32))
425432
end
426433
end
434+
427435
#= FIXME: Estimate if cached data can be swapped to storage
428436
storage = storage_resource(p)
429437
real_alloc_util = state.worker_storage_pressure[gp][storage]
@@ -432,6 +440,7 @@ function has_capacity(state, p, gp, time_util, alloc_util, occupancy, sig)
432440
return false, est_time_util, est_alloc_util
433441
end
434442
=#
443+
435444
return true, est_time_util, est_alloc_util, est_occupancy
436445
end
437446

0 commit comments

Comments
 (0)