From b34ac667ce2bd64f8eee51fd4fda7930f8cfa554 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Thu, 26 Jun 2025 22:41:00 +0000 Subject: [PATCH 1/3] test: add goroutine leak test for slow runnables during shutdown --- pkg/manager/manager_test.go | 48 +++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 247a33f9dc..25bc02c3e1 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1886,6 +1886,54 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) }) + It("should not leak goroutines when a runnable returns error slowly after being signaled to stop", func() { + // This test reproduces the race condition where the manager's Start method + // exits due to context cancellation, leaving no one to drain errChan + + currentGRs := goleak.IgnoreCurrent() + + // Create manager with a very short graceful shutdown timeout to reliably trigger the race condition + shortGracefulShutdownTimeout := 10 * time.Millisecond + m, err := New(cfg, Options{ + GracefulShutdownTimeout: &shortGracefulShutdownTimeout, + }) + Expect(err).NotTo(HaveOccurred()) + + // Add the slow runnable that will return an error after some delay + for i := 0; i < 3; i++ { + slowRunnable := RunnableFunc(func(c context.Context) error { + <-c.Done() + + // Simulate some work that delays the error from being returned + // Choosing a large delay to reliably trigger the race condition + time.Sleep(100 * time.Millisecond) + + // This simulates the race condition where runnables try to send + // errors after the manager has stopped reading from errChan + return errors.New("slow runnable error") + }) + + Expect(m.Add(slowRunnable)).To(Succeed()) + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).To(HaveOccurred()) // We 
expect error here because the slow runnables will return errors }() + + // Wait for context to be cancelled + <-ctx.Done() + + // Give time for any leaks to become apparent. This makes sure that we don't false alarm on goroutine leaks because runnables are still running. + time.Sleep(300 * time.Millisecond) + + // force-close keep-alive connections + clientTransport.CloseIdleConnections() + Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) + }) + It("should provide a function to get the Config", func() { m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) From 88d95415d730ea8ac5dc6361b525e3d685295828 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Thu, 26 Jun 2025 22:49:39 +0000 Subject: [PATCH 2/3] fix: prevent goroutine leak on manager shutdown timeout --- pkg/manager/manager.go | 2 +- pkg/manager/runnable_group.go | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index c3ae317b04..09207fa302 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -417,7 +417,7 @@ func New(config *rest.Config, options Options) (Manager, error) { } errChan := make(chan error, 1) - runnables := newRunnables(options.BaseContext, errChan) + runnables := newRunnables(options.BaseContext, errChan).withLogger(options.Logger) return &controllerManager{ stopProcedureEngaged: ptr.To(int64(0)), cluster: cluster, diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index db5cda7c88..f224b38e5e 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -46,6 +47,16 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { } } +// withLogger returns the runnables with the logger set for all runnable groups. 
+func (r *runnables) withLogger(logger logr.Logger) *runnables { + r.HTTPServers.withLogger(logger) + r.Webhooks.withLogger(logger) + r.Caches.withLogger(logger) + r.LeaderElection.withLogger(logger) + r.Others.withLogger(logger) + return r +} + // Add adds a runnable to closest group of runnable that they belong to. // // Add should be able to be called before and after Start, but not after StopAndWait. @@ -105,6 +116,9 @@ type runnableGroup struct { // wg is an internal sync.WaitGroup that allows us to properly stop // and wait for all the runnables to finish before returning. wg *sync.WaitGroup + + // logger is used for logging when errors are dropped during shutdown + logger logr.Logger } func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup { @@ -113,12 +127,18 @@ func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnable errChan: errChan, ch: make(chan *readyRunnable), wg: new(sync.WaitGroup), + logger: logr.Discard(), // Default to no-op logger } r.ctx, r.cancel = context.WithCancel(baseContext()) return r } +// withLogger sets the logger for this runnable group. +func (r *runnableGroup) withLogger(logger logr.Logger) { + r.logger = logger +} + // Started returns true if the group has started. func (r *runnableGroup) Started() bool { r.start.Lock() @@ -224,7 +244,17 @@ func (r *runnableGroup) reconcile() { // Start the runnable. 
if err := rn.Start(r.ctx); err != nil { - r.errChan <- err + // Send error with context awareness to prevent blocking during shutdown + select { + case r.errChan <- err: + // Error sent successfully + case <-r.ctx.Done(): + // Context cancelled (shutdown), drop error to prevent blocking forever + // This prevents goroutine leaks when error drain goroutine has exited after timeout + if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown + r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err) + } + } } }(runnable) } From 0279a299c355e3fcbc3a2df9b62c5eca8ac2ceb1 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Tue, 8 Jul 2025 21:10:26 +0000 Subject: [PATCH 3/3] address comments --- pkg/manager/runnable_group.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index f224b38e5e..4bf4aff43c 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -244,16 +244,26 @@ func (r *runnableGroup) reconcile() { // Start the runnable. if err := rn.Start(r.ctx); err != nil { - // Send error with context awareness to prevent blocking during shutdown - select { - case r.errChan <- err: - // Error sent successfully - case <-r.ctx.Done(): - // Context cancelled (shutdown), drop error to prevent blocking forever - // This prevents goroutine leaks when error drain goroutine has exited after timeout - if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown - r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err) + // Check if we're in the shutdown process. 
+ r.stop.RLock() + isStopped := r.stopped + r.stop.RUnlock() + + if isStopped { + // During shutdown, try to send error first (error drain goroutine might still be running) + // but drop if it would block to prevent goroutine leaks + select { + case r.errChan <- err: + // Error sent successfully (error drain goroutine is still running) + default: + // Error drain goroutine has exited, drop error to prevent goroutine leak + if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown + r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err) + } } + } else { + // During normal operation, always try to send errors (may block briefly) + r.errChan <- err } } }(runnable)