Skip to content

Commit 4730cab

Browse files
authored
Merge pull request #3156 from sbueringer/pr-pq-metrics
🌱 Add priority label to PQ depth metric
2 parents d40234e + 3c3fd3e commit 4730cab

File tree

5 files changed

+128
-56
lines changed

5 files changed

+128
-56
lines changed

pkg/controller/priorityqueue/metrics.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"k8s.io/client-go/util/workqueue"
88
"k8s.io/utils/clock"
9+
"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
910
)
1011

1112
// This file is mostly a copy of unexported code from
@@ -14,8 +15,9 @@ import (
1415
// The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics.
1516

1617
type queueMetrics[T comparable] interface {
17-
add(item T)
18-
get(item T)
18+
add(item T, priority int)
19+
get(item T, priority int)
20+
updateDepthWithPriorityMetric(oldPriority, newPriority int)
1921
done(item T)
2022
updateUnfinishedWork()
2123
retry()
@@ -25,9 +27,9 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
2527
if len(name) == 0 {
2628
return noMetrics[T]{}
2729
}
28-
return &defaultQueueMetrics[T]{
30+
31+
dqm := &defaultQueueMetrics[T]{
2932
clock: clock,
30-
depth: mp.NewDepthMetric(name),
3133
adds: mp.NewAddsMetric(name),
3234
latency: mp.NewLatencyMetric(name),
3335
workDuration: mp.NewWorkDurationMetric(name),
@@ -37,14 +39,22 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
3739
processingStartTimes: map[T]time.Time{},
3840
retries: mp.NewRetriesMetric(name),
3941
}
42+
43+
if mpp, ok := mp.(metrics.MetricsProviderWithPriority); ok {
44+
dqm.depthWithPriority = mpp.NewDepthMetricWithPriority(name)
45+
} else {
46+
dqm.depth = mp.NewDepthMetric(name)
47+
}
48+
return dqm
4049
}
4150

4251
// defaultQueueMetrics expects the caller to lock before setting any metrics.
4352
type defaultQueueMetrics[T comparable] struct {
4453
clock clock.Clock
4554

4655
// current depth of a workqueue
47-
depth workqueue.GaugeMetric
56+
depth workqueue.GaugeMetric
57+
depthWithPriority metrics.DepthMetricWithPriority
4858
// total number of adds handled by a workqueue
4959
adds workqueue.CounterMetric
5060
// how long an item stays in a workqueue
@@ -64,13 +74,17 @@ type defaultQueueMetrics[T comparable] struct {
6474
}
6575

6676
// add is called for ready items only
67-
func (m *defaultQueueMetrics[T]) add(item T) {
77+
func (m *defaultQueueMetrics[T]) add(item T, priority int) {
6878
if m == nil {
6979
return
7080
}
7181

7282
m.adds.Inc()
73-
m.depth.Inc()
83+
if m.depthWithPriority != nil {
84+
m.depthWithPriority.Inc(priority)
85+
} else {
86+
m.depth.Inc()
87+
}
7488

7589
m.mapLock.Lock()
7690
defer m.mapLock.Unlock()
@@ -80,12 +94,16 @@ func (m *defaultQueueMetrics[T]) add(item T) {
8094
}
8195
}
8296

83-
func (m *defaultQueueMetrics[T]) get(item T) {
97+
func (m *defaultQueueMetrics[T]) get(item T, priority int) {
8498
if m == nil {
8599
return
86100
}
87101

88-
m.depth.Dec()
102+
if m.depthWithPriority != nil {
103+
m.depthWithPriority.Dec(priority)
104+
} else {
105+
m.depth.Dec()
106+
}
89107

90108
m.mapLock.Lock()
91109
defer m.mapLock.Unlock()
@@ -97,6 +115,13 @@ func (m *defaultQueueMetrics[T]) get(item T) {
97115
}
98116
}
99117

118+
func (m *defaultQueueMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) {
119+
if m.depthWithPriority != nil {
120+
m.depthWithPriority.Dec(oldPriority)
121+
m.depthWithPriority.Inc(newPriority)
122+
}
123+
}
124+
100125
func (m *defaultQueueMetrics[T]) done(item T) {
101126
if m == nil {
102127
return
@@ -139,8 +164,9 @@ func (m *defaultQueueMetrics[T]) retry() {
139164

140165
type noMetrics[T any] struct{}
141166

142-
func (noMetrics[T]) add(item T) {}
143-
func (noMetrics[T]) get(item T) {}
144-
func (noMetrics[T]) done(item T) {}
145-
func (noMetrics[T]) updateUnfinishedWork() {}
146-
func (noMetrics[T]) retry() {}
167+
func (noMetrics[T]) add(item T, priority int) {}
168+
func (noMetrics[T]) get(item T, priority int) {}
169+
func (noMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) {}
170+
func (noMetrics[T]) done(item T) {}
171+
func (noMetrics[T]) updateUnfinishedWork() {}
172+
func (noMetrics[T]) retry() {}

pkg/controller/priorityqueue/metrics_test.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ import (
44
"sync"
55

66
"k8s.io/client-go/util/workqueue"
7+
"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
78
)
89

910
func newFakeMetricsProvider() *fakeMetricsProvider {
1011
return &fakeMetricsProvider{
11-
depth: make(map[string]int),
12+
depth: make(map[string]map[int]int),
1213
adds: make(map[string]int),
1314
latency: make(map[string][]float64),
1415
workDuration: make(map[string][]float64),
@@ -19,8 +20,10 @@ func newFakeMetricsProvider() *fakeMetricsProvider {
1920
}
2021
}
2122

23+
var _ metrics.MetricsProviderWithPriority = &fakeMetricsProvider{}
24+
2225
type fakeMetricsProvider struct {
23-
depth map[string]int
26+
depth map[string]map[int]int
2427
adds map[string]int
2528
latency map[string][]float64
2629
workDuration map[string][]float64
@@ -31,9 +34,13 @@ type fakeMetricsProvider struct {
3134
}
3235

3336
func (f *fakeMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
37+
panic("Should never be called. Expected NewDepthMetricWithPriority to be called instead")
38+
}
39+
40+
func (f *fakeMetricsProvider) NewDepthMetricWithPriority(name string) metrics.DepthMetricWithPriority {
3441
f.mu.Lock()
3542
defer f.mu.Unlock()
36-
f.depth[name] = 0
43+
f.depth[name] = map[int]int{}
3744
return &fakeGaugeMetric{m: &f.depth, mu: &f.mu, name: name}
3845
}
3946

@@ -80,21 +87,21 @@ func (f *fakeMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMet
8087
}
8188

8289
type fakeGaugeMetric struct {
83-
m *map[string]int
90+
m *map[string]map[int]int
8491
mu *sync.Mutex
8592
name string
8693
}
8794

88-
func (fg *fakeGaugeMetric) Inc() {
95+
func (fg *fakeGaugeMetric) Inc(priority int) {
8996
fg.mu.Lock()
9097
defer fg.mu.Unlock()
91-
(*fg.m)[fg.name]++
98+
(*fg.m)[fg.name][priority]++
9299
}
93100

94-
func (fg *fakeGaugeMetric) Dec() {
101+
func (fg *fakeGaugeMetric) Dec(priority int) {
95102
fg.mu.Lock()
96103
defer fg.mu.Unlock()
97-
(*fg.m)[fg.name]--
104+
(*fg.m)[fg.name][priority]--
98105
}
99106

100107
type fakeCounterMetric struct {

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
156156
w.items[key] = item
157157
w.queue.ReplaceOrInsert(item)
158158
if item.ReadyAt == nil {
159-
w.metrics.add(key)
159+
w.metrics.add(key, item.Priority)
160160
}
161161
w.addedCounter++
162162
continue
@@ -166,12 +166,16 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
166166
// will affect the order - Just delete and re-add.
167167
item, _ := w.queue.Delete(w.items[key])
168168
if o.Priority > item.Priority {
169+
// Update depth metric only if the item in the queue was already added to the depth metric.
170+
if item.ReadyAt == nil || w.becameReady.Has(key) {
171+
w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority)
172+
}
169173
item.Priority = o.Priority
170174
}
171175

172176
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
173177
if readyAt == nil && !w.becameReady.Has(key) {
174-
w.metrics.add(key)
178+
w.metrics.add(key, item.Priority)
175179
}
176180
item.ReadyAt = readyAt
177181
}
@@ -223,7 +227,7 @@ func (w *priorityqueue[T]) spin() {
223227
return false
224228
}
225229
if !w.becameReady.Has(item.Key) {
226-
w.metrics.add(item.Key)
230+
w.metrics.add(item.Key, item.Priority)
227231
w.becameReady.Insert(item.Key)
228232
}
229233
}
@@ -239,7 +243,7 @@ func (w *priorityqueue[T]) spin() {
239243
return true
240244
}
241245

242-
w.metrics.get(item.Key)
246+
w.metrics.get(item.Key, item.Priority)
243247
w.locked.Insert(item.Key)
244248
w.waiters.Add(-1)
245249
delete(w.items, item.Key)

0 commit comments

Comments
 (0)