Skip to content

Commit 8cc205a

Browse files
committed
🐛 Error when source.Start() never returns
Contrary to everything else in controller-runtime, we expect `source.Start` to be non-blocking. If someone implements a custom source and gets this wrong, the resulting behavior is that the binary starts successfully but no reconciliation happens, which is extremely difficult to understand and debug. This change makes us use the `CacheSyncTimeout` not only for the source's `WaitForSync` but also for its `Start`. It is worth noting that the current design of both requiring `Start` to not block and `WaitForSync` to block is very confusing. It likely came to be because we basically require two distinct contexts in `Start`: one to indicate the lifetime of the `Source` and one to indicate the `Start` timeout. To overall simplify and improve the code, the change also parallelizes the `Start` of the sources.
1 parent aaaefb4 commit 8cc205a

File tree

6 files changed

+91
-45
lines changed

6 files changed

+91
-45
lines changed

examples/scratch-env/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ require (
4444
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
4545
golang.org/x/net v0.30.0 // indirect
4646
golang.org/x/oauth2 v0.23.0 // indirect
47+
golang.org/x/sync v0.8.0 // indirect
4748
golang.org/x/sys v0.26.0 // indirect
4849
golang.org/x/term v0.25.0 // indirect
4950
golang.org/x/text v0.19.0 // indirect

examples/scratch-env/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
129129
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
130130
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
131131
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
132+
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
133+
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
132134
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
133135
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
134136
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ require (
3030
sigs.k8s.io/yaml v1.4.0
3131
)
3232

33+
require golang.org/x/sync v0.8.0
34+
3335
require (
3436
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
3537
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
@@ -79,7 +81,6 @@ require (
7981
go.uber.org/multierr v1.11.0 // indirect
8082
golang.org/x/net v0.30.0 // indirect
8183
golang.org/x/oauth2 v0.23.0 // indirect
82-
golang.org/x/sync v0.8.0 // indirect
8384
golang.org/x/term v0.25.0 // indirect
8485
golang.org/x/text v0.19.0 // indirect
8586
golang.org/x/time v0.7.0 // indirect

pkg/internal/controller/controller.go

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"errors"
2222
"fmt"
2323
"sync"
24+
"sync/atomic"
2425
"time"
2526

2627
"github.com/go-logr/logr"
28+
"golang.org/x/sync/errgroup"
2729
"k8s.io/apimachinery/pkg/types"
2830
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2931
"k8s.io/apimachinery/pkg/util/uuid"
@@ -171,41 +173,55 @@ func (c *Controller[request]) Start(ctx context.Context) error {
171173
// NB(directxman12): launch the sources *before* trying to wait for the
172174
// caches to sync so that they have a chance to register their intended
173175
// caches.
176+
errGroup, _ := errgroup.WithContext(ctx)
174177
for _, watch := range c.startWatches {
175-
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch))
176-
177-
if err := watch.Start(ctx, c.Queue); err != nil {
178-
return err
179-
}
180-
}
181-
182-
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
183-
c.LogConstructor(nil).Info("Starting Controller")
184-
185-
for _, watch := range c.startWatches {
186-
syncingSource, ok := watch.(source.SyncingSource)
187-
if !ok {
188-
continue
189-
}
190-
191-
if err := func() error {
178+
log := c.LogConstructor(nil).WithValues("source", fmt.Sprintf("%s", watch))
179+
didStartSyncingSource := &atomic.Bool{}
180+
errGroup.Go(func() error {
192181
// use a context with timeout for launching sources and syncing caches.
193182
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
194183
defer cancel()
195184

196-
// WaitForSync waits for a definitive timeout, and returns if there
197-
// is an error or a timeout
198-
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
199-
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
200-
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
185+
sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
186+
go func() {
187+
defer close(sourceStartErrChan)
188+
log.Info("Starting EventSource")
189+
if err := watch.Start(ctx, c.Queue); err != nil {
190+
sourceStartErrChan <- err
191+
return
192+
}
193+
syncingSource, ok := watch.(source.SyncingSource)
194+
if !ok {
195+
return
196+
}
197+
didStartSyncingSource.Store(true)
198+
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
199+
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
200+
log.Error(err, "Could not wait for Cache to sync")
201+
sourceStartErrChan <- err
202+
}
203+
}()
204+
205+
select {
206+
case err := <-sourceStartErrChan:
201207
return err
208+
case <-sourceStartCtx.Done():
209+
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
210+
return <-sourceStartErrChan
211+
}
212+
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
213+
return nil
214+
}
215+
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
202216
}
203-
204-
return nil
205-
}(); err != nil {
206-
return err
207-
}
217+
})
208218
}
219+
if err := errGroup.Wait(); err != nil {
220+
return err
221+
}
222+
223+
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
224+
c.LogConstructor(nil).Info("Starting Controller")
209225

210226
// All the watches have been started, we can reset the local slice.
211227
//

pkg/internal/controller/controller_test.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ var _ = Describe("controller", func() {
145145

146146
Describe("Start", func() {
147147
It("should return an error if there is an error waiting for the informers", func() {
148+
ctrl.CacheSyncTimeout = time.Second
148149
f := false
149150
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
150151
source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
@@ -158,12 +159,11 @@ var _ = Describe("controller", func() {
158159
})
159160

160161
It("should error when cache sync timeout occurs", func() {
161-
ctrl.CacheSyncTimeout = 10 * time.Nanosecond
162-
163162
c, err := cache.New(cfg, cache.Options{})
164163
Expect(err).NotTo(HaveOccurred())
165164
c = &cacheWithIndefinitelyBlockingGetInformer{c}
166165

166+
ctrl.CacheSyncTimeout = time.Second
167167
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
168168
source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}),
169169
}
@@ -174,7 +174,7 @@ var _ = Describe("controller", func() {
174174
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))
175175
})
176176

177-
It("should not error when context cancelled", func() {
177+
It("should not error when controller Start context is cancelled during Sources WaitForSync", func() {
178178
ctrl.CacheSyncTimeout = 1 * time.Second
179179

180180
sourceSynced := make(chan struct{})
@@ -200,27 +200,40 @@ var _ = Describe("controller", func() {
200200
<-sourceSynced
201201
})
202202

203-
It("should not error when cache sync timeout is of sufficiently high", func() {
204-
ctrl.CacheSyncTimeout = 1 * time.Second
203+
It("should error when Start() is blocking forever", func() {
204+
ctrl.CacheSyncTimeout = 0
205+
206+
controllerDone := make(chan struct{})
207+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
208+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
209+
<-controllerDone
210+
return ctx.Err()
211+
})}
212+
213+
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
214+
defer cancel()
205215

216+
err := ctrl.Start(ctx)
217+
Expect(err).To(HaveOccurred())
218+
Expect(err.Error()).To(ContainSubstring("Please ensure that its Start() method is non-blocking"))
219+
220+
close(controllerDone)
221+
})
222+
223+
It("should not error when cache sync timeout is of sufficiently high", func() {
224+
ctrl.CacheSyncTimeout = 10 * time.Second
206225
ctx, cancel := context.WithCancel(context.Background())
207226
defer cancel()
208227

209228
sourceSynced := make(chan struct{})
210-
c, err := cache.New(cfg, cache.Options{})
211-
Expect(err).NotTo(HaveOccurred())
229+
c := &informertest.FakeInformers{}
212230
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
213231
&singnallingSourceWrapper{
214232
SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}),
215233
cacheSyncDone: sourceSynced,
216234
},
217235
}
218236

219-
go func() {
220-
defer GinkgoRecover()
221-
Expect(c.Start(ctx)).To(Succeed())
222-
}()
223-
224237
go func() {
225238
defer GinkgoRecover()
226239
Expect(ctrl.Start(ctx)).To(Succeed())
@@ -230,6 +243,7 @@ var _ = Describe("controller", func() {
230243
})
231244

232245
It("should process events from source.Channel", func() {
246+
ctrl.CacheSyncTimeout = 10 * time.Second
233247
// channel to be closed when event is processed
234248
processed := make(chan struct{})
235249
// source channel
@@ -269,6 +283,7 @@ var _ = Describe("controller", func() {
269283
})
270284

271285
It("should error when channel source is not specified", func() {
286+
ctrl.CacheSyncTimeout = 10 * time.Second
272287
ctx, cancel := context.WithCancel(context.Background())
273288
defer cancel()
274289

@@ -281,24 +296,26 @@ var _ = Describe("controller", func() {
281296
})
282297

283298
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
299+
ctrl.CacheSyncTimeout = 10 * time.Second
284300
started := false
301+
ctx, cancel := context.WithCancel(context.Background())
285302
src := source.Func(func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
286303
defer GinkgoRecover()
287304
Expect(q).To(Equal(ctrl.Queue))
288305

289306
started = true
307+
cancel()
290308
return nil
291309
})
292310
Expect(ctrl.Watch(src)).NotTo(HaveOccurred())
293311

294-
// Use a cancelled context so Start doesn't block
295-
ctx, cancel := context.WithCancel(context.Background())
296-
cancel()
297-
Expect(ctrl.Start(ctx)).To(Succeed())
312+
err := ctrl.Start(ctx)
313+
Expect(err).To(Succeed())
298314
Expect(started).To(BeTrue())
299315
})
300316

301317
It("should return an error if there is an error starting sources", func() {
318+
ctrl.CacheSyncTimeout = 10 * time.Second
302319
err := fmt.Errorf("Expected Error: could not start source")
303320
src := source.Func(func(context.Context,
304321
workqueue.TypedRateLimitingInterface[reconcile.Request],
@@ -852,6 +869,15 @@ type singnallingSourceWrapper struct {
852869
source.SyncingSource
853870
}
854871

872+
func (s *singnallingSourceWrapper) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
873+
err := s.SyncingSource.Start(ctx, q)
874+
if err != nil {
875+
// WaitForSync will never be called if this errors, so close the channel to prevent deadlocks in tests
876+
close(s.cacheSyncDone)
877+
}
878+
return err
879+
}
880+
855881
func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
856882
defer func() {
857883
close(s.cacheSyncDone)

pkg/internal/source/kind.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
5252
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
5353
// sync that informer (most commonly due to RBAC issues).
5454
ctx, ks.startCancel = context.WithCancel(ctx)
55-
ks.startedErr = make(chan error)
55+
ks.startedErr = make(chan error, 1) // Buffer chan to not leak goroutines if WaitForSync isn't called
5656
go func() {
5757
var (
5858
i cache.Informer

0 commit comments

Comments
 (0)