🐛 Fix(manager): Prevent goroutine leak on shutdown timeout #3247

Open · wants to merge 3 commits into main
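
While the manager runs, `Start` drains `errChan` and every runnable reports failures with a plain blocking send. Once the graceful shutdown timeout fires, `Start` returns and nothing reads the channel anymore, so a runnable that finishes late blocks forever on that send and its goroutine leaks. The sketch below is a minimal, self-contained reproduction of that shape, not the controller-runtime code; the names, channel size, and timings are illustrative. It shows why only the first late error fits in the one-slot buffer and how the non-blocking send adopted by this PR drops the rest instead of leaking their senders.

```go
// Minimal reproduction of the leak this PR addresses (assumed names and timings;
// not the controller-runtime code). Once the drain goroutine is gone, a plain
// blocking send on errChan leaks the sender; the non-blocking select delivers
// the error if anyone still listens and drops it otherwise.
package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	errChan := make(chan error, 1) // same one-slot shape as the manager's error channel

	// Stand-in for Start: drain errors until the graceful shutdown timeout fires.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		select {
		case err := <-errChan:
			fmt.Println("manager saw error:", err)
		case <-time.After(10 * time.Millisecond): // graceful shutdown timed out
		}
	}()
	<-drained // from here on, nobody reads errChan

	// Slow runnables that only return their error after shutdown gave up on them.
	for i := 0; i < 3; i++ {
		go func(i int) {
			err := errors.New("slow runnable error")

			// Leaky version: `errChan <- err`. The first send fills the buffer;
			// every later sender blocks forever.

			// Non-blocking version used by this PR once the group is stopped:
			select {
			case errChan <- err: // a drain goroutine may still be running
			default:
				fmt.Printf("runnable %d: error dropped during shutdown to prevent goroutine leak\n", i)
			}
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // let the runnables finish before exiting
}
```

The new test in `pkg/manager/manager_test.go` exercises exactly this shape: three slow runnables, a 10ms graceful shutdown timeout, and a goleak check that no sender is left behind.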
2 changes: 1 addition & 1 deletion pkg/manager/manager.go
@@ -417,7 +417,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

errChan := make(chan error, 1)
runnables := newRunnables(options.BaseContext, errChan)
runnables := newRunnables(options.BaseContext, errChan).withLogger(options.Logger)
return &controllerManager{
stopProcedureEngaged: ptr.To(int64(0)),
cluster: cluster,
48 changes: 48 additions & 0 deletions pkg/manager/manager_test.go
@@ -1886,6 +1886,54 @@ var _ = Describe("manger.Manager", func() {
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should not leak goroutines when a runnable returns error slowly after being signaled to stop", func() {
// This test reproduces the race condition where the manager's Start method
// exits due to context cancellation, leaving no one to drain errChan

currentGRs := goleak.IgnoreCurrent()

// Create manager with a very short graceful shutdown timeout to reliably trigger the race condition
shortGracefulShutdownTimeout := 10 * time.Millisecond
m, err := New(cfg, Options{
GracefulShutdownTimeout: &shortGracefulShutdownTimeout,
})
Expect(err).NotTo(HaveOccurred())

// Add the slow runnable that will return an error after some delay
for i := 0; i < 3; i++ {
slowRunnable := RunnableFunc(func(c context.Context) error {
<-c.Done()

// Simulate some work that delays the error from being returned
// Choosing a large delay to reliably trigger the race condition
time.Sleep(100 * time.Millisecond)

// This simulates the race condition where runnables try to send
// errors after the manager has stopped reading from errChan
return errors.New("slow runnable error")
})

Expect(m.Add(slowRunnable)).To(Succeed())
}

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).To(HaveOccurred()) // We expect an error here because the slow runnables return errors
}()

// Wait for context to be cancelled
<-ctx.Done()

// Give time for any leaks to become apparent. This ensures we don't raise a false alarm about goroutine leaks while runnables are still running.
time.Sleep(300 * time.Millisecond)

// force-close keep-alive connections
clientTransport.CloseIdleConnections()
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should provide a function to get the Config", func() {
m, err := New(cfg, Options{})
Expect(err).NotTo(HaveOccurred())
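
The regression test leans on `go.uber.org/goleak` to prove the fix: `goleak.IgnoreCurrent()` snapshots the goroutines that already exist before the manager starts, and `goleak.Find` later reports any newer goroutine that is still alive (the test wraps it in `Eventually` to give stragglers time to exit). A stripped-down, standalone use of the same two calls, with an assumed test name and timing, looks like this:

```go
// Standalone sketch of the goleak pattern used above: snapshot pre-existing
// goroutines, do the work, then assert nothing new is still running. The test
// name, package, and sleep are assumptions for illustration.
package example_test

import (
	"testing"
	"time"

	"go.uber.org/goleak"
)

func TestNoLeakedSenders(t *testing.T) {
	ignoreExisting := goleak.IgnoreCurrent()

	done := make(chan struct{})
	go func() {
		defer close(done)
		time.Sleep(10 * time.Millisecond) // stand-in for a runnable doing some work
	}()
	<-done // the goroutine has finished; nothing new should remain

	if err := goleak.Find(ignoreExisting); err != nil {
		t.Fatalf("leaked goroutines: %v", err)
	}
}
```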
42 changes: 41 additions & 1 deletion pkg/manager/runnable_group.go
@@ -5,6 +5,7 @@ import (
"errors"
"sync"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

@@ -46,6 +47,16 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
}
}

// withLogger returns the runnables with the logger set for all runnable groups.
func (r *runnables) withLogger(logger logr.Logger) *runnables {
r.HTTPServers.withLogger(logger)
r.Webhooks.withLogger(logger)
r.Caches.withLogger(logger)
r.LeaderElection.withLogger(logger)
r.Others.withLogger(logger)
return r
}

// Add adds a runnable to closest group of runnable that they belong to.
//
// Add should be able to be called before and after Start, but not after StopAndWait.
@@ -105,6 +116,9 @@ type runnableGroup struct {
// wg is an internal sync.WaitGroup that allows us to properly stop
// and wait for all the runnables to finish before returning.
wg *sync.WaitGroup

// logger is used for logging when errors are dropped during shutdown
logger logr.Logger
}

func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
@@ -113,12 +127,18 @@ func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
logger: logr.Discard(), // Default to no-op logger
}

r.ctx, r.cancel = context.WithCancel(baseContext())
return r
}

// withLogger sets the logger for this runnable group.
func (r *runnableGroup) withLogger(logger logr.Logger) {
r.logger = logger
}

// Started returns true if the group has started.
func (r *runnableGroup) Started() bool {
r.start.Lock()
@@ -224,7 +244,27 @@ func (r *runnableGroup) reconcile() {

// Start the runnable.
if err := rn.Start(r.ctx); err != nil {
r.errChan <- err
// Check whether we're in the shutdown process.
r.stop.RLock()
isStopped := r.stopped
r.stop.RUnlock()

if isStopped {
// During shutdown, try to send the error first (the error drain goroutine might still be running),
// but drop it if the send would block, to prevent goroutine leaks
select {
case r.errChan <- err:
// Error sent successfully (error drain goroutine is still running)
default:
// Error drain goroutine has exited, drop error to prevent goroutine leak
if !errors.Is(err, context.Canceled) { // don't log context.Canceled errors as they are expected during shutdown
r.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err)
}
}
} else {
// During normal operation, always try to send errors (may block briefly)
r.errChan <- err
}
}
}(runnable)
}
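
The dropped-error log line only fires when an error is actually discarded, and each group defaults to `logr.Discard()` so nothing is printed unless a logger was wired in through `withLogger`. The snippet below illustrates that default-then-override pattern and the `context.Canceled` filtering with the go-logr API; the `group` type, the `funcr` sink, and the function names are illustrative stand-ins, not the controller-runtime types.

```go
// Illustration of the logger handling above: logr.Discard() as a safe no-op
// default, a withLogger-style setter to swap in a real sink, and quiet handling
// of expected context.Canceled errors. Types and names here are assumptions.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/funcr"
)

type group struct{ logger logr.Logger }

func newGroup() *group { return &group{logger: logr.Discard()} } // no-op until configured

func (g *group) withLogger(l logr.Logger) { g.logger = l }

// dropError mirrors the shutdown path: expected context.Canceled errors stay quiet,
// anything else is logged instead of being sent to a channel nobody reads.
func (g *group) dropError(err error) {
	if !errors.Is(err, context.Canceled) {
		g.logger.Info("error dropped during shutdown to prevent goroutine leak", "error", err)
	}
}

func main() {
	g := newGroup()
	g.dropError(errors.New("dropped silently")) // Discard logger: prints nothing

	// Swap in a real sink; funcr is the small adapter that ships with go-logr.
	g.withLogger(funcr.New(func(prefix, args string) { fmt.Println(prefix, args) }, funcr.Options{}))
	g.dropError(errors.New("now visible")) // logged with the error as a key/value pair
	g.dropError(context.Canceled)          // still silent: expected during shutdown
}
```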