
Commit 7d7e722

chore: change api gateway server context
Signed-off-by: Alessandro Yuichi Okimoto <yuichijpn@gmail.com>
1 parent 1d75ac9 commit 7d7e722

File tree

10 files changed: +70 -79 lines

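Most of the Go changes below follow a single pattern: long-lived pieces of the servers (the PubSub client, the health checkers, and the API/batch gateway servers) are now given contexts derived from context.Background() and are cancelled explicitly with a deferred cancel, instead of inheriting the ctx passed into Run, which is cancelled when the process is told to shut down. That keeps them alive until the graceful-shutdown sequence decides to stop them. A minimal sketch of the pattern, using a plain net/http server as a stand-in for Bucketeer's gateway (startGateway, the port, and the sleep values are assumptions, not the project's real API):

package main

import (
	"context"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

// startGateway is a stand-in for apiGateway.Start in the diff: it serves until
// the given context is cancelled and then drains. The real Bucketeer signature differs.
func startGateway(ctx context.Context, srv *http.Server) {
	go func() { _ = srv.ListenAndServe() }()
	go func() {
		<-ctx.Done() // fires only when the owner cancels, not on the shutdown signal
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		_ = srv.Shutdown(shutdownCtx)
	}()
}

func main() {
	// ctx is cancelled on SIGTERM, standing in for the ctx handed to Run.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer stop()

	// The server runs on a context detached from ctx, so the shutdown signal no
	// longer tears it down before the ordered drain below has run.
	serverCtx, serverCancel := context.WithCancel(context.Background())
	defer serverCancel()
	startGateway(serverCtx, &http.Server{Addr: ":9000", Handler: http.NotFoundHandler()})

	<-ctx.Done()                // shutdown signal received
	time.Sleep(5 * time.Second) // endpoint-propagation wait (propagationDelay in the diff; value assumed)
	serverCancel()              // only now begin draining the server
	time.Sleep(time.Second)     // crude wait for the drain goroutine in this toy example
	log.Println("graceful shutdown complete")
}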

manifests/bucketeer/charts/api/templates/deployment.yaml

Lines changed: 3 additions & 1 deletion

@@ -189,7 +189,9 @@ spec:

              # Wait for active requests to drain
              for i in $(seq 1 "$max_wait"); do
-                active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
+                active=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null \
+                  | grep -E '^http\.ingress_http\.downstream_rq_active:' \
+                  | awk '{print $2}' || echo "0")
                [ -z "$active" ] && active=0
                [ "$active" -eq 0 ] && break
                sleep 1

manifests/bucketeer/charts/batch/templates/deployment.yaml

Lines changed: 4 additions & 6 deletions

@@ -223,15 +223,13 @@ spec:
            - -c
            - |
              admin_port={{ .Values.envoy.adminPort }}
-              max_wait=35
-              propagation_delay=15
-
-              # Wait for GCLB to detect unhealthy status and stop routing
-              sleep "$propagation_delay"
+              max_wait=60

              # Wait for active requests to drain
              for i in $(seq 1 "$max_wait"); do
-                active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
+                active=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null \
+                  | grep -E '^http\.ingress_http\.downstream_rq_active:' \
+                  | awk '{print $2}' || echo "0")
                [ -z "$active" ] && active=0
                [ "$active" -eq 0 ] && break
                sleep 1

manifests/bucketeer/charts/subscriber/templates/deployment.yaml

Lines changed: 4 additions & 6 deletions

@@ -203,15 +203,13 @@ spec:
            - -c
            - |
              admin_port={{ .Values.envoy.adminPort }}
-              max_wait=35
-              propagation_delay=15
-
-              # Wait for GCLB to detect unhealthy status and stop routing
-              sleep "$propagation_delay"
+              max_wait=60

              # Wait for active requests to drain
              for i in $(seq 1 "$max_wait"); do
-                active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
+                active=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null \
+                  | grep -E '^http\.ingress_http\.downstream_rq_active:' \
+                  | awk '{print $2}' || echo "0")
                [ -z "$active" ] && active=0
                [ "$active" -eq 0 ] && break
                sleep 1

manifests/bucketeer/charts/web/templates/deployment.yaml

Lines changed: 4 additions & 6 deletions

@@ -273,15 +273,13 @@ spec:
            - -c
            - |
              admin_port={{ .Values.envoy.adminPort }}
-              max_wait=35
-              propagation_delay=15
-
-              # Wait for GCLB to detect unhealthy status and stop routing
-              sleep "$propagation_delay"
+              max_wait=60

              # Wait for active requests to drain
              for i in $(seq 1 "$max_wait"); do
-                active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
+                active=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null \
+                  | grep -E '^http\.ingress_http\.downstream_rq_active:' \
+                  | awk '{print $2}' || echo "0")
                [ -z "$active" ] && active=0
                [ "$active" -eq 0 ] && break
                sleep 1
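The four deployment templates above share one preStop-style drain script: poll Envoy's admin /stats endpoint once a second, up to max_wait times, until the http.ingress_http.downstream_rq_active gauge drops to zero. The new grep -E '^http\.ingress_http\.downstream_rq_active:' anchor matches only that exact stat rather than any line containing the substring, and wget gains a 1-second timeout so a hung admin endpoint cannot stall the hook. A rough Go rendering of the same polling logic, for illustration only (the admin port value is an assumption; the real one comes from .Values.envoy.adminPort):

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"time"
)

// activeRequests reads Envoy's admin /stats output and returns the exact
// http.ingress_http.downstream_rq_active gauge, mirroring the anchored grep.
func activeRequests(adminPort int) (int, error) {
	client := &http.Client{Timeout: time.Second} // like wget -T 1
	resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/stats", adminPort))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	const key = "http.ingress_http.downstream_rq_active:"
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if line := scanner.Text(); strings.HasPrefix(line, key) {
			return strconv.Atoi(strings.TrimSpace(strings.TrimPrefix(line, key)))
		}
	}
	return 0, scanner.Err() // stat not found: treat as zero, like `active=0`
}

func main() {
	const adminPort, maxWait = 8001, 60 // adminPort is assumed; max_wait=60 as in the diff
	for i := 0; i < maxWait; i++ {
		active, err := activeRequests(adminPort)
		if err != nil || active == 0 { // on error the script falls back to "0" as well
			break
		}
		time.Sleep(time.Second)
	}
	fmt.Println("drain wait finished")
}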

pkg/api/cmd/server.go

Lines changed: 16 additions & 20 deletions

@@ -230,9 +230,6 @@ func RegisterCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Command {
 func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.Logger) error {
 	registerer := metrics.DefaultRegisterer()

-	pubsubCtx, pubsubCancel := context.WithTimeout(ctx, 5*time.Second)
-	defer pubsubCancel()
-
 	// Create PubSub client using the factory
 	pubSubType := factory.PubSubType(*s.pubSubType)
 	factoryOpts := []factory.Option{

@@ -261,6 +258,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		factoryOpts = append(factoryOpts, factory.WithPartitionCount(*s.pubSubRedisPartitionCount))
 	}

+	pubsubCtx, pubsubCancel := context.WithCancel(context.Background())
+	defer pubsubCancel()
 	pubsubClient, err := factory.NewClient(pubsubCtx, factoryOpts...)
 	if err != nil {
 		return err

@@ -388,7 +387,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	if err != nil {
 		return err
 	}
-	defer auditLogClient.Close()
+	defer autoOpsClient.Close()

 	tagClient, err := tagclient.NewClient(*s.tagService, *s.certPath,
 		client.WithPerRPCCredentials(creds),

@@ -502,8 +501,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	// We don't check the Redis health status because if the check fails,
 	// the Kubernetes will restart the container and it might cause internal errors.
 	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
-	healthCheckCtx, healthCheckCancel := context.WithCancel(ctx)
-	defer healthCheckCancel() // Ensure cleanup on all paths (including early returns)
+	healthCheckCtx, healthCheckCancel := context.WithCancel(context.Background())
+	defer healthCheckCancel()

 	healthChecker := health.NewGrpcChecker(
 		health.WithTimeout(5*time.Second),

@@ -545,7 +544,9 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		return fmt.Errorf("failed to create API gateway: %v", err)
 	}

-	if err := apiGateway.Start(ctx, gatewayHandler); err != nil {
+	serverCtx, serverCtxCancel := context.WithCancel(context.Background())
+	defer serverCtxCancel()
+	if err := apiGateway.Start(serverCtx, gatewayHandler); err != nil {
 		return fmt.Errorf("failed to start API gateway: %v", err)
 	}

@@ -584,27 +585,20 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		shutdownStartTime := time.Now()
 		logger.Info("Starting graceful shutdown sequence")

-		waitBeforeUnready := 10 * time.Second
-		logger.Info("Waiting before marking unready",
-			zap.Duration("wait_before_unready", waitBeforeUnready))
-		time.Sleep(waitBeforeUnready)
+		// Wait for K8s endpoint propagation
+		// This prevents "context deadline exceeded" errors during high traffic.
+		time.Sleep(propagationDelay)
+		logger.Info("Starting HTTP/gRPC server shutdown")

-		// Cancel the health checker goroutines to prevent connection errors during shutdown
-		healthCheckCancel()
 		// Mark as unhealthy so readiness probes fail
 		// This ensures Kubernetes readiness probe fails on next check,
 		// preventing new traffic from being routed to this pod.
 		healthChecker.Stop()
 		restHealthChecker.Stop()

-		// Wait for K8s endpoint propagation
-		// This prevents "context deadline exceeded" errors during high traffic.
-		time.Sleep(propagationDelay)
-		logger.Info("Starting HTTP/gRPC server shutdown")
-
-		// CRITICAL: Shutdown order matters due to dependencies:
+		// Shutdown order matters due to dependencies:
 		// 1. apiGateway/httpServer make gRPC calls to the backend server
-		// 2. We MUST drain them BEFORE stopping the backend
+		// 2. We MUST drain them BEFORE stopping the backend sever
 		// 3. Otherwise their handlers hang waiting for a dead backend
 		// We run apiGateway and httpServer in parallel since they don't depend on each other
 		var wg sync.WaitGroup

@@ -623,9 +617,11 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L

 		// Wait for HTTP/REST traffic to fully drain
 		wg.Wait()
+		logger.Info("gRPC-gateway and HTTP server shutdown completed")

 		// Now it's safe to stop the gRPC server (no more HTTP→gRPC calls)
 		server.Stop(grpcStopTimeout)
+		logger.Info("gRPC server shutdown completed")

 		// Close clients
 		// These are fast cleanup operations that can run asynchronously.
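The reworked defer in pkg/api/cmd/server.go pins down a drain order: wait for Kubernetes endpoint propagation, mark the health checkers unready, drain the gRPC-gateway and the HTTP server in parallel, and only then stop the backend gRPC server they both call. A compressed sketch of that ordering follows; the stop functions are placeholders, since the real apiGateway, httpServer, and server types and their timeouts are not shown in this hunk:

package main

import (
	"log"
	"sync"
	"time"
)

// Placeholders for the components stopped in the diff; the real methods belong
// to Bucketeer's own types and take their own timeouts.
func stopAPIGateway() { time.Sleep(200 * time.Millisecond) }
func stopHTTPServer() { time.Sleep(200 * time.Millisecond) }
func stopGRPCServer() { time.Sleep(100 * time.Millisecond) }
func markUnready()    {}

func gracefulShutdown(propagationDelay time.Duration) {
	start := time.Now()
	log.Println("Starting graceful shutdown sequence")

	// 1. Let Kubernetes endpoint removal propagate so no new traffic arrives.
	time.Sleep(propagationDelay)

	// 2. Fail readiness probes (healthChecker.Stop and restHealthChecker.Stop in the diff).
	markUnready()

	// 3. Drain the gRPC-gateway and the HTTP server in parallel; both call the
	//    backend gRPC server, so they must finish first.
	var wg sync.WaitGroup
	for _, stop := range []func(){stopAPIGateway, stopHTTPServer} {
		wg.Add(1)
		go func(stop func()) {
			defer wg.Done()
			stop()
		}(stop)
	}
	wg.Wait()
	log.Println("gRPC-gateway and HTTP server shutdown completed")

	// 4. Only now stop the backend gRPC server: nothing calls it anymore.
	stopGRPCServer()
	log.Println("gRPC server shutdown completed")

	log.Printf("shutdown took %s", time.Since(start))
}

func main() {
	gracefulShutdown(2 * time.Second)
}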

pkg/batch/cmd/server/server.go

Lines changed: 13 additions & 11 deletions

@@ -584,8 +584,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	)

 	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
-	healthCheckCtx, healthCheckCancel := context.WithCancel(ctx)
-	defer healthCheckCancel() // Ensure cleanup on all paths (including early returns)
+	healthCheckCtx, healthCheckCancel := context.WithCancel(context.Background())
+	defer healthCheckCancel()

 	healthChecker := health.NewGrpcChecker(
 		health.WithTimeout(time.Second),

@@ -627,31 +627,33 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		return fmt.Errorf("failed to create batch gateway: %v", err)
 	}

-	if err := batchGateway.Start(ctx, batchHandler); err != nil {
+	batchCtx, batchCancel := context.WithCancel(context.Background())
+	defer batchCancel()
+	if err := batchGateway.Start(batchCtx, batchHandler); err != nil {
 		return fmt.Errorf("failed to start batch gateway: %v", err)
 	}

 	defer func() {
 		shutdownStartTime := time.Now()
 		logger.Info("Starting graceful shutdown sequence")

-		// Cancel the health checker goroutines to prevent connection errors during shutdown
-		healthCheckCancel()
-		// Mark as unhealthy so readiness probes fail
-		// This ensures Kubernetes readiness probe fails on next check,
-		// preventing new traffic from being routed to this pod.
-		healthChecker.Stop()
-
 		// Wait for K8s endpoint propagation
 		// This prevents "context deadline exceeded" errors during high traffic.
 		time.Sleep(propagationDelay)
 		logger.Info("Starting HTTP/gRPC server shutdown")

-		// Gracefully stop REST gateway (calls the gRPC server internally)
+		// Mark as unhealthy so readiness probes fail
+		// This ensures Kubernetes readiness probe fails on next check,
+		// preventing new traffic from being routed to this pod.
+		healthChecker.Stop()
+
+		// Gracefully stop gRPC Gateway (calls the gRPC server internally)
 		batchGateway.Stop(serverShutDownTimeout)
+		logger.Info("gRPC-gateway server shutdown completed")

 		// Stop gRPC server (only pure gRPC connections remain)
 		server.Stop(grpcStopTimeout)
+		logger.Info("gRPC server shutdown completed")

 		// Close clients
 		// These are fast cleanup operations that can run asynchronously.

pkg/subscriber/cmd/server/server.go

Lines changed: 2 additions & 3 deletions

@@ -343,7 +343,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	// healthCheckService
 	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
 	healthCheckCtx, healthCheckCancel := context.WithCancel(ctx)
-	defer healthCheckCancel() // Ensure cleanup on all paths (including early returns)
+	defer healthCheckCancel()

 	restHealthChecker := health.NewRestChecker(
 		"", "",

@@ -365,8 +365,6 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		shutdownStartTime := time.Now()
 		logger.Info("Starting graceful shutdown sequence")

-		// Cancel the health checker goroutines to prevent connection errors during shutdown
-		healthCheckCancel()
 		// Mark as unhealthy so readiness probes fail
 		// This ensures Kubernetes readiness probe fails on next check,
 		// preventing new traffic from being routed to this pod.

@@ -376,6 +374,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		// Stop PubSub subscription
 		// This stops receiving new messages and allows in-flight messages to be processed.
 		multiPubSub.Stop()
+		logger.Info("PubSub subscription stopped, all messages processed")

 		// Close clients
 		// These are fast cleanup operations that can run asynchronously.

pkg/subscriber/on_demand_subscriber.go

Lines changed: 3 additions & 2 deletions

@@ -233,8 +233,9 @@ func (s *onDemandSubscriber) createPubSubClient(ctx context.Context) error {
 		}
 	}

-	// Create the PubSub client using the factory
-	pubsubClient, err := factory.NewClient(ctx, factoryOpts...)
+	// Create the PubSub client using the factory with context.Background()
+	// to ensure connections remain healthy until explicitly stopped during graceful shutdown
+	pubsubClient, err := factory.NewClient(context.Background(), factoryOpts...)
 	if err != nil {
 		s.logger.Error("Failed to create pubsub client",
 			zap.Error(err),

pkg/subscriber/subscriber.go

Lines changed: 3 additions & 2 deletions

@@ -201,8 +201,9 @@ func (s pubSubSubscriber) createPuller(
 		}
 	}

-	// Create the PubSub client using the factory
-	pubsubClient, err = factory.NewClient(ctx, factoryOpts...)
+	// Create the PubSub client using the factory with context.Background()
+	// to ensure connections remain healthy until explicitly stopped during graceful shutdown
+	pubsubClient, err = factory.NewClient(context.Background(), factoryOpts...)
 	if err != nil {
 		s.logger.Error("Failed to create pubsub client",
 			zap.Error(err),
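Both subscriber code paths now create the PubSub client with context.Background() instead of the caller's ctx; per the added comment, this is so the connections stay healthy until they are explicitly stopped during graceful shutdown. The sketch below shows the same shape using the public cloud.google.com/go/pubsub package for illustration. Bucketeer builds its clients through its own factory, which may behave differently, and the project and subscription names here are made up:

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

// newPuller mirrors the shape of the change: the client is created with
// context.Background() so its connections are not tied to a cancellable ctx,
// and it is closed explicitly during shutdown instead.
func newPuller(projectID, subID string) (*pubsub.Client, *pubsub.Subscription, error) {
	// Before the commit, the equivalent call received the caller's ctx.
	client, err := pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		return nil, nil, err
	}
	return client, client.Subscription(subID), nil
}

func main() {
	// Hypothetical names; the real values come from Bucketeer's configuration.
	client, sub, err := newPuller("my-project", "my-subscription")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close() // the explicit stop during graceful shutdown

	// ctx only bounds how long we receive; cancelling it stops new deliveries
	// while the client connection stays usable until Close.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		m.Ack()
	}); err != nil {
		log.Printf("receive stopped: %v", err)
	}
}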

pkg/web/cmd/server/server.go

Lines changed: 18 additions & 22 deletions

@@ -404,14 +404,14 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	registerer := metrics.DefaultRegisterer()

 	// dataWarehouse config
-	dataWarehouseConfig, err := s.readDataWarehouseConfig(ctx, logger)
+	dataWarehouseConfig, err := s.readDataWarehouseConfig(logger)
 	if err != nil {
 		logger.Error("Failed to read dataWarehouse config", zap.Error(err))
 		return err
 	}

 	// oauth config
-	oAuthConfig, err := s.readOAuthConfig(ctx, logger)
+	oAuthConfig, err := s.readOAuthConfig(logger)
 	if err != nil {
 		logger.Error("Failed to read OAuth config", zap.Error(err))
 		return err

@@ -427,16 +427,17 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	if err != nil {
 		return err
 	}
-	// healthCheckService
-	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
-	healthCheckCtx, healthCheckCancel := context.WithCancel(ctx)
-	defer healthCheckCancel() // Ensure cleanup on all paths (including early returns)

 	restHealthChecker := health.NewRestChecker(
 		"", "",
 		health.WithTimeout(healthCheckTimeout),
 		health.WithCheck("metrics", metrics.Check),
 	)
+
+	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
+	healthCheckCtx, healthCheckCancel := context.WithCancel(context.Background())
+	defer healthCheckCancel()
+
 	go restHealthChecker.Run(healthCheckCtx)
 	// healthcheckService
 	healthcheckServer := rest.NewServer(

@@ -500,13 +501,16 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	if err != nil {
 		return err
 	}
+
+	pubsubCtx, pubsubCancel := context.WithCancel(context.Background())
+	defer pubsubCancel()
 	// domainTopicPublisher
-	domainTopicPublisher, err := s.createPublisher(ctx, *s.domainTopic, registerer, logger)
+	domainTopicPublisher, err := s.createPublisher(pubsubCtx, *s.domainTopic, registerer, logger)
 	if err != nil {
 		return err
 	}
 	// segmentUsersPublisher
-	segmentUsersPublisher, err := s.createPublisher(ctx, *s.bulkSegmentUsersReceivedTopic, registerer, logger)
+	segmentUsersPublisher, err := s.createPublisher(pubsubCtx, *s.bulkSegmentUsersReceivedTopic, registerer, logger)
 	if err != nil {
 		return err
 	}

@@ -722,7 +726,6 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L

 	// featureService
 	featureService, err := s.createFeatureService(
-		ctx,
 		accountClient,
 		experimentClient,
 		autoOpsClient,

@@ -861,18 +864,17 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		shutdownStartTime := time.Now()
 		logger.Info("Starting graceful shutdown sequence")

-		// Cancel the health checker goroutines to prevent connection errors during shutdown
-		healthCheckCancel()
-		// Mark as unhealthy so readiness probes fail
-		// This ensures Kubernetes readiness probe fails on next check,
-		// preventing new traffic from being routed to this pod.
-		restHealthChecker.Stop()
-
 		// Wait for K8s endpoint propagation
 		// This prevents "context deadline exceeded" errors during high traffic.
 		time.Sleep(propagationDelay)
 		logger.Info("Starting HTTP/gRPC server shutdown")

+		// Mark as unhealthy so readiness probes fail
+		// This ensures Kubernetes readiness probe fails on next check,
+		// preventing new traffic from being routed to this pod.
+		healthcheckServer.Stop(5 * time.Second)
+		restHealthChecker.Stop()
+
 		// Stop REST servers in parallel (these call gRPC servers internally)
 		// Stop these first to drain REST traffic before stopping gRPC
 		var restWg sync.WaitGroup

@@ -993,9 +995,6 @@ func (s *server) createPublisher(
 	registerer metrics.Registerer,
 	logger *zap.Logger,
 ) (publisher.Publisher, error) {
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
-	defer cancel()
-
 	// Create PubSub client using the factory
 	pubSubType := factory.PubSubType(*s.pubSubType)
 	factoryOpts := []factory.Option{

@@ -1035,7 +1034,6 @@ func (s *server) createPublisher(
 }

 func (s *server) readOAuthConfig(
-	ctx context.Context,
 	logger *zap.Logger,
 ) (*auth.OAuthConfig, error) {
 	bytes, err := os.ReadFile(*s.oauthConfigPath)

@@ -1129,7 +1127,6 @@ func (s *server) createEnvironmentService(
 }

 func (s *server) createFeatureService(
-	ctx context.Context,
 	accountClient accountclient.Client,
 	experimentClient experimentclient.Client,
 	autoOpsClient autoopsclient.Client,

@@ -1218,7 +1215,6 @@ func (s *server) createGatewayHandlers() []gatewayapi.HandlerRegistrar {
 }

 func (s *server) readDataWarehouseConfig(
-	ctx context.Context,
 	logger *zap.Logger,
 ) (*DataWarehouseConfig, error) {
 	// If config path is provided, read from file
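In the web server, createPublisher previously wrapped the incoming ctx in a 5-second timeout and cancelled it as soon as the function returned, even though the PubSub client built from that context lives for the whole process. The commit removes the wrapper and instead has Run own a long-lived pubsubCtx (derived from context.Background()) that is passed to both createPublisher calls. A small self-contained sketch of why that matters; the client type here is hypothetical and only simulates a connection tied to its creation context:

package main

import (
	"context"
	"fmt"
	"time"
)

// client simulates a publisher whose connection is tied to the context it was
// created with. It is a stand-in, not Bucketeer's factory.NewClient.
type client struct{ ctx context.Context }

func newClient(ctx context.Context) *client { return &client{ctx: ctx} }

// publish fails once the creation context is cancelled, standing in for the
// torn-down connection the old code risked.
func (c *client) publish() error { return c.ctx.Err() }

func main() {
	// Old shape: a short-lived timeout context created the client and was
	// cancelled when createPublisher returned (simulated by calling cancel here).
	oldCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	oldClient := newClient(oldCtx)
	cancel()
	fmt.Println("old shape publish error:", oldClient.publish()) // context canceled

	// New shape: one long-lived context owned by Run, cancelled only at shutdown.
	pubsubCtx, pubsubCancel := context.WithCancel(context.Background())
	defer pubsubCancel()
	longLived := newClient(pubsubCtx)
	fmt.Println("new shape publish error:", longLived.publish()) // <nil>
}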
