@@ -446,6 +446,50 @@ var _ = Describe("Source", func() {
446
446
447
447
close (done )
448
448
})
449
+ It ("should stop when the source channel is closed" , func () {
450
+ q := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "test" )
451
+ // if we didn't stop, we'd start spamming the queue with empty
452
+ // messages as we "received" a zero-valued GenericEvent from
453
+ // the source channel
454
+
455
+ By ("creating a channel with one element, then closing it" )
456
+ ch := make (chan event.GenericEvent , 1 )
457
+ evt := event.GenericEvent {}
458
+ ch <- evt
459
+ close (ch )
460
+
461
+ By ("feeding that channel to a channel source" )
462
+ src := & source.Channel {Source : ch }
463
+ Expect (inject .StopChannelInto (ctx .Done (), src )).To (BeTrue ())
464
+
465
+ processed := make (chan struct {})
466
+ defer close (processed )
467
+
468
+ err := src .Start (ctx , handler.Funcs {
469
+ CreateFunc : func (event.CreateEvent , workqueue.RateLimitingInterface ) {
470
+ defer GinkgoRecover ()
471
+ Fail ("Unexpected CreateEvent" )
472
+ },
473
+ UpdateFunc : func (event.UpdateEvent , workqueue.RateLimitingInterface ) {
474
+ defer GinkgoRecover ()
475
+ Fail ("Unexpected UpdateEvent" )
476
+ },
477
+ DeleteFunc : func (event.DeleteEvent , workqueue.RateLimitingInterface ) {
478
+ defer GinkgoRecover ()
479
+ Fail ("Unexpected DeleteEvent" )
480
+ },
481
+ GenericFunc : func (evt event.GenericEvent , q2 workqueue.RateLimitingInterface ) {
482
+ defer GinkgoRecover ()
483
+
484
+ processed <- struct {}{}
485
+ },
486
+ }, q )
487
+ Expect (err ).NotTo (HaveOccurred ())
488
+
489
+ By ("expecting to only get one event" )
490
+ Eventually (processed ).Should (Receive ())
491
+ Consistently (processed ).ShouldNot (Receive ())
492
+ })
449
493
It ("should get error if no source specified" , func (done Done ) {
450
494
q := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "test" )
451
495
instance := & source.Channel { /*no source specified*/ }
@@ -461,7 +505,6 @@ var _ = Describe("Source", func() {
461
505
Expect (err ).To (Equal (fmt .Errorf ("must call InjectStop on Channel before calling Start" )))
462
506
close (done )
463
507
})
464
-
465
508
})
466
509
Context ("for multi sources (handlers)" , func () {
467
510
It ("should provide GenericEvents for all handlers" , func (done Done ) {
0 commit comments