Skip to content

Commit 3b9d201

Browse files
committed
Don't hotloop on channel source closure
Previously, when we received from the channel in a channel source, we didn't check if it was still open. This lead to hotlooping and spamming the source's handler/queue when the input channel is closed. Now, we stop the source when the channel is closed, similarly to what happens if the context is closed.
1 parent 9e78e65 commit 3b9d201

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

pkg/source/source.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,13 @@ func (cs *Channel) syncLoop(ctx context.Context) {
274274
// Close destination channels
275275
cs.doStop()
276276
return
277-
case evt := <-cs.Source:
277+
case evt, stillOpen := <-cs.Source:
278+
if !stillOpen {
279+
// if the source channel is closed, we're never gonna get
280+
// anything more on it, so stop & bail
281+
cs.doStop()
282+
return
283+
}
278284
cs.distribute(evt)
279285
}
280286
}

pkg/source/source_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,50 @@ var _ = Describe("Source", func() {
446446

447447
close(done)
448448
})
449+
It("should stop when the source channel is closed", func() {
450+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
451+
// if we didn't stop, we'd start spamming the queue with empty
452+
// messages as we "received" a zero-valued GenericEvent from
453+
// the source channel
454+
455+
By("creating a channel with one element, then closing it")
456+
ch := make(chan event.GenericEvent, 1)
457+
evt := event.GenericEvent{}
458+
ch <- evt
459+
close(ch)
460+
461+
By("feeding that channel to a channel source")
462+
src := &source.Channel{Source: ch}
463+
Expect(inject.StopChannelInto(ctx.Done(), src)).To(BeTrue())
464+
465+
processed := make(chan struct{})
466+
defer close(processed)
467+
468+
err := src.Start(ctx, handler.Funcs{
469+
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
470+
defer GinkgoRecover()
471+
Fail("Unexpected CreateEvent")
472+
},
473+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
474+
defer GinkgoRecover()
475+
Fail("Unexpected UpdateEvent")
476+
},
477+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
478+
defer GinkgoRecover()
479+
Fail("Unexpected DeleteEvent")
480+
},
481+
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
482+
defer GinkgoRecover()
483+
484+
processed <- struct{}{}
485+
},
486+
}, q)
487+
Expect(err).NotTo(HaveOccurred())
488+
489+
By("expecting to only get one event")
490+
Eventually(processed).Should(Receive())
491+
Consistently(processed).ShouldNot(Receive())
492+
})
449493
It("should get error if no source specified", func(done Done) {
450494
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
451495
instance := &source.Channel{ /*no source specified*/ }
@@ -461,7 +505,6 @@ var _ = Describe("Source", func() {
461505
Expect(err).To(Equal(fmt.Errorf("must call InjectStop on Channel before calling Start")))
462506
close(done)
463507
})
464-
465508
})
466509
Context("for multi sources (handlers)", func() {
467510
It("should provide GenericEvents for all handlers", func(done Done) {

0 commit comments

Comments
 (0)