Skip to content

Commit 79a7b95

Browse files
committed
Fail in warmup directly and rely on sync.Once for warmup thread-safety without WaitForWarmupComplete.
1 parent c9a2973 commit 79a7b95

File tree

6 files changed

+29
-172
lines changed

6 files changed

+29
-172
lines changed

pkg/internal/controller/controller.go

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ type ControllerOptions[request comparable] struct {
8383
}
8484

8585
// Controller implements controller.Controller.
86-
// WARNING: If directly instantiating a Controller vs. using the New method, ensure that the
87-
// warmupResultChan is instantiated as a buffered channel of size 1. Otherwise, the controller will
88-
// panic on having Warmup called.
8986
type Controller[request comparable] struct {
9087
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
9188
Name string
@@ -133,10 +130,6 @@ type Controller[request comparable] struct {
133130
// didStartEventSourcesOnce is used to ensure that the event sources are only started once.
134131
didStartEventSourcesOnce sync.Once
135132

136-
// warmupResultChan receives the result (nil / non-nil error) of the warmup method. It is
137-
// consumed by the WaitForWarmupComplete method that the warmup has finished.
138-
warmupResultChan chan error
139-
140133
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
141134
// or for example when a watch is started.
142135
// Note: LogConstructor has to be able to handle nil requests as we are also using it
@@ -169,7 +162,6 @@ func New[request comparable](options ControllerOptions[request]) *Controller[req
169162
RecoverPanic: options.RecoverPanic,
170163
LeaderElected: options.LeaderElected,
171164
EnableWarmup: options.EnableWarmup,
172-
warmupResultChan: make(chan error, 1),
173165
}
174166
}
175167

@@ -226,31 +218,7 @@ func (c *Controller[request]) Warmup(ctx context.Context) error {
226218
return nil
227219
}
228220

229-
// Hold the lock to avoid concurrent access to c.startWatches with Start() when calling
230-
// startEventSources
231-
c.mu.Lock()
232-
defer c.mu.Unlock()
233-
234-
err := c.startEventSources(ctx)
235-
c.warmupResultChan <- err
236-
237-
return err
238-
}
239-
240-
// WaitForWarmupComplete returns true if warmup has completed without error, and false if there was
241-
// an error during warmup. If context is cancelled, it returns true.
242-
func (c *Controller[request]) WaitForWarmupComplete(ctx context.Context) bool {
243-
if c.EnableWarmup == nil || !*c.EnableWarmup {
244-
return true
245-
}
246-
247-
warmupError, ok := <-c.warmupResultChan
248-
if !ok {
249-
// channel closed unexpectedly
250-
return false
251-
}
252-
253-
return warmupError == nil
221+
return c.startEventSources(ctx)
254222
}
255223

256224
// Start implements controller.Controller.

pkg/internal/controller/controller_test.go

Lines changed: 12 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,12 +1121,7 @@ var _ = Describe("controller", func() {
11211121
}),
11221122
}
11231123

1124-
err := ctrl.Warmup(ctx)
1125-
Expect(err).NotTo(HaveOccurred())
1126-
1127-
// Verify WaitForWarmupComplete returns true for successful sync
1128-
result := ctrl.WaitForWarmupComplete(ctx)
1129-
Expect(result).To(BeTrue())
1124+
Expect(ctrl.Warmup(ctx)).To(Succeed())
11301125
})
11311126

11321127
It("should track warmup status correctly with unsuccessful sync", func() {
@@ -1144,10 +1139,6 @@ var _ = Describe("controller", func() {
11441139
err := ctrl.Warmup(ctx)
11451140
Expect(err).To(HaveOccurred())
11461141
Expect(err.Error()).To(ContainSubstring("sync error"))
1147-
1148-
// Verify WaitForWarmupComplete returns false for unsuccessful sync
1149-
result := ctrl.WaitForWarmupComplete(ctx)
1150-
Expect(result).To(BeFalse())
11511142
})
11521143

11531144
It("should return true if context is cancelled while waiting for source to start", func() {
@@ -1163,27 +1154,19 @@ var _ = Describe("controller", func() {
11631154
}),
11641155
}
11651156

1166-
resultChan := make(chan bool)
1167-
11681157
// Wait for the goroutines to finish before returning to avoid racing with the
11691158
// assignment in BeforeEach block.
11701159
var wg sync.WaitGroup
11711160

1172-
// Invoked in goroutines because the Warmup / WaitForWarmupComplete will block forever.
1173-
wg.Add(2)
1161+
// Invoked in a goroutine because Warmup will block
1162+
wg.Add(1)
11741163
go func() {
11751164
defer GinkgoRecover()
11761165
defer wg.Done()
11771166
Expect(ctrl.Warmup(ctx)).To(Succeed())
11781167
}()
1179-
go func() {
1180-
defer GinkgoRecover()
1181-
defer wg.Done()
1182-
resultChan <- ctrl.WaitForWarmupComplete(ctx)
1183-
}()
11841168

11851169
cancel()
1186-
Expect(<-resultChan).To(BeTrue())
11871170
wg.Wait()
11881171
})
11891172

@@ -1196,16 +1179,15 @@ var _ = Describe("controller", func() {
11961179
ctx, cancel := context.WithCancel(context.Background())
11971180
defer cancel()
11981181

1199-
hasCtrlWatchStarted, hasNonWarmupCtrlWatchStarted := atomic.Bool{}, atomic.Bool{}
1200-
1201-
// ctrl watch will block from finishing until the channel is produced to
1202-
ctrlWatchBlockingChan := make(chan struct{})
1182+
By("Creating a channel to track execution order")
1183+
runnableExecutionOrderChan := make(chan string, 2)
1184+
const nonWarmupRunnableName = "nonWarmupRunnable"
1185+
const warmupRunnableName = "warmupRunnable"
12031186

12041187
ctrl.CacheSyncTimeout = time.Second
12051188
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
12061189
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1207-
hasCtrlWatchStarted.Store(true)
1208-
<-ctrlWatchBlockingChan
1190+
runnableExecutionOrderChan <- warmupRunnableName
12091191
return nil
12101192
}),
12111193
}
@@ -1225,7 +1207,7 @@ var _ = Describe("controller", func() {
12251207
})
12261208
nonWarmupCtrl.startWatches = []source.TypedSource[reconcile.Request]{
12271209
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1228-
hasNonWarmupCtrlWatchStarted.Store(true)
1210+
runnableExecutionOrderChan <- nonWarmupRunnableName
12291211
return nil
12301212
}),
12311213
}
@@ -1249,13 +1231,9 @@ var _ = Describe("controller", func() {
12491231
Expect(m.Start(ctx)).To(Succeed())
12501232
}()
12511233

1252-
By("Waiting for the warmup controller to start")
1253-
Eventually(hasCtrlWatchStarted.Load).Should(BeTrue())
1254-
Expect(hasNonWarmupCtrlWatchStarted.Load()).To(BeFalse())
1255-
1256-
By("Unblocking the warmup controller source start")
1257-
close(ctrlWatchBlockingChan)
1258-
Eventually(hasNonWarmupCtrlWatchStarted.Load).Should(BeTrue())
1234+
<-m.Elected()
1235+
Expect(<-runnableExecutionOrderChan).To(Equal(warmupRunnableName))
1236+
Expect(<-runnableExecutionOrderChan).To(Equal(nonWarmupRunnableName))
12591237
})
12601238

12611239
It("should not race with Start and only start sources once", func() {
@@ -1326,58 +1304,6 @@ var _ = Describe("controller", func() {
13261304
Eventually(isSourceStarted.Load).Should(BeTrue())
13271305
})
13281306
})
1329-
1330-
Describe("WaitForWarmupComplete", func() {
1331-
It("should short circuit without blocking if warmup is disabled", func() {
1332-
ctrl.EnableWarmup = ptr.To(false)
1333-
1334-
ctx, cancel := context.WithCancel(context.Background())
1335-
defer cancel()
1336-
1337-
// Call WaitForWarmupComplete and expect it to return immediately
1338-
result := ctrl.WaitForWarmupComplete(ctx)
1339-
Expect(result).To(BeTrue())
1340-
})
1341-
1342-
It("should block until warmup is complete if warmup is enabled", func() {
1343-
ctrl.EnableWarmup = ptr.To(true)
1344-
// Setup controller with sources that complete successfully
1345-
ctx, cancel := context.WithCancel(context.Background())
1346-
defer cancel()
1347-
1348-
// Close the channel to signal watch completion
1349-
shouldWatchCompleteChan := make(chan struct{})
1350-
1351-
ctrl.CacheSyncTimeout = time.Second
1352-
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
1353-
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
1354-
<-shouldWatchCompleteChan
1355-
return nil
1356-
}),
1357-
}
1358-
1359-
By("Starting a blocking warmup")
1360-
go func() {
1361-
defer GinkgoRecover()
1362-
Expect(ctrl.Warmup(ctx)).To(Succeed())
1363-
}()
1364-
1365-
// didWaitForWarmupCompleteReturn is true when the call to WaitForWarmupComplete returns
1366-
didWaitForWarmupCompleteReturn := atomic.Bool{}
1367-
go func() {
1368-
defer GinkgoRecover()
1369-
// Verify WaitForWarmupComplete returns true for successful sync
1370-
Expect(ctrl.WaitForWarmupComplete(ctx)).To(BeTrue())
1371-
didWaitForWarmupCompleteReturn.Store(true)
1372-
}()
1373-
Consistently(didWaitForWarmupCompleteReturn.Load).Should(BeFalse())
1374-
1375-
By("Unblocking the watch to simulate initial sync completion")
1376-
close(shouldWatchCompleteChan)
1377-
Eventually(didWaitForWarmupCompleteReturn.Load).Should(BeTrue())
1378-
})
1379-
1380-
})
13811307
})
13821308

13831309
var _ = Describe("ReconcileIDFromContext function", func() {

pkg/manager/manager.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,6 @@ type LeaderElectionRunnable interface {
321321
type warmupRunnable interface {
322322
// Warmup will be called when the manager is started but before it becomes leader.
323323
Warmup(context.Context) error
324-
325-
// WaitForWarmupComplete is a blocking function that waits for the warmup to be completed. It
326-
// returns false if it could not successfully finish warmup.
327-
WaitForWarmupComplete(context.Context) bool
328324
}
329325

330326
// New returns a new Manager for creating Controllers.

pkg/manager/manager_test.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,11 +1930,10 @@ var _ = Describe("manger.Manager", func() {
19301930
})
19311931

19321932
It("should run warmup runnables before leader election is won", func() {
1933-
By("Creating channels to track execution order")
1934-
warmupCalled := atomic.Bool{}
1935-
leaderElectionRunnableStarted := atomic.Bool{}
1936-
// warmupRunnable's WarmupFunc will block until this channel is closed
1937-
warmupRunnableWarmupBlockingChan := make(chan struct{})
1933+
By("Creating a channel to track execution order")
1934+
runnableExecutionOrderChan := make(chan string, 2)
1935+
const leaderElectionRunnableName = "leaderElectionRunnable"
1936+
const warmupRunnableName = "warmupRunnable"
19381937

19391938
By("Creating a manager with leader election enabled")
19401939
m, err := New(cfg, Options{
@@ -1952,25 +1951,23 @@ var _ = Describe("manger.Manager", func() {
19521951
// Create a warmup runnable
19531952
warmupRunnable := newWarmupRunnableFunc(
19541953
func(ctx context.Context) error {
1955-
// This is the main runnable that will be executed after leader election
1956-
// Block forever
1954+
// This is the leader election runnable that will be executed after leader election
1955+
// It will block until context is done/cancelled
19571956
<-ctx.Done()
19581957
return nil
19591958
},
19601959
func(ctx context.Context) error {
19611960
// This should be called during startup before leader election
1962-
warmupCalled.Store(true)
1963-
<-warmupRunnableWarmupBlockingChan
1961+
runnableExecutionOrderChan <- warmupRunnableName
19641962
return nil
19651963
},
19661964
)
19671965
Expect(m.Add(warmupRunnable)).To(Succeed())
19681966

19691967
By("Creating a runnable that requires leader election")
1970-
19711968
leaderElectionRunnable := RunnableFunc(
19721969
func(ctx context.Context) error {
1973-
leaderElectionRunnableStarted.Store(true)
1970+
runnableExecutionOrderChan <- leaderElectionRunnableName
19741971
<-ctx.Done()
19751972
return nil
19761973
},
@@ -1985,18 +1982,9 @@ var _ = Describe("manger.Manager", func() {
19851982
Expect(m.Start(ctx)).To(Succeed())
19861983
}()
19871984

1988-
By("Waiting for the warmup runnable to be called")
1989-
Eventually(warmupCalled.Load).Should(BeTrue())
1990-
Expect(leaderElectionRunnableStarted.Load()).To(BeFalse())
1991-
1992-
By("Closing the channel to unblock the warmup runnable")
1993-
close(warmupRunnableWarmupBlockingChan)
1994-
1995-
By("Waiting for leader election to be won")
19961985
<-m.Elected()
1997-
1998-
By("Verifying the leader election runnable is called after election")
1999-
Eventually(leaderElectionRunnableStarted.Load).Should(BeTrue())
1986+
Expect(<-runnableExecutionOrderChan).To(Equal(warmupRunnableName))
1987+
Expect(<-runnableExecutionOrderChan).To(Equal(leaderElectionRunnableName))
20001988
})
20011989
})
20021990

pkg/manager/runnable_group.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,7 @@ func (r *runnables) Add(fn Runnable) error {
6969
return r.Webhooks.Add(fn, nil)
7070
case warmupRunnable, LeaderElectionRunnable:
7171
if warmupRunnable, ok := fn.(warmupRunnable); ok {
72-
if err := r.Warmup.Add(
73-
RunnableFunc(warmupRunnable.Warmup),
74-
func(ctx context.Context) bool {
75-
return warmupRunnable.WaitForWarmupComplete(ctx)
76-
},
77-
); err != nil {
72+
if err := r.Warmup.Add(RunnableFunc(warmupRunnable.Warmup), nil); err != nil {
7873
return err
7974
}
8075
}

pkg/manager/runnable_group_test.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,8 @@ func newWarmupRunnableFunc(
350350
warmupFunc func(context.Context) error,
351351
) *warmupRunnableFunc {
352352
return &warmupRunnableFunc{
353-
startFunc: startFunc,
354-
warmupFunc: warmupFunc,
355-
didWarmupFinishChan: make(chan struct{}),
353+
startFunc: startFunc,
354+
warmupFunc: warmupFunc,
356355
}
357356
}
358357

@@ -361,28 +360,14 @@ func newWarmupRunnableFunc(
361360
type warmupRunnableFunc struct {
362361
startFunc func(context.Context) error
363362
warmupFunc func(context.Context) error
364-
365-
// didWarmupFinishChan is closed when warmup is finished
366-
didWarmupFinishChan chan struct{}
367-
368-
// didWarmupFinishSuccessfully is set to true if warmup was successful
369-
didWarmupFinishSuccessfully atomic.Bool
370363
}
371364

372365
func (r *warmupRunnableFunc) Start(ctx context.Context) error {
373366
return r.startFunc(ctx)
374367
}
375368

376369
func (r *warmupRunnableFunc) Warmup(ctx context.Context) error {
377-
err := r.warmupFunc(ctx)
378-
r.didWarmupFinishSuccessfully.Store(err == nil)
379-
close(r.didWarmupFinishChan)
380-
return err
381-
}
382-
383-
func (r *warmupRunnableFunc) WaitForWarmupComplete(ctx context.Context) bool {
384-
<-r.didWarmupFinishChan
385-
return r.didWarmupFinishSuccessfully.Load()
370+
return r.warmupFunc(ctx)
386371
}
387372

388373
var _ LeaderElectionRunnable = &leaderElectionAndWarmupRunnable{}
@@ -401,9 +386,8 @@ func newLeaderElectionAndWarmupRunnable(
401386
) *leaderElectionAndWarmupRunnable {
402387
return &leaderElectionAndWarmupRunnable{
403388
warmupRunnableFunc: &warmupRunnableFunc{
404-
startFunc: startFunc,
405-
warmupFunc: warmupFunc,
406-
didWarmupFinishChan: make(chan struct{}),
389+
startFunc: startFunc,
390+
warmupFunc: warmupFunc,
407391
},
408392
needLeaderElection: needLeaderElection,
409393
}

0 commit comments

Comments
 (0)