Skip to content

Commit cf69be2

Browse files
authored
Merge pull request #2767 from sbueringer/pr-add-queue-option
✨ Add controller workqueue option
2 parents e181c0d + 6e23721 commit cf69be2

File tree

4 files changed

+85
-19
lines changed

4 files changed

+85
-19
lines changed

pkg/controller/controller.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ type Options struct {
5959
// The overall is a token bucket and the per-item is exponential.
6060
RateLimiter ratelimiter.RateLimiter
6161

62+
// NewQueue constructs the queue for this controller once the controller is ready to start.
63+
// With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which
64+
// priority/order objects are reconciled (e.g. to reconcile objects with changes first).
65+
// This is a func because the standard Kubernetes work queues start themselves immediately, which
66+
// leads to goroutine leaks if something calls controller.New repeatedly.
67+
// The NewQueue func gets the controller name and the RateLimiter option (defaulted if necessary) passed in.
68+
// NewQueue defaults to NewRateLimitingQueueWithConfig.
69+
//
70+
// NOTE: LOW LEVEL PRIMITIVE!
71+
// Only use a custom NewQueue if you know what you are doing.
72+
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface
73+
6274
// LogConstructor is used to construct a logger used for this controller and passed
6375
// to each reconciliation via the context field.
6476
LogConstructor func(request *reconcile.Request) logr.Logger
@@ -147,6 +159,14 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
147159
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
148160
}
149161

162+
if options.NewQueue == nil {
163+
options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
164+
return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
165+
Name: controllerName,
166+
})
167+
}
168+
}
169+
150170
if options.RecoverPanic == nil {
151171
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
152172
}
@@ -157,12 +177,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
157177

158178
// Create controller with dependencies set
159179
return &controller.Controller{
160-
Do: options.Reconciler,
161-
MakeQueue: func() workqueue.RateLimitingInterface {
162-
return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
163-
Name: name,
164-
})
165-
},
180+
Do: options.Reconciler,
181+
RateLimiter: options.RateLimiter,
182+
NewQueue: options.NewQueue,
166183
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
167184
CacheSyncTimeout: options.CacheSyncTimeout,
168185
Name: name,

pkg/controller/controller_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
. "github.com/onsi/gomega"
2525
"go.uber.org/goleak"
2626
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/client-go/util/workqueue"
2728
"k8s.io/utils/ptr"
2829

2930
"sigs.k8s.io/controller-runtime/pkg/config"
@@ -32,6 +33,7 @@ import (
3233
"sigs.k8s.io/controller-runtime/pkg/handler"
3334
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
3435
"sigs.k8s.io/controller-runtime/pkg/manager"
36+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3537
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3638
"sigs.k8s.io/controller-runtime/pkg/source"
3739
)
@@ -133,6 +135,48 @@ var _ = Describe("controller.Controller", func() {
133135
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
134136
})
135137

138+
It("should default RateLimiter and NewQueue if not specified", func() {
139+
m, err := manager.New(cfg, manager.Options{})
140+
Expect(err).NotTo(HaveOccurred())
141+
142+
c, err := controller.New("new-controller", m, controller.Options{
143+
Reconciler: reconcile.Func(nil),
144+
})
145+
Expect(err).NotTo(HaveOccurred())
146+
147+
ctrl, ok := c.(*internalcontroller.Controller)
148+
Expect(ok).To(BeTrue())
149+
150+
Expect(ctrl.RateLimiter).NotTo(BeNil())
151+
Expect(ctrl.NewQueue).NotTo(BeNil())
152+
})
153+
154+
It("should not override RateLimiter and NewQueue if specified", func() {
155+
m, err := manager.New(cfg, manager.Options{})
156+
Expect(err).NotTo(HaveOccurred())
157+
158+
customRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
159+
customNewQueueCalled := false
160+
customNewQueue := func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface {
161+
customNewQueueCalled = true
162+
return nil
163+
}
164+
165+
c, err := controller.New("new-controller", m, controller.Options{
166+
Reconciler: reconcile.Func(nil),
167+
RateLimiter: customRateLimiter,
168+
NewQueue: customNewQueue,
169+
})
170+
Expect(err).NotTo(HaveOccurred())
171+
172+
ctrl, ok := c.(*internalcontroller.Controller)
173+
Expect(ok).To(BeTrue())
174+
175+
Expect(ctrl.RateLimiter).To(BeIdenticalTo(customRateLimiter))
176+
ctrl.NewQueue("controller1", nil)
177+
Expect(customNewQueueCalled).To(BeTrue(), "Expected customNewQueue to be called")
178+
})
179+
136180
It("should default RecoverPanic from the manager", func() {
137181
m, err := manager.New(cfg, manager.Options{Controller: config.Controller{RecoverPanic: ptr.To(true)}})
138182
Expect(err).NotTo(HaveOccurred())

pkg/internal/controller/controller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
3434
logf "sigs.k8s.io/controller-runtime/pkg/log"
3535
"sigs.k8s.io/controller-runtime/pkg/predicate"
36+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3637
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3738
"sigs.k8s.io/controller-runtime/pkg/source"
3839
)
@@ -50,10 +51,13 @@ type Controller struct {
5051
// Defaults to the DefaultReconcileFunc.
5152
Do reconcile.Reconciler
5253

53-
// MakeQueue constructs the queue for this controller once the controller is ready to start.
54-
// This exists because the standard Kubernetes workqueues start themselves immediately, which
54+
// RateLimiter is used to limit how frequently requests may be queued into the work queue.
55+
RateLimiter ratelimiter.RateLimiter
56+
57+
// NewQueue constructs the queue for this controller once the controller is ready to start.
58+
// This is a func because the standard Kubernetes work queues start themselves immediately, which
5559
// leads to goroutine leaks if something calls controller.New repeatedly.
56-
MakeQueue func() workqueue.RateLimitingInterface
60+
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface
5761

5862
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
5963
// the Queue for processing
@@ -158,7 +162,7 @@ func (c *Controller) Start(ctx context.Context) error {
158162
// Set the internal context.
159163
c.ctx = ctx
160164

161-
c.Queue = c.MakeQueue()
165+
c.Queue = c.NewQueue(c.Name, c.RateLimiter)
162166
go func() {
163167
<-ctx.Done()
164168
c.Queue.ShutDown()

pkg/internal/controller/controller_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
4444
"sigs.k8s.io/controller-runtime/pkg/internal/log"
4545
"sigs.k8s.io/controller-runtime/pkg/predicate"
46+
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
4647
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4748
"sigs.k8s.io/controller-runtime/pkg/source"
4849
)
@@ -68,7 +69,7 @@ var _ = Describe("controller", func() {
6869
ctrl = &Controller{
6970
MaxConcurrentReconciles: 1,
7071
Do: fakeReconcile,
71-
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
72+
NewQueue: func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue },
7273
LogConstructor: func(_ *reconcile.Request) logr.Logger {
7374
return log.RuntimeLog.WithName("controller").WithName("test")
7475
},
@@ -408,8 +409,8 @@ var _ = Describe("controller", func() {
408409
// TODO(directxman12): we should ensure that backoff occurrs with error requeue
409410

410411
It("should not reset backoff until there's a non-error result", func() {
411-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
412-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
412+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
413+
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
413414

414415
ctx, cancel := context.WithCancel(context.Background())
415416
defer cancel()
@@ -444,8 +445,8 @@ var _ = Describe("controller", func() {
444445
})
445446

446447
It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() {
447-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
448-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
448+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
449+
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
449450

450451
ctx, cancel := context.WithCancel(context.Background())
451452
defer cancel()
@@ -474,8 +475,8 @@ var _ = Describe("controller", func() {
474475
})
475476

476477
It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
477-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
478-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
478+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
479+
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
479480

480481
ctx, cancel := context.WithCancel(context.Background())
481482
defer cancel()
@@ -504,8 +505,8 @@ var _ = Describe("controller", func() {
504505
})
505506

506507
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
507-
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
508-
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
508+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
509+
ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
509510

510511
ctx, cancel := context.WithCancel(context.Background())
511512
defer cancel()

0 commit comments

Comments
 (0)