30 changes: 25 additions & 5 deletions manifests/bucketeer/charts/batch/templates/cronjob.yaml
@@ -21,6 +21,10 @@ spec:
     spec:
       backoffLimit: 0
       template:
+        metadata:
+          labels:
+            app: {{ template "batch-server.name" $ }}
+            job-type: cronjob

Comment on lines +26 to +27 (Member Author):
Implemented it to report the logs as batch-server for easy tracking on the GCP console.

         spec:
           volumes:
             - name: service-cert-secret
@@ -43,18 +47,34 @@ spec:
               env:
                 - name: WEB_GATEWAY_ADDRESS
                   value: "{{ $.Values.cronjob.webGatewayAddress }}"
+                - name: JOB_NAME
+                  value: "{{ .name }}"
+                - name: JOB_ID
+                  value: "{{ .jobId }}"
               command:
                 - /bin/sh
               args:
                 - -c
                 - |
-                  echo "Start {{ .name }} job."

Comment (Member Author):
I removed the start log and implemented the elapsed time to be reported in the success log.

+                  START_TIME=$(date +%s)
                   ENDPOINT="${WEB_GATEWAY_ADDRESS}/bucketeer.batch.BatchService/ExecuteBatchJob"
                   TOKEN=`cat /usr/local/service-token/token`
-                  RES=`curl -X POST -m 3600 --cacert /usr/local/certs/service/tls.crt -d '{"job": "{{ .jobId }}"}' -H "authorization: bearer ${TOKEN}" -H "Content-Type: application/json" -s -o /dev/null -w '%{http_code}\\n' ${ENDPOINT}`
-                  echo "{{ .name }} job result: ${RES}"
-                  if [ "$RES" = 200 ] || [ "$RES" = 503 ] || [ "$RES" = 000 ]
-                  then
+                  RESPONSE=$(mktemp)
+                  RES=$(curl -X POST -m 3600 --cacert /usr/local/certs/service/tls.crt -d '{"job": "'${JOB_ID}'"}' -H "authorization: bearer ${TOKEN}" -H "Content-Type: application/json" -s -o ${RESPONSE} -w '%{http_code}' ${ENDPOINT})
+                  BODY=$(cat ${RESPONSE})
+                  rm -f ${RESPONSE}
+                  END_TIME=$(date +%s)
+                  DURATION=$((END_TIME - START_TIME))
+                  # API returns empty {} on success, or error details on failure
+                  if [ "${RES}" = "200" ]; then
+                    # Success - use DEBUG to avoid verbose logs
+                    echo '{"severity":"DEBUG","message":"'${JOB_NAME}' job completed successfully","job":"'${JOB_NAME}'","jobId":"'${JOB_ID}'","statusCode":'${RES}',"durationSeconds":'${DURATION}'}'
+                  else
+                    # Failure - log as ERROR with response details for debugging
+                    BODY_ESCAPED=$(echo "${BODY}" | sed 's/\\/\\\\/g; s/"/\\"/g' | tr '\n\r\t' ' ')
+                    echo '{"severity":"ERROR","message":"'${JOB_NAME}' job failed","job":"'${JOB_NAME}'","jobId":"'${JOB_ID}'","statusCode":'${RES}',"durationSeconds":'${DURATION}',"responseBody":"'${BODY_ESCAPED}'"}'

Comment on lines +71 to +75, @cre8ivejp (Member Author), Oct 27, 2025:
The debug logs were always reported as INFO level to GCP, so I formatted them using JSON format.

+                  fi
+                  if [ "${RES}" = "200" ] || [ "${RES}" = "503" ] || [ "${RES}" = "000" ]; then
                     exit 0
                   else
                     exit 1
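
Note on the two comments above: with GKE's default logging agent, a plain text line written to stdout is ingested at the default (INFO) severity, while a JSON line has its "severity" field mapped to the entry's log level, and the new pod labels ("app", "job-type") are attached to each log entry as "k8s-pod/<label>" labels. As a rough sketch of how the cronjob logs could then be tracked from the GCP console or CLI (the rendered label value "batch-server" and the filter itself are illustrative assumptions, not part of this PR):

    # Hypothetical query: list recent failed batch job runs from Cloud Logging.
    # Assumes the default GKE logging agent and an "app" label rendered as "batch-server".
    gcloud logging read \
      'resource.type="k8s_container" AND labels."k8s-pod/app"="batch-server" AND labels."k8s-pod/job-type"="cronjob" AND severity>=ERROR' \
      --limit=20 \
      --format="value(timestamp,jsonPayload.job,jsonPayload.message)"

Dropping the severity clause would also surface the DEBUG success entries together with their durationSeconds field.
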
3 changes: 0 additions & 3 deletions pkg/api/cmd/server.go
@@ -571,7 +571,6 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
// Wait for K8s endpoint propagation
// This prevents "context deadline exceeded" errors during high traffic.
time.Sleep(propagationDelay)
-logger.Info("Starting HTTP/gRPC server shutdown")

// Mark as unhealthy so readiness probes fail
// This ensures Kubernetes readiness probe fails on next check,
@@ -600,11 +599,9 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L

// Wait for HTTP/REST traffic to fully drain
wg.Wait()
-logger.Info("gRPC-gateway and HTTP server shutdown completed")

// Now it's safe to stop the gRPC server (no more HTTP→gRPC calls)
server.Stop(grpcStopTimeout)
-logger.Info("gRPC server shutdown completed")

// Close clients
// These are fast cleanup operations that can run asynchronously.
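
The shutdown comments kept in this file (and in the batch, subscriber, and web servers below) describe the same ordering: wait for endpoint propagation, fail readiness, drain HTTP/REST traffic, and only then stop gRPC so no in-flight HTTP-to-gRPC calls are cut off. A generic sketch of that sequence, with assumed names and timeouts rather than the project's actual types, might look like:

    // Illustrative only, not Bucketeer's code: shows the shutdown ordering the
    // comments above describe, using standard net/http and google.golang.org/grpc.
    package shutdownsketch

    import (
        "context"
        "net/http"
        "sync/atomic"
        "time"

        "google.golang.org/grpc"
    )

    func gracefulShutdown(ready *atomic.Bool, httpSrv *http.Server, grpcSrv *grpc.Server) {
        // Wait for Kubernetes endpoint changes to propagate before tearing anything down.
        time.Sleep(10 * time.Second)

        // Fail the readiness probe so no new traffic is routed to this pod.
        ready.Store(false)

        // Drain in-flight HTTP/REST requests; these may still call into the gRPC server.
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        _ = httpSrv.Shutdown(ctx)

        // Only now stop gRPC: no more HTTP-to-gRPC calls can arrive.
        grpcSrv.GracefulStop()
    }
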
3 changes: 0 additions & 3 deletions pkg/batch/cmd/server/server.go
@@ -636,7 +636,6 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
// Wait for K8s endpoint propagation
// This prevents "context deadline exceeded" errors during high traffic.
time.Sleep(propagationDelay)
-logger.Info("Starting HTTP/gRPC server shutdown")

// Mark as unhealthy so readiness probes fail
// This ensures Kubernetes readiness probe fails on next check,
@@ -645,11 +644,9 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L

// Gracefully stop gRPC Gateway (calls the gRPC server internally)
batchGateway.Stop(serverShutDownTimeout)
logger.Info("gRPC-gateway server shutdown completed")

// Stop gRPC server (only pure gRPC connections remain)
server.Stop(grpcStopTimeout)
-logger.Info("gRPC server shutdown completed")

// Close clients
// These are fast cleanup operations that can run asynchronously.
2 changes: 1 addition & 1 deletion pkg/batch/jobs/calculator/experiment_calculate.go
@@ -135,7 +135,7 @@ func (e *experimentCalculate) runCalculation() {
return
}
if experiments == nil {
-e.logger.Info("There are no experiments for calculation in the specified environment",
+e.logger.Debug("There are no experiments for calculation in the specified environment",
log.FieldsFromIncomingContext(ctxWithTimeout).AddFields(
zap.String("environmentId", env.Id),
)...,
2 changes: 1 addition & 1 deletion pkg/batch/jobs/opsevent/progressove_rollout_watcher.go
@@ -129,7 +129,7 @@ func (w *progressiveRolloutWatcher) executeProgressiveRollout(
now := time.Now().Unix()
for _, s := range schedules {
if s.TriggeredAt == 0 && s.ExecuteAt <= now {
-w.logger.Info("scheduled time is passed",
+w.logger.Debug("scheduled time is passed",
zap.String("environmentId", environmentId),
zap.String("featureId", progressiveRollout.FeatureId),
zap.String("progressiveRolloutId", progressiveRollout.Id),
11 changes: 5 additions & 6 deletions pkg/batch/jobs/rediscounter/redis_counter_deleter.go
@@ -131,7 +131,7 @@ func (r *redisCounterDeleter) deleteKeysByKind(environmentId, kind string) (int,
return 0, err
}
if len(keys) == 0 {
-r.logger.Info("No keys was found",
+r.logger.Debug("No keys was found",
zap.String("environmentId", environmentId),
zap.String("kind", kind),
)
@@ -141,15 +141,14 @@
if err != nil {
return 0, err
}
-r.logger.Info("Filtered keys older than 31 days",
+r.logger.Debug("Filtered keys older than 31 days",
zap.String("environmentId", environmentId),
zap.String("kind", kind),
zap.Int("filteredKeysSize", len(filteredKeys)),
)
// To avoid blocking Redis for too much time while deleting all the keys
// we split the keys in chunks
chunks := r.chunkSlice(filteredKeys, redisChunkMaxSize)
-r.logger.Info("Chunked the filtered keys", zap.Int("chunkSize", len(chunks)))
deletedKeys := 0
for _, chunk := range chunks {
if err := r.deleteKeys(chunk); err != nil {
@@ -163,7 +162,7 @@ func (r *redisCounterDeleter) deleteKeysByKind(environmentId, kind string) (int,
return deletedKeys, err
}
deletedKeys += len(chunk)
-r.logger.Info("Chunk deleted successfully", zap.Strings("keys", chunk))
+r.logger.Debug("Chunk deleted successfully", zap.Strings("keys", chunk))
}
return deletedKeys, nil
}
@@ -174,7 +173,7 @@ func (r *redisCounterDeleter) newKeyPrefix(environmentId, kind string) string {
}

func (r *redisCounterDeleter) scan(environmentId, kind, key string) ([]string, error) {
-r.logger.Info("Starting scan keys from Redis",
+r.logger.Debug("Starting scan keys from Redis",
zap.String("environmentId", environmentId),
zap.String("kind", kind),
)
@@ -196,7 +195,7 @@ func (r *redisCounterDeleter) scan(environmentId, kind, key string) ([]string, e
if err != nil {
return nil, err
}
-r.logger.Info("Finished scanning keys from Redis",
+r.logger.Debug("Finished scanning keys from Redis",
zap.String("environmentId", environmentId),
zap.String("kind", kind),
zap.Duration("elapsedTime", time.Since(startTime)),
2 changes: 0 additions & 2 deletions pkg/subscriber/cmd/server/server.go
@@ -368,12 +368,10 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
// This ensures Kubernetes readiness probe fails on next check,
// preventing new traffic from being routed to this pod.
restHealthChecker.Stop()
-logger.Info("Health check marked as unhealthy (readiness will fail)")

// Stop PubSub subscription
// This stops receiving new messages and allows in-flight messages to be processed.
multiPubSub.Stop()
-logger.Info("PubSub subscription stopped, all messages processed")

// Close clients
// These are fast cleanup operations that can run asynchronously.
2 changes: 0 additions & 2 deletions pkg/web/cmd/server/server.go
@@ -865,7 +865,6 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
// Wait for K8s endpoint propagation
// This prevents "context deadline exceeded" errors during high traffic.
time.Sleep(propagationDelay)
-logger.Info("Starting HTTP/gRPC server shutdown")

// Mark as unhealthy so readiness probes fail
// This ensures Kubernetes readiness probe fails on next check,
@@ -891,7 +890,6 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L

// Wait for REST traffic to drain
restWg.Wait()
-logger.Info("REST servers shutdown completed")

// Gracefully stop all gRPC servers in parallel
// Now safe to stop since REST traffic has drained