
Commit f6d8894

chore: change api gateway server context
Signed-off-by: Alessandro Yuichi Okimoto <yuichijpn@gmail.com>
1 parent 1d75ac9 commit f6d8894

File tree

14 files changed: +162 additions, -83 deletions

manifests/bucketeer/charts/api/templates/deployment.yaml

Lines changed: 28 additions & 4 deletions
@@ -185,15 +185,39 @@ spec:
           - -c
           - |
             admin_port={{ .Values.envoy.adminPort }}
-            max_wait=60
+            max_wait=45
+            propagation_delay=15
+
+            sleep "$propagation_delay"

             # Wait for active requests to drain
             for i in $(seq 1 "$max_wait"); do
-              active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
-              [ -z "$active" ] && active=0
-              [ "$active" -eq 0 ] && break
+              # Fetch stats and check if request succeeded
+              stats=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null)
+              if [ $? -ne 0 ] || [ -z "$stats" ]; then
+                echo "Check $i/$max_wait: Failed to fetch stats, retrying..."
+                sleep 1
+                continue
+              fi
+
+              # Extract active requests metric
+              active=$(echo "$stats" | grep -E '^http\.ingress_http\.downstream_rq_active:' | awk '{print $2}')
+              if [ -z "$active" ]; then
+                echo "Check $i/$max_wait: Metric not found, retrying..."
+                sleep 1
+                continue
+              fi
+
+              echo "Check $i/$max_wait: Active requests: $active"
+              [ "$active" -eq 0 ] && echo "No active requests, exiting gracefully" && break
              sleep 1
            done
+
+            if [ -n "$active" ] && [ "$active" -eq 0 ]; then
+              echo "Graceful shutdown completed successfully"
+            else
+              echo "Warning: Timed out after ${max_wait}s - active=${active:-unknown}"
+            fi
             exit 0
      command: ["envoy"]
      args:
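
The rewritten drain loop distinguishes three cases the old one-liner collapsed into "zero active requests": the admin endpoint being unreachable, the metric being absent from the stats dump, and requests genuinely still in flight. A minimal Go sketch of the same check, for illustration only — the metric name and loop bound come from the script above, while the port value 8001 is a placeholder, not a value from this repository:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
	"time"
)

// activeRequests polls the Envoy admin endpoint and returns the value of
// http.ingress_http.downstream_rq_active, mirroring the wget/grep/awk
// pipeline in the preStop script above.
func activeRequests(adminPort int) (int, error) {
	client := &http.Client{Timeout: time.Second} // same budget as wget -T 1
	resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/stats", adminPort))
	if err != nil {
		return 0, err // stats endpoint unreachable
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		// Envoy emits stats as "name: value", one per line.
		line := scanner.Text()
		if strings.HasPrefix(line, "http.ingress_http.downstream_rq_active: ") {
			var n int
			_, err := fmt.Sscanf(strings.TrimPrefix(line, "http.ingress_http.downstream_rq_active: "), "%d", &n)
			return n, err
		}
	}
	return 0, fmt.Errorf("metric not found") // an absent metric is an error, not zero
}

func main() {
	const maxWait = 45 // checks, matching max_wait in the script
	for i := 1; i <= maxWait; i++ {
		n, err := activeRequests(8001) // 8001 is a placeholder admin port
		if err == nil && n == 0 {
			fmt.Println("No active requests, exiting gracefully")
			return
		}
		time.Sleep(time.Second)
	}
	fmt.Printf("Warning: timed out after %d checks\n", maxWait)
}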

manifests/bucketeer/charts/api/values.yaml

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ terminationGracePeriodSeconds: 75
 health:
   startupProbe:
     periodSeconds: 3
-    failureThreshold: 10
+    failureThreshold: 20
     timeoutSeconds: 5
   livenessProbe:
     initialDelaySeconds: 30
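
With periodSeconds: 3, raising failureThreshold from 10 to 20 gives the container up to 3 s × 20 = 60 s to pass its startup probe, double the previous 30 s budget.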

manifests/bucketeer/charts/batch/templates/deployment.yaml

Lines changed: 25 additions & 5 deletions
@@ -223,19 +223,39 @@ spec:
           - -c
           - |
             admin_port={{ .Values.envoy.adminPort }}
-            max_wait=35
+            max_wait=45
             propagation_delay=15

-            # Wait for GCLB to detect unhealthy status and stop routing
             sleep "$propagation_delay"

             # Wait for active requests to drain
             for i in $(seq 1 "$max_wait"); do
-              active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
-              [ -z "$active" ] && active=0
-              [ "$active" -eq 0 ] && break
+              # Fetch stats and check if request succeeded
+              stats=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null)
+              if [ $? -ne 0 ] || [ -z "$stats" ]; then
+                echo "Check $i/$max_wait: Failed to fetch stats, retrying..."
+                sleep 1
+                continue
+              fi
+
+              # Extract active requests metric
+              active=$(echo "$stats" | grep -E '^http\.ingress_http\.downstream_rq_active:' | awk '{print $2}')
+              if [ -z "$active" ]; then
+                echo "Check $i/$max_wait: Metric not found, retrying..."
+                sleep 1
+                continue
+              fi
+
+              echo "Check $i/$max_wait: Active requests: $active"
+              [ "$active" -eq 0 ] && echo "No active requests, exiting gracefully" && break
              sleep 1
            done
+
+            if [ -n "$active" ] && [ "$active" -eq 0 ]; then
+              echo "Graceful shutdown completed successfully"
+            else
+              echo "Warning: Timed out after ${max_wait}s - active=${active:-unknown}"
+            fi
             exit 0
      command: ["envoy"]
      args:
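
As a budget check: when every stats fetch returns promptly, the hook spends at most propagation_delay + max_wait × 1 s = 15 + 45 = 60 s, inside the terminationGracePeriodSeconds: 75 shown in the api and subscriber values files. Since max_wait counts checks rather than elapsed seconds, an iteration whose wget hits its 1 s timeout (-T 1) costs about 2 s, so the theoretical worst case is closer to 15 + 90 = 105 s; in that case the grace period, not the loop, becomes the effective bound.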

manifests/bucketeer/charts/batch/values.yaml

Lines changed: 1 addition & 1 deletion
@@ -126,7 +126,7 @@ gcpMultiCluster:
 health:
   startupProbe:
     periodSeconds: 3
-    failureThreshold: 10
+    failureThreshold: 30
     timeoutSeconds: 5
   livenessProbe:
     initialDelaySeconds: 30
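
The batch container gets a larger window than the other charts here: 3 s × 30 = 90 s of startup budget, versus 3 s × 10 = 30 s before.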

manifests/bucketeer/charts/subscriber/templates/deployment.yaml

Lines changed: 25 additions & 5 deletions
@@ -203,19 +203,39 @@ spec:
           - -c
           - |
             admin_port={{ .Values.envoy.adminPort }}
-            max_wait=35
+            max_wait=45
             propagation_delay=15

-            # Wait for GCLB to detect unhealthy status and stop routing
             sleep "$propagation_delay"

             # Wait for active requests to drain
             for i in $(seq 1 "$max_wait"); do
-              active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
-              [ -z "$active" ] && active=0
-              [ "$active" -eq 0 ] && break
+              # Fetch stats and check if request succeeded
+              stats=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null)
+              if [ $? -ne 0 ] || [ -z "$stats" ]; then
+                echo "Check $i/$max_wait: Failed to fetch stats, retrying..."
+                sleep 1
+                continue
+              fi
+
+              # Extract active requests metric
+              active=$(echo "$stats" | grep -E '^http\.ingress_http\.downstream_rq_active:' | awk '{print $2}')
+              if [ -z "$active" ]; then
+                echo "Check $i/$max_wait: Metric not found, retrying..."
+                sleep 1
+                continue
+              fi
+
+              echo "Check $i/$max_wait: Active requests: $active"
+              [ "$active" -eq 0 ] && echo "No active requests, exiting gracefully" && break
              sleep 1
            done
+
+            if [ -n "$active" ] && [ "$active" -eq 0 ]; then
+              echo "Graceful shutdown completed successfully"
+            else
+              echo "Warning: Timed out after ${max_wait}s - active=${active:-unknown}"
+            fi
             exit 0
      command: ["envoy"]
      args:

manifests/bucketeer/charts/subscriber/values.yaml

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ terminationGracePeriodSeconds: 75
 health:
   startupProbe:
     periodSeconds: 3
-    failureThreshold: 10
+    failureThreshold: 20
     timeoutSeconds: 5
   livenessProbe:
     initialDelaySeconds: 30
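
Same arithmetic as the api chart: 3 s × 20 = 60 s of startup budget, up from 30 s.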

manifests/bucketeer/charts/web/templates/deployment.yaml

Lines changed: 25 additions & 5 deletions
@@ -273,19 +273,39 @@ spec:
           - -c
           - |
             admin_port={{ .Values.envoy.adminPort }}
-            max_wait=35
+            max_wait=45
             propagation_delay=15

-            # Wait for GCLB to detect unhealthy status and stop routing
             sleep "$propagation_delay"

             # Wait for active requests to drain
             for i in $(seq 1 "$max_wait"); do
-              active=$(wget -q -O- "http://localhost:${admin_port}/stats" 2>/dev/null | grep "http.ingress_http.downstream_rq_active" | awk '{print $2}' || echo "0")
-              [ -z "$active" ] && active=0
-              [ "$active" -eq 0 ] && break
+              # Fetch stats and check if request succeeded
+              stats=$(wget -q -T 1 -O- "http://127.0.0.1:${admin_port}/stats" 2>/dev/null)
+              if [ $? -ne 0 ] || [ -z "$stats" ]; then
+                echo "Check $i/$max_wait: Failed to fetch stats, retrying..."
+                sleep 1
+                continue
+              fi
+
+              # Extract active requests metric
+              active=$(echo "$stats" | grep -E '^http\.ingress_http\.downstream_rq_active:' | awk '{print $2}')
+              if [ -z "$active" ]; then
+                echo "Check $i/$max_wait: Metric not found, retrying..."
+                sleep 1
+                continue
+              fi
+
+              echo "Check $i/$max_wait: Active requests: $active"
+              [ "$active" -eq 0 ] && echo "No active requests, exiting gracefully" && break
              sleep 1
            done
+
+            if [ -n "$active" ] && [ "$active" -eq 0 ]; then
+              echo "Graceful shutdown completed successfully"
+            else
+              echo "Warning: Timed out after ${max_wait}s - active=${active:-unknown}"
+            fi
             exit 0
      command: ["envoy"]
      args:

manifests/bucketeer/charts/web/values.yaml

Lines changed: 1 addition & 1 deletion
@@ -131,7 +131,7 @@ service:
 health:
   startupProbe:
     periodSeconds: 3
-    failureThreshold: 10
+    failureThreshold: 20
     timeoutSeconds: 5
   livenessProbe:
     initialDelaySeconds: 30
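
Likewise for the web chart: 3 s × 20 = 60 s of startup budget, up from 30 s.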

pkg/api/cmd/server.go

Lines changed: 16 additions & 20 deletions
@@ -230,9 +230,6 @@ func RegisterCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Command {
 func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.Logger) error {
 	registerer := metrics.DefaultRegisterer()

-	pubsubCtx, pubsubCancel := context.WithTimeout(ctx, 5*time.Second)
-	defer pubsubCancel()
-
 	// Create PubSub client using the factory
 	pubSubType := factory.PubSubType(*s.pubSubType)
 	factoryOpts := []factory.Option{
@@ -261,6 +258,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		factoryOpts = append(factoryOpts, factory.WithPartitionCount(*s.pubSubRedisPartitionCount))
 	}

+	pubsubCtx, pubsubCancel := context.WithCancel(context.Background())
+	defer pubsubCancel()
 	pubsubClient, err := factory.NewClient(pubsubCtx, factoryOpts...)
 	if err != nil {
 		return err
@@ -388,7 +387,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	if err != nil {
 		return err
 	}
-	defer auditLogClient.Close()
+	defer autoOpsClient.Close()

 	tagClient, err := tagclient.NewClient(*s.tagService, *s.certPath,
 		client.WithPerRPCCredentials(creds),
@@ -502,8 +501,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	// We don't check the Redis health status because if the check fails,
 	// the Kubernetes will restart the container and it might cause internal errors.
 	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
-	healthCheckCtx, healthCheckCancel := context.WithCancel(ctx)
-	defer healthCheckCancel() // Ensure cleanup on all paths (including early returns)
+	healthCheckCtx, healthCheckCancel := context.WithCancel(context.Background())
+	defer healthCheckCancel()

 	healthChecker := health.NewGrpcChecker(
 		health.WithTimeout(5*time.Second),
@@ -545,7 +544,9 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		return fmt.Errorf("failed to create API gateway: %v", err)
 	}

-	if err := apiGateway.Start(ctx, gatewayHandler); err != nil {
+	serverCtx, serverCtxCancel := context.WithCancel(context.Background())
+	defer serverCtxCancel()
+	if err := apiGateway.Start(serverCtx, gatewayHandler); err != nil {
 		return fmt.Errorf("failed to start API gateway: %v", err)
 	}

@@ -584,27 +585,20 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	shutdownStartTime := time.Now()
 	logger.Info("Starting graceful shutdown sequence")

-	waitBeforeUnready := 10 * time.Second
-	logger.Info("Waiting before marking unready",
-		zap.Duration("wait_before_unready", waitBeforeUnready))
-	time.Sleep(waitBeforeUnready)
+	// Wait for K8s endpoint propagation
+	// This prevents "context deadline exceeded" errors during high traffic.
+	time.Sleep(propagationDelay)
+	logger.Info("Starting HTTP/gRPC server shutdown")

-	// Cancel the health checker goroutines to prevent connection errors during shutdown
-	healthCheckCancel()
 	// Mark as unhealthy so readiness probes fail
 	// This ensures Kubernetes readiness probe fails on next check,
 	// preventing new traffic from being routed to this pod.
 	healthChecker.Stop()
 	restHealthChecker.Stop()

-	// Wait for K8s endpoint propagation
-	// This prevents "context deadline exceeded" errors during high traffic.
-	time.Sleep(propagationDelay)
-	logger.Info("Starting HTTP/gRPC server shutdown")
-
-	// CRITICAL: Shutdown order matters due to dependencies:
+	// Shutdown order matters due to dependencies:
 	// 1. apiGateway/httpServer make gRPC calls to the backend server
-	// 2. We MUST drain them BEFORE stopping the backend
+	// 2. We MUST drain them BEFORE stopping the backend server
 	// 3. Otherwise their handlers hang waiting for a dead backend
 	// We run apiGateway and httpServer in parallel since they don't depend on each other
 	var wg sync.WaitGroup
@@ -623,9 +617,11 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	// Wait for HTTP/REST traffic to fully drain
 	wg.Wait()
+	logger.Info("gRPC-gateway and HTTP server shutdown completed")

 	// Now it's safe to stop the gRPC server (no more HTTP→gRPC calls)
 	server.Stop(grpcStopTimeout)
+	logger.Info("gRPC server shutdown completed")

 	// Close clients
 	// These are fast cleanup operations that can run asynchronously.
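
The reordered deferred block is the core of the Go-side change: sleep through endpoint propagation, mark the pod unready, drain the two front ends in parallel, and only then stop the backend gRPC server they call into. A self-contained sketch of that ordering — the component names and durations here are stand-ins, not the repository's types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// stopComponent simulates a graceful stop; in the real server these are
// calls like apiGateway.Stop(...) and the HTTP server's shutdown.
func stopComponent(name string, d time.Duration) {
	time.Sleep(d) // pretend to drain in-flight requests
	fmt.Println(name, "drained")
}

func main() {
	// 1. Drain the two front ends in parallel: they are independent of
	//    each other, but both forward requests to the backend gRPC server.
	var wg sync.WaitGroup
	for _, name := range []string{"apiGateway", "httpServer"} {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			stopComponent(n, 100*time.Millisecond)
		}(name)
	}
	wg.Wait() // 2. past this point no more HTTP→gRPC calls can arrive

	// 3. Only now stop the backend; stopping it first would leave the
	//    front ends' in-flight handlers waiting on a dead server.
	stopComponent("grpcServer", 50*time.Millisecond)
	fmt.Println("graceful shutdown completed")
}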

pkg/batch/cmd/server/server.go

Lines changed: 13 additions & 11 deletions
@@ -584,8 +584,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 	)

 	// Use a dedicated context so we can stop the health checker goroutine cleanly during shutdown
-	healthCheckCtx, healthCheckCancel := context.WithCancel(ctx)
-	defer healthCheckCancel() // Ensure cleanup on all paths (including early returns)
+	healthCheckCtx, healthCheckCancel := context.WithCancel(context.Background())
+	defer healthCheckCancel()

 	healthChecker := health.NewGrpcChecker(
 		health.WithTimeout(time.Second),
@@ -627,31 +627,33 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
 		return fmt.Errorf("failed to create batch gateway: %v", err)
 	}

-	if err := batchGateway.Start(ctx, batchHandler); err != nil {
+	batchCtx, batchCancel := context.WithCancel(context.Background())
+	defer batchCancel()
+	if err := batchGateway.Start(batchCtx, batchHandler); err != nil {
 		return fmt.Errorf("failed to start batch gateway: %v", err)
 	}

 	defer func() {
 		shutdownStartTime := time.Now()
 		logger.Info("Starting graceful shutdown sequence")

-		// Cancel the health checker goroutines to prevent connection errors during shutdown
-		healthCheckCancel()
-		// Mark as unhealthy so readiness probes fail
-		// This ensures Kubernetes readiness probe fails on next check,
-		// preventing new traffic from being routed to this pod.
-		healthChecker.Stop()
-
 		// Wait for K8s endpoint propagation
 		// This prevents "context deadline exceeded" errors during high traffic.
 		time.Sleep(propagationDelay)
 		logger.Info("Starting HTTP/gRPC server shutdown")

-		// Gracefully stop REST gateway (calls the gRPC server internally)
+		// Mark as unhealthy so readiness probes fail
+		// This ensures Kubernetes readiness probe fails on next check,
+		// preventing new traffic from being routed to this pod.
+		healthChecker.Stop()
+
+		// Gracefully stop gRPC Gateway (calls the gRPC server internally)
 		batchGateway.Stop(serverShutDownTimeout)
+		logger.Info("gRPC-gateway server shutdown completed")

 		// Stop gRPC server (only pure gRPC connections remain)
 		server.Stop(grpcStopTimeout)
+		logger.Info("gRPC server shutdown completed")

 		// Close clients
 		// These are fast cleanup operations that can run asynchronously.