
Commit d289118

fix: race condition when stopping health check during shutdown (#2177)
Signed-off-by: Alessandro Yuichi Okimoto <yuichijpn@gmail.com>
1 parent 7ca7f44 commit d289118
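
As the diffs below show, the fix has two parts. pkg/health gains an atomic stopped flag so that a periodic check() running concurrently with Stop() can no longer flip the readiness status back to Healthy during shutdown, and the server shutdown paths (api, batch, web) now stop their health checkers before sleeping for the Kubernetes endpoint-propagation delay instead of after. The Envoy preStop scripts in the Helm charts are reordered to match: drain listeners first, then wait for load-balancer propagation.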

File tree: 9 files changed, +205 −23 lines changed

manifests/bucketeer/charts/api/templates/deployment.yaml

Lines changed: 3 additions & 2 deletions
@@ -185,10 +185,11 @@ spec:
             - -c
             - |
               admin_port={{ .Values.envoy.adminPort }}
-              # Wait for load balancer propagation (must match app container propagation delay)
-              sleep 15
               wget -q -T 1 -O- --method=POST --body-data='' \
                 "http://localhost:${admin_port}/drain_listeners?graceful" || true
+
+              # Wait for load balancer propagation (must match app container propagation delay)
+              sleep 15
               exit 0
         command: ["envoy"]
         args:
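
The batch and web charts below get the same reordering, and the subscriber chart simply drops the sleep. The intent, as the script comment states, is to start draining Envoy's listeners immediately and only then wait out the load-balancer propagation window, keeping the preStop delay aligned with the app container's propagationDelay, which (per the server.go changes further down) now also runs after the health checkers are stopped.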

manifests/bucketeer/charts/batch/templates/deployment.yaml

Lines changed: 3 additions & 2 deletions
@@ -223,10 +223,11 @@ spec:
             - -c
             - |
               admin_port={{ .Values.envoy.adminPort }}
-              # Wait for load balancer propagation (must match app container propagation delay)
-              sleep 15
               wget -q -T 1 -O- --method=POST --body-data='' \
                 "http://localhost:${admin_port}/drain_listeners?graceful" || true
+
+              # Wait for load balancer propagation (must match app container propagation delay)
+              sleep 15
               exit 0
         command: ["envoy"]
         args:

manifests/bucketeer/charts/subscriber/templates/deployment.yaml

Lines changed: 0 additions & 2 deletions
@@ -205,8 +205,6 @@ spec:
             - -c
             - |
               admin_port={{ .Values.envoy.adminPort }}
-              # Wait for load balancer propagation (must match app container propagation delay)
-              sleep 15
               wget -q -T 1 -O- --method=POST --body-data='' \
                 "http://localhost:${admin_port}/drain_listeners?graceful" || true
               exit 0

manifests/bucketeer/charts/web/templates/deployment.yaml

Lines changed: 3 additions & 2 deletions
@@ -273,10 +273,11 @@ spec:
             - -c
             - |
               admin_port={{ .Values.envoy.adminPort }}
-              # Wait for load balancer propagation (must match app container propagation delay)
-              sleep 15
               wget -q -T 1 -O- --method=POST --body-data='' \
                 "http://localhost:${admin_port}/drain_listeners?graceful" || true
+
+              # Wait for load balancer propagation (must match app container propagation delay)
+              sleep 15
               exit 0
         command: ["envoy"]
         args:

pkg/api/cmd/server.go

Lines changed: 4 additions & 4 deletions
@@ -567,16 +567,16 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	defer func() {
 		shutdownStartTime := time.Now()

-		// Wait for K8s endpoint propagation
-		// This prevents "context deadline exceeded" errors during high traffic.
-		time.Sleep(propagationDelay)
-
 		// Mark as unhealthy so readiness probes fail
 		// This ensures Kubernetes readiness probe fails on next check,
 		// preventing new traffic from being routed to this pod.
 		healthChecker.Stop()
 		restHealthChecker.Stop()

+		// Wait for K8s endpoint propagation
+		// This prevents "context deadline exceeded" errors during high traffic.
+		time.Sleep(propagationDelay)
+
 		// Shutdown order matters due to dependencies:
 		// 1. apiGateway/httpServer make gRPC calls to the backend server
 		// 2. We MUST drain them BEFORE stopping the backend sever
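
The batch and web servers below apply the same reordering: the deferred shutdown now fails the readiness probes first (healthChecker.Stop() / restHealthChecker.Stop()) and only then sleeps for propagationDelay, so the probes keep failing for the entire window in which Kubernetes removes the pod from its endpoints, before any gateway or backend is drained.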

pkg/batch/cmd/server/server.go

Lines changed: 4 additions & 4 deletions
@@ -632,15 +632,15 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	defer func() {
 		shutdownStartTime := time.Now()

-		// Wait for K8s endpoint propagation
-		// This prevents "context deadline exceeded" errors during high traffic.
-		time.Sleep(propagationDelay)
-
 		// Mark as unhealthy so readiness probes fail
 		// This ensures Kubernetes readiness probe fails on next check,
 		// preventing new traffic from being routed to this pod.
 		healthChecker.Stop()

+		// Wait for K8s endpoint propagation
+		// This prevents "context deadline exceeded" errors during high traffic.
+		time.Sleep(propagationDelay)
+
 		// Gracefully stop gRPC Gateway (calls the gRPC server internally)
 		batchGateway.Stop(serverShutDownTimeout)

pkg/health/health.go

Lines changed: 22 additions & 2 deletions
@@ -46,7 +46,8 @@ func (s Status) String() string {
 type check func(context.Context) Status

 type checker struct {
-	status uint32
+	status  uint32
+	stopped uint32 // 0 = running, 1 = stopped

 	interval time.Duration
 	timeout  time.Duration
@@ -104,6 +105,13 @@ func (hc *checker) Run(ctx context.Context) {
 }

 func (hc *checker) check(ctx context.Context) {
+	// Don't run checks if already stopped
+	// This prevents the race condition where Stop() sets status to Unhealthy
+	// but then check() overrides it back to Healthy
+	if hc.isStopped() {
+		return
+	}
+
 	resultChan := make(chan Status, len(hc.checks))
 	ctx, cancel := context.WithTimeout(ctx, hc.timeout)
 	defer cancel()
@@ -118,7 +126,12 @@ func (hc *checker) check(ctx context.Context) {
 			return
 		}
 	}
-	hc.setStatus(Healthy)
+
+	// Only set to Healthy if not stopped
+	// This prevents the race condition during shutdown
+	if !hc.isStopped() {
+		hc.setStatus(Healthy)
+	}
 }

 func (hc *checker) ServeReadyHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -148,6 +161,13 @@ func (hc *checker) setStatus(s Status) {
 	atomic.StoreUint32(&hc.status, uint32(s))
 }

+func (hc *checker) isStopped() bool {
+	return atomic.LoadUint32(&hc.stopped) == 1
+}
+
 func (hc *checker) Stop() {
+	// Set stopped flag first to prevent any concurrent check() from setting status back to Healthy
+	atomic.StoreUint32(&hc.stopped, 1)
+	// Then set status to Unhealthy
 	hc.setStatus(Unhealthy)
 }
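
A minimal, self-contained sketch of the mechanism above, using only sync/atomic. It is illustrative rather than the Bucketeer code itself (the real checker also fans out user-registered checks with a timeout): Stop() writes the stopped flag before the status, and check() consults the flag both on entry and before publishing Healthy, so a check that races with shutdown can no longer resurrect the Healthy status.

// race_sketch.go — illustrative only; mirrors the stopped/status flags added in pkg/health.
package main

import (
	"fmt"
	"sync/atomic"
)

const (
	unhealthy uint32 = 0
	healthy   uint32 = 1
)

type checker struct {
	status  uint32
	stopped uint32 // 0 = running, 1 = stopped
}

// check simulates one pass of the periodic health check.
func (c *checker) check() {
	// Don't run (or report) anything once Stop() has been called.
	if atomic.LoadUint32(&c.stopped) == 1 {
		return
	}
	// ... the real checker runs its registered checks here ...
	// Only publish Healthy if we are still running.
	if atomic.LoadUint32(&c.stopped) == 0 {
		atomic.StoreUint32(&c.status, healthy)
	}
}

// Stop sets the stopped flag first, then the status, so a concurrent check()
// sees the flag and leaves the Unhealthy status in place.
func (c *checker) Stop() {
	atomic.StoreUint32(&c.stopped, 1)
	atomic.StoreUint32(&c.status, unhealthy)
}

func main() {
	c := &checker{}
	c.check()
	fmt.Println("before Stop:", atomic.LoadUint32(&c.status)) // 1 (healthy)

	c.Stop()
	c.check() // the late check that previously won the race
	fmt.Println("after Stop: ", atomic.LoadUint32(&c.status)) // stays 0 (unhealthy)
}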

pkg/health/health_test.go

Lines changed: 159 additions & 0 deletions
@@ -372,3 +372,162 @@ func TestGRPCReadyAffectedByStop(t *testing.T) {
 		})
 	}
 }
+
+func TestStopPreventsCheckFromSettingHealthy(t *testing.T) {
+	t.Parallel()
+	patterns := []struct {
+		desc             string
+		setupFunc        func() *restChecker
+		expectedStatus   int
+		expectedStopped  bool
+		expectedInternal Status
+	}{
+		{
+			desc: "check() after Stop() does not override status to Healthy",
+			setupFunc: func() *restChecker {
+				healthyCheck := func(ctx context.Context) Status {
+					return Healthy
+				}
+				c := NewRestChecker(version, service, WithCheck("healthy", healthyCheck))
+				c.check(context.Background())
+				return c
+			},
+			expectedStatus:   http.StatusServiceUnavailable,
+			expectedStopped:  true,
+			expectedInternal: Unhealthy,
+		},
+		{
+			desc: "multiple check() calls after Stop() remain Unhealthy",
+			setupFunc: func() *restChecker {
+				healthyCheck := func(ctx context.Context) Status {
+					return Healthy
+				}
+				c := NewRestChecker(version, service, WithCheck("healthy", healthyCheck))
+				c.check(context.Background())
+				return c
+			},
+			expectedStatus:   http.StatusServiceUnavailable,
+			expectedStopped:  true,
+			expectedInternal: Unhealthy,
+		},
+	}
+
+	for _, p := range patterns {
+		t.Run(p.desc, func(t *testing.T) {
+			checker := p.setupFunc()
+
+			// Verify initial state is healthy
+			if checker.getStatus() != Healthy {
+				t.Errorf("Initial state should be Healthy, got %v", checker.getStatus())
+			}
+
+			// Call Stop()
+			checker.Stop()
+
+			// Verify stopped flag is set
+			if !checker.isStopped() {
+				t.Error("Expected isStopped() to be true after Stop()")
+			}
+
+			// Verify status is Unhealthy
+			if checker.getStatus() != Unhealthy {
+				t.Errorf("Expected status to be Unhealthy after Stop(), got %v", checker.getStatus())
+			}
+
+			// Call check() to simulate the race condition
+			// This should NOT override the status back to Healthy
+			checker.check(context.Background())
+
+			// Verify status remains Unhealthy
+			if checker.getStatus() != p.expectedInternal {
+				t.Errorf("Expected status to remain %v after check(), got %v",
+					p.expectedInternal, checker.getStatus())
+			}
+
+			// Verify HTTP response is 503
+			req := httptest.NewRequest("GET", fmt.Sprintf("%s%s%s", version, service, readyPath), nil)
+			resp := httptest.NewRecorder()
+			checker.ServeReadyHTTP(resp, req)
+			if resp.Code != p.expectedStatus {
+				t.Errorf("Expected HTTP status %d, got %d", p.expectedStatus, resp.Code)
+			}
+
+			// Call check() multiple times to ensure it stays Unhealthy
+			for i := 0; i < 5; i++ {
+				checker.check(context.Background())
+				if checker.getStatus() != p.expectedInternal {
+					t.Errorf("After %d check() calls, expected status %v, got %v",
+						i+1, p.expectedInternal, checker.getStatus())
+				}
+			}
+		})
+	}
+}
+
+func TestStopWithRunningGoroutine(t *testing.T) {
+	t.Parallel()
+	patterns := []struct {
+		desc          string
+		setupFunc     func() (*restChecker, context.CancelFunc)
+		expectedErr   error
+		expectedFinal int
+	}{
+		{
+			desc: "Stop() prevents Run() goroutine from setting status to Healthy",
+			setupFunc: func() (*restChecker, context.CancelFunc) {
+				healthyCheck := func(ctx context.Context) Status {
+					return Healthy
+				}
+				c := NewRestChecker(version, service,
+					WithCheck("healthy", healthyCheck),
+					WithInterval(10*time.Millisecond))
+				ctx, cancel := context.WithCancel(context.Background())
+				go c.Run(ctx)
+				// Wait for first check to complete
+				time.Sleep(50 * time.Millisecond)
+				return c, cancel
+			},
+			expectedErr:   nil,
+			expectedFinal: http.StatusServiceUnavailable,
+		},
+	}
+
+	for _, p := range patterns {
+		t.Run(p.desc, func(t *testing.T) {
+			checker, cancel := p.setupFunc()
+			defer cancel()
+
+			// Verify initial state is healthy
+			if checker.getStatus() != Healthy {
+				t.Errorf("Initial state should be Healthy, got %v", checker.getStatus())
+			}
+
+			// Call Stop() while Run() goroutine is still running
+			checker.Stop()
+
+			// Verify status is immediately Unhealthy
+			if checker.getStatus() != Unhealthy {
+				t.Errorf("Expected status to be Unhealthy immediately after Stop(), got %v",
+					checker.getStatus())
+			}
+
+			// Wait for multiple check intervals to pass
+			// The Run() goroutine should NOT override status back to Healthy
+			time.Sleep(100 * time.Millisecond)
+
+			// Verify status remains Unhealthy
+			if checker.getStatus() != Unhealthy {
+				t.Errorf("Expected status to remain Unhealthy after waiting, got %v",
+					checker.getStatus())
+			}
+
+			// Verify HTTP response is 503
+			req := httptest.NewRequest("GET", fmt.Sprintf("%s%s%s", version, service, readyPath), nil)
+			resp := httptest.NewRecorder()
+			checker.ServeReadyHTTP(resp, req)
+			if resp.Code != p.expectedFinal {
+				t.Errorf("Expected HTTP status %d, got %d", p.expectedFinal, resp.Code)
+			}
+		})
+	}
+}
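
The new tests exercise the flag both directly (calling check() after Stop()) and through a running Run() goroutine. Assuming a checkout of the repository, something like `go test -race -run TestStop ./pkg/health/` should cover both of them.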

pkg/web/cmd/server/server.go

Lines changed: 7 additions & 5 deletions
@@ -861,15 +861,17 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	defer func() {
 		shutdownStartTime := time.Now()

-		// Wait for K8s endpoint propagation
-		// This prevents "context deadline exceeded" errors during high traffic.
-		time.Sleep(propagationDelay)
-
 		// Mark as unhealthy so readiness probes fail
 		// This ensures Kubernetes readiness probe fails on next check,
 		// preventing new traffic from being routed to this pod.
-		healthcheckServer.Stop(5 * time.Second)
+		// IMPORTANT: Stop the health checker FIRST before stopping the HTTP server
+		// to prevent race condition where health checks return 200 during shutdown
 		restHealthChecker.Stop()
+		healthcheckServer.Stop(5 * time.Second)
+
+		// Wait for K8s endpoint propagation
+		// This prevents "context deadline exceeded" errors during high traffic.
+		time.Sleep(propagationDelay)

 		// Stop REST servers in parallel (these call gRPC servers internally)
 		// Stop these first to drain REST traffic before stopping gRPC
