Skip to content

⚠️ [Warm Replicas] Implement warm replica support for controllers. #3192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
8239300
[Warm Replicas] Implement warm replica support for controllers.
godwinpang Apr 9, 2025
73fc8fa
Remove irrelevant runnable_group.go code.
godwinpang Apr 14, 2025
be1b1c2
Rename ShouldWarmup.
godwinpang Apr 14, 2025
c9b99eb
fmt
godwinpang Apr 14, 2025
e7a2bbf
Change to atomic.Bool to avoid race in test.
godwinpang Apr 14, 2025
854987c
Address comments.
godwinpang Apr 29, 2025
072ad4b
Add ready check to block controller startup until warmup is complete.
godwinpang May 2, 2025
43118a3
Keep test helper structs private.
godwinpang May 2, 2025
b67bc65
Address comments.
godwinpang May 12, 2025
fc7c8c5
Fix lint.
godwinpang May 12, 2025
6bb4616
Address naming + comments from sbueringer.
godwinpang May 13, 2025
ccc7485
Refactor tests to use HaveValue.
godwinpang May 13, 2025
54f4fe3
Document + add UT for WaitForWarmupComplete behavior on ctx cancellat…
godwinpang May 14, 2025
667bb03
Add unit test that exercises controller warmup integration with manager.
godwinpang May 14, 2025
66e3be4
Add UT that verifies WaitForWarmupComplete blocking / non-blocking be…
godwinpang May 14, 2025
d9cc96b
Verify r.Others.startQueue in runnables test cases.
godwinpang May 14, 2025
65a04d5
Fix UT to verify runnable ordering.
godwinpang May 14, 2025
c201bfa
Fix UT for WaitForWarmupComplete blocking.
godwinpang May 15, 2025
5a13db4
Document !NeedLeaderElection+NeedWarmup behavior
godwinpang May 15, 2025
4879527
Fix test race.
godwinpang May 16, 2025
57acc77
Cleanup test wrapper runnables.
godwinpang May 16, 2025
1987b54
Make didStartEventSources run once with sync.Once + UT.
godwinpang May 16, 2025
a49f3a4
Rewrite Warmup to avoid polling.
godwinpang May 16, 2025
89f5479
Rename NeedWarmup to EnableWarmup.
godwinpang May 16, 2025
9d5ddfb
Clarify comment on Warmup.
godwinpang May 16, 2025
66f64f0
Move reset watches critical section inside of startEventSources.
godwinpang May 16, 2025
0563114
Add test to assert startEventSources blocking behavior.
godwinpang May 16, 2025
aa20ef5
Make Start threadsafe with Warmup + UT.
godwinpang May 16, 2025
c9a2973
Change warmup to use buffered error channel and add New method.
godwinpang May 19, 2025
79a7b95
Fail in warmup directly and rely on sync.Once for warmup thread-safet…
godwinpang May 20, 2025
c1d8ea4
Sync controller EnableWarmup comments.
godwinpang May 20, 2025
5df573f
Rename to startEventSourcesLocked and lock with c.mu
godwinpang May 21, 2025
d8650df
Address edge case for watch added after warmup completes.
godwinpang May 21, 2025
a03f404
Fix test description and set leaderelection==true
godwinpang May 21, 2025
dcf4b8b
Fix lint.
godwinpang May 21, 2025
ba51d28
Change shutdown order to shutdown warmup runnables in parallel with o…
godwinpang May 22, 2025
ea2aa0e
Fix test races by ensuring goroutines do not outlive their It blocks.
godwinpang May 22, 2025
730b30e
Block on source start on context cancel.
godwinpang May 22, 2025
bca3e2a
Guard access to c.Queue explicitly.
godwinpang May 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ type Controller struct {
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// NeedWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// When set to true, the controller will start its sources without transitioning to be leader.
// Defaults to false.
NeedWarmup *bool

// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
Expand Down
18 changes: 16 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
Copyright 2018 The Kubernetes Authors.
/* Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,6 +92,16 @@ type TypedOptions[request comparable] struct {
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool

// NeedWarmup specifies whether the controller should start its sources when the manager is not
// the leader. This is useful for cases where sources take a long time to start, as it allows
// for the controller to warm up its caches even before it is elected as the leader. This
// improves leadership failover time, as the caches will be prepopulated before the controller
// transitions to be leader.
//
// When set to true, the controller will start its sources without transitioning to be leader.
// Defaults to false.
NeedWarmup *bool
}

// DefaultFromConfig defaults the config from a config.Controller
Expand Down Expand Up @@ -124,6 +133,10 @@ func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller
if options.NeedLeaderElection == nil {
options.NeedLeaderElection = config.NeedLeaderElection
}

if options.NeedWarmup == nil {
options.NeedWarmup = config.NeedWarmup
}
}

// Controller implements an API. A Controller manages a work queue fed reconcile.Requests
Expand Down Expand Up @@ -253,6 +266,7 @@ func NewTypedUnmanaged[request comparable](name string, options TypedOptions[req
LogConstructor: options.LogConstructor,
RecoverPanic: options.RecoverPanic,
LeaderElected: options.NeedLeaderElection,
NeedWarmup: options.NeedWarmup,
}, nil
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,5 +474,75 @@ var _ = Describe("controller.Controller", func() {
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeFalse())
})

It("should set ShouldWarmupWithoutLeadership correctly", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

// Test with ShouldWarmupWithoutLeadership set to true
ctrlWithWarmup, err := controller.New("warmup-enabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(true),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithWarmup, ok := ctrlWithWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlWithWarmup.NeedWarmup).To(BeTrue())

// Test with ShouldWarmupWithoutLeadership set to false
ctrlWithoutWarmup, err := controller.New("warmup-disabled-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithoutWarmup, ok := ctrlWithoutWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithoutWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlWithoutWarmup.NeedWarmup).To(BeFalse())

// Test with ShouldWarmupWithoutLeadership not set (should default to nil)
ctrlWithDefaultWarmup, err := controller.New("warmup-default-ctrl", m, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlWithDefaultWarmup, ok := ctrlWithDefaultWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlWithDefaultWarmup.NeedWarmup).To(BeNil())
})

It("should inherit ShouldWarmupWithoutLeadership from manager config", func() {
// Test with manager default setting ShouldWarmupWithoutLeadership to true
managerWithWarmup, err := manager.New(cfg, manager.Options{
Controller: config.Controller{
NeedWarmup: ptr.To(true),
},
})
Expect(err).NotTo(HaveOccurred())
ctrlInheritingWarmup, err := controller.New("inherit-warmup-enabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlInheritingWarmup, ok := ctrlInheritingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlInheritingWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlInheritingWarmup.NeedWarmup).To(BeTrue())

// Test that explicit controller setting overrides manager setting
ctrlOverridingWarmup, err := controller.New("override-warmup-disabled", managerWithWarmup, controller.Options{
Reconciler: reconcile.Func(nil),
NeedWarmup: ptr.To(false),
})
Expect(err).NotTo(HaveOccurred())

internalCtrlOverridingWarmup, ok := ctrlOverridingWarmup.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())
Expect(internalCtrlOverridingWarmup.NeedWarmup).NotTo(BeNil())
Expect(*internalCtrlOverridingWarmup.NeedWarmup).To(BeFalse())
})
})
})
69 changes: 67 additions & 2 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand All @@ -38,6 +40,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
// syncedPollPeriod is the period to poll for cache sync
syncedPollPeriod = 100 * time.Millisecond
)

// Controller implements controller.Controller.
type Controller[request comparable] struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Expand Down Expand Up @@ -83,6 +90,16 @@ type Controller[request comparable] struct {
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []source.TypedSource[request]

// didStartEventSources is used to indicate whether the event sources have been started.
didStartEventSources atomic.Bool

// didEventSourcesFinishSync is used to indicate whether the event sources have finished
// successfully. It stores a *bool where
// - nil: not finished syncing
// - true: finished syncing without error
// - false: finished syncing with error
didEventSourcesFinishSync atomic.Value

// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
Expand All @@ -95,6 +112,12 @@ type Controller[request comparable] struct {

// LeaderElected indicates whether the controller is leader elected or always running.
LeaderElected *bool

// NeedWarmup specifies whether the controller should start its sources
// when the manager is not the leader.
// Defaults to false, which means that the controller will wait for leader election to start
// before starting sources.
NeedWarmup *bool
}

// Reconcile implements reconcile.Reconciler.
Expand Down Expand Up @@ -144,6 +167,38 @@ func (c *Controller[request]) NeedLeaderElection() bool {
return *c.LeaderElected
}

// Warmup implements the manager.WarmupRunnable interface.
func (c *Controller[request]) Warmup(ctx context.Context) error {
if c.NeedWarmup == nil || !*c.NeedWarmup {
return nil
}
return c.startEventSources(ctx)
}

// DidFinishWarmup implements the manager.WarmupRunnable interface.
func (c *Controller[request]) DidFinishWarmup(ctx context.Context) bool {
err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (bool, error) {
didFinishSync, ok := c.didEventSourcesFinishSync.Load().(*bool)
if !ok {
return false, errors.New("unexpected error: didEventSourcesFinishSync is not a bool pointer")
}

if didFinishSync == nil {
// event sources not finished syncing
return false, nil
}

if !*didFinishSync {
// event sources finished syncing with an error
return true, errors.New("event sources did not finish syncing successfully")
}

return true, nil
})

return err == nil
}

// Start implements controller.Controller.
func (c *Controller[request]) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
Expand Down Expand Up @@ -221,13 +276,19 @@ func (c *Controller[request]) Start(ctx context.Context) error {
// startEventSources launches all the sources registered with this controller and waits
// for them to sync. It returns an error if any of the sources fail to start or sync.
func (c *Controller[request]) startEventSources(ctx context.Context) error {
Copy link
Member

@sbueringer sbueringer May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think c.Started doesn't work anymore as expected.

With the current version of the PR it's used for two purposes:

  1. Ensure a 2nd call to controller.Start returns an error
  2. Ensure we are appending Sources to c.startWatches only until we call Start()

=> 1. still works as expected. 2 leads to problems

Now the following can happen:

  1. Warmup is completed (i.e. c.startWatches is nil, c.Started is false)
  2. Watch is called => source gets added to c.startWatches
  3. Start is called and skips over the sync.Once in startEventSources

=> So the Source added in 2. is never started

I would suggest to

  • introduce a new field called c.startedEventSources
  • use c.startedEventSources in l.205 instead of c.Started
  • set c.startedEventSources to true in startEventSources after we set c.startWatches to nil
  • Update godoc comment in l.207 to:
	// Sources weren't started yet, store the sources locally and return.
	// These sources are going to be held until either Warmup() or Start(...) is called.
  • Add the following to Warmup after we lock / defer unlock (so further calls to Watch are using this context to start watches instead of nil, Start will overwrite this later)
	// Set the internal context.
	c.ctx = ctx

@alvaroaleman does this sound correct to you?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense. Also, lets add a test for this

Copy link
Contributor Author

@godwinpang godwinpang May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d8650df fixes this but there is another race that this uncovers while testing

L336 starts a goroutine that can potentially outlive the duration of the caller holding the lock. The errGroup blocks on case <-sourceStartCtx.Done(), meaning that when the context is cancelled, startEventSources() doesn't wait for the the goroutine on L336 to complete before returning. This means that the mutex acquired in Warmup can be released before L339 is executed.

The problem variable is c.Queue since it can be read on L339, but also assigned to in Start on L261.

Copy link
Contributor Author

@godwinpang godwinpang May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can do something as follows? Keep the current error handling behavior, but add a channel blocking until watch.Start is called in the defer block

select {
case err := <-sourceStartErrChan:
	return err
case <-sourceStartCtx.Done():
        defer func() {<-sourceStartErrChan}() // <-- ⏰ this is the change
	if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
		return <-sourceStartErrChan
	}
	if ctx.Err() != nil { // Don't return an error if the root context got cancelled
		return nil
	}
	return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
}

edit: 730b30e was the attempt, but looks like it doesn't work because it fails the case where watch.Start blocks indefinitely.

edit 2:
bca3e2a I added a hasAccessedQueueChan channel to track whether or not c.Queue has been accessed and is safe to release the lock. Ideas for a better name is welcome :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is bigger. Does Warmup start the sources with a nil queue? (if it is called before Start)

Copy link
Member

@sbueringer sbueringer May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we would have to write c.Queue already in New. This also starts the queue though.
So this would also change the observable behavior of functions like NewUnmanaged / NewTypedUnmanaged.
So I think we can't move it into New.

The easiest might be to move it into startEventSourcesLocked (and rename that func to startEventSourcesAndQueueLocked)

Like this

	c.didStartEventSourcesOnce.Do(func() {
		queue := c.NewQueue(c.Name, c.RateLimiter)
		if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
			c.Queue = priorityQueue
		} else {
			c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
		}
		go func() {
			<-ctx.Done()
			c.Queue.ShutDown()
		}()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one consequence is that depending on if Warmup or Start is actually running the sync.Once in startEventSourcesLocked the queue is getting shutdown if either the Warmup or one of the other runnable groups is shutdown, but this should be fine?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is bigger. Does Warmup start the sources with a nil queue? (if it is called before Start)

Ohh, extremely good catch.

The easiest might be to move it into startEventSourcesLocked (and rename that func to startEventSourcesAndQueueLocked)

This makes sense to me.

I think one consequence is that depending on if Warmup or Start is actually running the sync.Once in startEventSourcesLocked the queue is getting shutdown if either the Warmup or one of the other runnable groups is shutdown, but this should be fine?

I think so, yeah

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make sure that our tests validate that the queue we pass to the sources is non-nil? This should've been caught by tests and not by a human ideally

// CAS returns false if value is already true, so early exit since another goroutine must have
// called startEventSources previously
if !c.didStartEventSources.CompareAndSwap(false, true) {
c.LogConstructor(nil).Info("Skipping starting event sources since it was already started")
return nil
}

errGroup := &errgroup.Group{}
for _, watch := range c.startWatches {
log := c.LogConstructor(nil)
_, ok := watch.(interface {
String() string
})

if !ok {
log = log.WithValues("source", fmt.Sprintf("%T", watch))
} else {
Expand Down Expand Up @@ -274,7 +335,11 @@ func (c *Controller[request]) startEventSources(ctx context.Context) error {
}
})
}
return errGroup.Wait()
err := errGroup.Wait()

c.didEventSourcesFinishSync.Store(ptr.To(err == nil))

return err
}

// processNextWorkItem will read a single work item off the workqueue and
Expand Down
85 changes: 84 additions & 1 deletion pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -403,7 +404,7 @@ var _ = Describe("controller", func() {
return expectedErr
})

// // Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
ctrl.CacheSyncTimeout = 5 * time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
err := ctrl.startEventSources(ctx)
Expand Down Expand Up @@ -1014,6 +1015,88 @@ var _ = Describe("controller", func() {
})
})
})

Describe("Warmup", func() {
JustBeforeEach(func() {
ctrl.NeedWarmup = ptr.To(true)
})

It("should track warmup status correctly with successful sync", func() {
// Setup controller with sources that complete successfully
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
return nil
}),
}

err := ctrl.Warmup(ctx)
Expect(err).NotTo(HaveOccurred())

// Verify DidFinishWarmup returns true for successful sync
result := ctrl.DidFinishWarmup(ctx)
Expect(result).To(BeTrue())
})

It("should track warmup status correctly with unsuccessful sync", func() {
// Setup controller with sources that complete with error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.CacheSyncTimeout = time.Second
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
return errors.New("sync error")
}),
}

err := ctrl.Warmup(ctx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sync error"))

// Verify DidFinishWarmup returns false for unsuccessful sync
result := ctrl.DidFinishWarmup(ctx)
Expect(result).To(BeFalse())
})
})

Describe("Warmup with warmup disabled", func() {
JustBeforeEach(func() {
ctrl.NeedWarmup = ptr.To(false)
})

It("should not start sources when Warmup is called if warmup is disabled but start it when Start is called.", func() {
// Setup controller with sources that complete successfully
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctrl.CacheSyncTimeout = time.Second
var isSourceStarted atomic.Bool
isSourceStarted.Store(false)
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
isSourceStarted.Store(true)
return nil
}),
}

By("Calling Warmup when NeedWarmup is false")
err := ctrl.Warmup(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(isSourceStarted.Load()).To(BeFalse())

By("Calling Start when NeedWarmup is false")
// Now call Start
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).To(Succeed())
}()
Eventually(isSourceStarted.Load).Should(BeTrue())
})
})
})

var _ = Describe("ReconcileIDFromContext function", func() {
Expand Down
9 changes: 9 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
return fmt.Errorf("failed to start other runnables: %w", err)
}

// Start and wait for sources to start.
if err := cm.runnables.Warmup.Start(cm.internalCtx); err != nil {
return fmt.Errorf("failed to start warmup runnables: %w", err)
}

// Start the leader election and all required runnables.
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -544,6 +549,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
cm.runnables.LeaderElection.startOnce.Do(func() {})
cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)

// Stop the warmup runnables
cm.logger.Info("Stopping and waiting for warmup runnables")
cm.runnables.Warmup.StopAndWait(cm.shutdownCtx)

// Stop the caches before the leader election runnables, this is an important
// step to make sure that we don't race with the reconcilers by receiving more events
// from the API servers and enqueueing them.
Expand Down
Loading
Loading