Skip to content

Commit 63a3800

Browse files
authored
Merge pull request #2997 from alvaroaleman/start-take-two
🐛 Error when source.Start() never returns
2 parents 48ec3b7 + 8cc205a commit 63a3800

File tree

6 files changed

+91
-45
lines changed

6 files changed

+91
-45
lines changed

examples/scratch-env/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ require (
4444
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
4545
golang.org/x/net v0.30.0 // indirect
4646
golang.org/x/oauth2 v0.23.0 // indirect
47+
golang.org/x/sync v0.8.0 // indirect
4748
golang.org/x/sys v0.26.0 // indirect
4849
golang.org/x/term v0.25.0 // indirect
4950
golang.org/x/text v0.19.0 // indirect

examples/scratch-env/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
129129
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
130130
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
131131
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
132+
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
133+
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
132134
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
133135
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
134136
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ require (
3030
sigs.k8s.io/yaml v1.4.0
3131
)
3232

33+
require golang.org/x/sync v0.8.0
34+
3335
require (
3436
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
3537
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
@@ -79,7 +81,6 @@ require (
7981
go.uber.org/multierr v1.11.0 // indirect
8082
golang.org/x/net v0.30.0 // indirect
8183
golang.org/x/oauth2 v0.23.0 // indirect
82-
golang.org/x/sync v0.8.0 // indirect
8384
golang.org/x/term v0.25.0 // indirect
8485
golang.org/x/text v0.19.0 // indirect
8586
golang.org/x/time v0.7.0 // indirect

pkg/internal/controller/controller.go

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"errors"
2222
"fmt"
2323
"sync"
24+
"sync/atomic"
2425
"time"
2526

2627
"github.com/go-logr/logr"
28+
"golang.org/x/sync/errgroup"
2729
"k8s.io/apimachinery/pkg/types"
2830
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2931
"k8s.io/apimachinery/pkg/util/uuid"
@@ -171,41 +173,55 @@ func (c *Controller[request]) Start(ctx context.Context) error {
171173
// NB(directxman12): launch the sources *before* trying to wait for the
172174
// caches to sync so that they have a chance to register their intended
173175
// caches.
176+
errGroup, _ := errgroup.WithContext(ctx)
174177
for _, watch := range c.startWatches {
175-
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch))
176-
177-
if err := watch.Start(ctx, c.Queue); err != nil {
178-
return err
179-
}
180-
}
181-
182-
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
183-
c.LogConstructor(nil).Info("Starting Controller")
184-
185-
for _, watch := range c.startWatches {
186-
syncingSource, ok := watch.(source.SyncingSource)
187-
if !ok {
188-
continue
189-
}
190-
191-
if err := func() error {
178+
log := c.LogConstructor(nil).WithValues("source", fmt.Sprintf("%s", watch))
179+
didStartSyncingSource := &atomic.Bool{}
180+
errGroup.Go(func() error {
192181
// use a context with timeout for launching sources and syncing caches.
193182
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
194183
defer cancel()
195184

196-
// WaitForSync waits for a definitive timeout, and returns if there
197-
// is an error or a timeout
198-
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
199-
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
200-
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
185+
sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
186+
go func() {
187+
defer close(sourceStartErrChan)
188+
log.Info("Starting EventSource")
189+
if err := watch.Start(ctx, c.Queue); err != nil {
190+
sourceStartErrChan <- err
191+
return
192+
}
193+
syncingSource, ok := watch.(source.SyncingSource)
194+
if !ok {
195+
return
196+
}
197+
didStartSyncingSource.Store(true)
198+
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
199+
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
200+
log.Error(err, "Could not wait for Cache to sync")
201+
sourceStartErrChan <- err
202+
}
203+
}()
204+
205+
select {
206+
case err := <-sourceStartErrChan:
201207
return err
208+
case <-sourceStartCtx.Done():
209+
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
210+
return <-sourceStartErrChan
211+
}
212+
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
213+
return nil
214+
}
215+
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
202216
}
203-
204-
return nil
205-
}(); err != nil {
206-
return err
207-
}
217+
})
208218
}
219+
if err := errGroup.Wait(); err != nil {
220+
return err
221+
}
222+
223+
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
224+
c.LogConstructor(nil).Info("Starting Controller")
209225

210226
// All the watches have been started, we can reset the local slice.
211227
//

pkg/internal/controller/controller_test.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ var _ = Describe("controller", func() {
145145

146146
Describe("Start", func() {
147147
It("should return an error if there is an error waiting for the informers", func() {
148+
ctrl.CacheSyncTimeout = time.Second
148149
f := false
149150
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
150151
source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
@@ -158,12 +159,11 @@ var _ = Describe("controller", func() {
158159
})
159160

160161
It("should error when cache sync timeout occurs", func() {
161-
ctrl.CacheSyncTimeout = 10 * time.Nanosecond
162-
163162
c, err := cache.New(cfg, cache.Options{})
164163
Expect(err).NotTo(HaveOccurred())
165164
c = &cacheWithIndefinitelyBlockingGetInformer{c}
166165

166+
ctrl.CacheSyncTimeout = time.Second
167167
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
168168
source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}),
169169
}
@@ -174,7 +174,7 @@ var _ = Describe("controller", func() {
174174
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))
175175
})
176176

177-
It("should not error when context cancelled", func() {
177+
It("should not error when controller Start context is cancelled during Sources WaitForSync", func() {
178178
ctrl.CacheSyncTimeout = 1 * time.Second
179179

180180
sourceSynced := make(chan struct{})
@@ -200,27 +200,40 @@ var _ = Describe("controller", func() {
200200
<-sourceSynced
201201
})
202202

203-
It("should not error when cache sync timeout is of sufficiently high", func() {
204-
ctrl.CacheSyncTimeout = 1 * time.Second
203+
It("should error when Start() is blocking forever", func() {
204+
ctrl.CacheSyncTimeout = 0
205+
206+
controllerDone := make(chan struct{})
207+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
208+
source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
209+
<-controllerDone
210+
return ctx.Err()
211+
})}
212+
213+
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
214+
defer cancel()
205215

216+
err := ctrl.Start(ctx)
217+
Expect(err).To(HaveOccurred())
218+
Expect(err.Error()).To(ContainSubstring("Please ensure that its Start() method is non-blocking"))
219+
220+
close(controllerDone)
221+
})
222+
223+
It("should not error when cache sync timeout is of sufficiently high", func() {
224+
ctrl.CacheSyncTimeout = 10 * time.Second
206225
ctx, cancel := context.WithCancel(context.Background())
207226
defer cancel()
208227

209228
sourceSynced := make(chan struct{})
210-
c, err := cache.New(cfg, cache.Options{})
211-
Expect(err).NotTo(HaveOccurred())
229+
c := &informertest.FakeInformers{}
212230
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
213231
&singnallingSourceWrapper{
214232
SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}),
215233
cacheSyncDone: sourceSynced,
216234
},
217235
}
218236

219-
go func() {
220-
defer GinkgoRecover()
221-
Expect(c.Start(ctx)).To(Succeed())
222-
}()
223-
224237
go func() {
225238
defer GinkgoRecover()
226239
Expect(ctrl.Start(ctx)).To(Succeed())
@@ -230,6 +243,7 @@ var _ = Describe("controller", func() {
230243
})
231244

232245
It("should process events from source.Channel", func() {
246+
ctrl.CacheSyncTimeout = 10 * time.Second
233247
// channel to be closed when event is processed
234248
processed := make(chan struct{})
235249
// source channel
@@ -269,6 +283,7 @@ var _ = Describe("controller", func() {
269283
})
270284

271285
It("should error when channel source is not specified", func() {
286+
ctrl.CacheSyncTimeout = 10 * time.Second
272287
ctx, cancel := context.WithCancel(context.Background())
273288
defer cancel()
274289

@@ -281,24 +296,26 @@ var _ = Describe("controller", func() {
281296
})
282297

283298
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
299+
ctrl.CacheSyncTimeout = 10 * time.Second
284300
started := false
301+
ctx, cancel := context.WithCancel(context.Background())
285302
src := source.Func(func(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
286303
defer GinkgoRecover()
287304
Expect(q).To(Equal(ctrl.Queue))
288305

289306
started = true
307+
cancel()
290308
return nil
291309
})
292310
Expect(ctrl.Watch(src)).NotTo(HaveOccurred())
293311

294-
// Use a cancelled context so Start doesn't block
295-
ctx, cancel := context.WithCancel(context.Background())
296-
cancel()
297-
Expect(ctrl.Start(ctx)).To(Succeed())
312+
err := ctrl.Start(ctx)
313+
Expect(err).To(Succeed())
298314
Expect(started).To(BeTrue())
299315
})
300316

301317
It("should return an error if there is an error starting sources", func() {
318+
ctrl.CacheSyncTimeout = 10 * time.Second
302319
err := fmt.Errorf("Expected Error: could not start source")
303320
src := source.Func(func(context.Context,
304321
workqueue.TypedRateLimitingInterface[reconcile.Request],
@@ -852,6 +869,15 @@ type singnallingSourceWrapper struct {
852869
source.SyncingSource
853870
}
854871

872+
func (s *singnallingSourceWrapper) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
873+
err := s.SyncingSource.Start(ctx, q)
874+
if err != nil {
875+
// WaitForSync will never be called if this errors, so close the channel to prevent deadlocks in tests
876+
close(s.cacheSyncDone)
877+
}
878+
return err
879+
}
880+
855881
func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
856882
defer func() {
857883
close(s.cacheSyncDone)

pkg/internal/source/kind.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
5252
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
5353
// sync that informer (most commonly due to RBAC issues).
5454
ctx, ks.startCancel = context.WithCancel(ctx)
55-
ks.startedErr = make(chan error)
55+
ks.startedErr = make(chan error, 1) // Buffer chan to not leak goroutines if WaitForSync isn't called
5656
go func() {
5757
var (
5858
i cache.Informer

0 commit comments

Comments
 (0)