From c97465073cc3f95848650515fefb89951749c9d0 Mon Sep 17 00:00:00 2001
From: Yi Jin
Date: Mon, 9 Jun 2025 16:52:16 -0700
Subject: [PATCH 1/3] [ENGMP-216] add oom cooldown to block rollout

Signed-off-by: Yi Jin
---
 cmd/rollout-operator/main.go |  4 +++-
 pkg/controller/controller.go | 36 +++++++++++++++++++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go
index 913e12485..61fb10211 100644
--- a/cmd/rollout-operator/main.go
+++ b/cmd/rollout-operator/main.go
@@ -46,6 +46,7 @@ type config struct {
     kubeConfigFile    string
     kubeNamespace     string
     reconcileInterval time.Duration
+    oomCooldown       time.Duration

     serverTLSEnabled bool
     serverTLSPort    int
@@ -71,6 +72,7 @@ func (cfg *config) register(fs *flag.FlagSet) {
     fs.StringVar(&cfg.kubeConfigFile, "kubernetes.config-file", "", "The Kubernetes config file path. If not specified, it will be auto-detected when running within a Kubernetes cluster.")
     fs.StringVar(&cfg.kubeNamespace, "kubernetes.namespace", "", "The Kubernetes namespace for which this operator is running.")
     fs.DurationVar(&cfg.reconcileInterval, "reconcile.interval", 5*time.Second, "The minimum interval of reconciliation.")
+    fs.DurationVar(&cfg.oomCooldown, "oom.cooldown", 0*time.Minute, "If pods oom killed within cooldown duration, then don't proceed the rollout, 0 means disabled.")

     fs.BoolVar(&cfg.serverTLSEnabled, "server-tls.enabled", false, "Enable TLS server for webhook connections.")
     fs.IntVar(&cfg.serverTLSPort, "server-tls.port", 8443, "Port to use for exposing TLS server for webhook connections (if enabled).")
@@ -171,7 +173,7 @@ func main() {
     maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

     // Init the controller.
-    c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
+    c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.oomCooldown, reg, logger)
     check(errors.Wrap(c.Init(), "failed to init controller"))

     // Listen to sigterm, as well as for restart (like for certificate renewal).
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 0ad84bf15..c9a2881e7 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -51,6 +51,7 @@ type RolloutController struct {
     kubeClient           kubernetes.Interface
     namespace            string
     reconcileInterval    time.Duration
+    oomCooldown          time.Duration
     statefulSetsFactory  informers.SharedInformerFactory
     statefulSetLister    listersv1.StatefulSetLister
     statefulSetsInformer cache.SharedIndexInformer
@@ -80,6 +81,7 @@ type RolloutController struct {
     removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
     removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
     lastAppliedReplicasRemovedTotal     *prometheus.CounterVec
+    oomDetectedTotal                    *prometheus.CounterVec
     downscaleState                      *prometheus.GaugeVec

     // Keep track of discovered rollout groups. We use this information to delete metrics
@@ -87,7 +89,7 @@ type RolloutController struct {
     discoveredGroups map[string]struct{}
 }

-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, oomCooldown time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
     namespaceOpt := informers.WithNamespace(namespace)

     // Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -106,6 +108,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
         kubeClient:           kubeClient,
         namespace:            namespace,
         reconcileInterval:    reconcileInterval,
+        oomCooldown:          oomCooldown,
         statefulSetsFactory:  statefulSetsFactory,
         statefulSetLister:    statefulSetsInformer.Lister(),
         statefulSetsInformer: statefulSetsInformer.Informer(),
@@ -160,6 +163,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
             Name: "rollout_operator_last_applied_replicas_removed_total",
             Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.",
         }, []string{"statefulset_name"}),
+        oomDetectedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+            Name: "rollout_operator_oom_detected_total",
+            Help: "Total number of OOM killed pods detected in a StatefulSet.",
+        }, []string{"statefulset_name"}),
         downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
             Name: "rollout_operator_downscale_state",
             Help: "State of the downscale operation.",
@@ -481,6 +488,12 @@ func (c *RolloutController) hasStatefulSetNotReadyPods(sts *v1.StatefulSet) (boo
         return true, nil
     }

+    if hasRecentlyOOMKilled(c.oomCooldown, pods) {
+        level.Warn(c.logger).Log("msg", "OOM killed pods detected", "statefulset", sts.Name, "oomCooldown", c.oomCooldown)
+        c.oomDetectedTotal.WithLabelValues(sts.Name).Inc()
+        return true, nil
+    }
+
     // The number of ready replicas reported by the StatefulSet matches the total number of
     // replicas. However, there's still no guarantee that all pods are running. For example,
     // a terminating pod (which we don't consider "ready") may have not yet failed the
@@ -529,6 +542,27 @@ func notRunningAndReady(pods []*corev1.Pod) []*corev1.Pod {
     return notReady
 }

+func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) bool {
+    if cooldown == 0 {
+        // feature is disabled
+        return false
+    }
+
+    // if oom kill happens within cooldown period, then return true else ignore old oom kills
+    oomCooldownTime := time.Now().Add(-cooldown)
+    for _, pod := range pods {
+        for _, cs := range pod.Status.ContainerStatuses {
+            term := cs.LastTerminationState.Terminated
+            if cs.RestartCount > 0 && term != nil {
+                if term.ExitCode == 137 && term.FinishedAt.Time.After(oomCooldownTime) {
+                    return true
+                }
+            }
+        }
+    }
+    return false
+}
+
 // listPods returns pods matching the provided labels selector. Please remember to call
 // DeepCopy() on the returned pods before doing any change.
 func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error) {

From 684c5e4ec3581337d4ea8d3b62c2294cf249525d Mon Sep 17 00:00:00 2001
From: Yi Jin
Date: Tue, 10 Jun 2025 16:49:34 -0700
Subject: [PATCH 2/3] adding unit tests

Signed-off-by: Yi Jin
---
 pkg/controller/controller.go      | 12 ++---
 pkg/controller/controller_test.go | 88 +++++++++++++++++++++++++++++--
 2 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index c9a2881e7..333c401ce 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -488,8 +488,8 @@ func (c *RolloutController) hasStatefulSetNotReadyPods(sts *v1.StatefulSet) (boo
         return true, nil
     }

-    if hasRecentlyOOMKilled(c.oomCooldown, pods) {
-        level.Warn(c.logger).Log("msg", "OOM killed pods detected", "statefulset", sts.Name, "oomCooldown", c.oomCooldown)
+    if oomKilledPod := hasRecentlyOOMKilled(c.oomCooldown, pods); oomKilledPod != nil {
+        level.Warn(c.logger).Log("msg", "OOM killed pods detected", "statefulset", sts.Name, "oomCooldown", c.oomCooldown, "pod", oomKilledPod.Name)
         c.oomDetectedTotal.WithLabelValues(sts.Name).Inc()
         return true, nil
     }
@@ -542,10 +542,10 @@ func notRunningAndReady(pods []*corev1.Pod) []*corev1.Pod {
     return notReady
 }

-func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) bool {
+func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) *corev1.Pod {
     if cooldown == 0 {
         // feature is disabled
-        return false
+        return nil
     }

     // if oom kill happens within cooldown period, then return true else ignore old oom kills
@@ -555,12 +555,12 @@ func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) bool {
             term := cs.LastTerminationState.Terminated
             if cs.RestartCount > 0 && term != nil {
                 if term.ExitCode == 137 && term.FinishedAt.Time.After(oomCooldownTime) {
-                    return true
+                    return pod
                 }
             }
         }
     }
-    return false
+    return nil
 }

 // listPods returns pods matching the provided labels selector. Please remember to call
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 05b798aa4..7b0b7c655 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -42,6 +42,11 @@ const (
     testLastRevisionHash = "last-hash"
 )

+// uncomment the line below to enable debug logging
+var logger = log.NewNopLogger()
+
+//var logger = log.NewLogfmtLogger(os.Stdout)
+
 func TestRolloutController_Reconcile(t *testing.T) {
     customResourceGVK := schema.GroupVersionKind{Group: "my.group", Version: "v1", Kind: "CustomResource"}

@@ -50,6 +55,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
         pods                               []runtime.Object
         customResourceScaleSpecReplicas    int
         customResourceScaleStatusReplicas  int
+        oomCooldown                        time.Duration
         kubePatchErr                       error
         kubeDeleteErr                      error
         kubeUpdateErr                      error
@@ -567,6 +573,61 @@ func TestRolloutController_Reconcile(t *testing.T) {
             expectedPatchedSets:      map[string][]string{"ingester-zone-d": {`{"spec":{"replicas":5}}`}},
             expectedPatchedResources: map[string][]string{"my.group/v1/customresources/test/status": {`{"status":{"replicas":5}}`}},
         },
+        "should NOT rollout if pods were oom killed recently": {
+            oomCooldown: time.Hour,
+            statefulSets: []runtime.Object{
+                mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+                mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+            },
+            pods: []runtime.Object{
+                mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash, withOomKill(10*time.Minute)),
+                mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash),
+                mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+            },
+        },
+        "should rollout if pods were oom killed long time ago": {
+            oomCooldown: time.Hour,
+            statefulSets: []runtime.Object{
+                mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+                mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+            },
+            pods: []runtime.Object{
+                mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash, withOomKill(2*time.Hour)),
+                mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-b-0", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+            },
+            expectedDeletedPods: []string{"ingester-zone-b-1"},
+        },
+        "should rollout if pods were oom killed for the same statefulset": {
+            oomCooldown: time.Hour,
+            statefulSets: []runtime.Object{
+                mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+                mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+            },
+            pods: []runtime.Object{
+                mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-b-0", testLastRevisionHash, withOomKill(2*time.Minute)),
+                mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+            },
+            expectedDeletedPods: []string{"ingester-zone-b-1"},
+        },
+        "should rollout if to update pods were oom killed": {
+            oomCooldown: time.Hour,
+            statefulSets: []runtime.Object{
+                mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+                mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+            },
+            pods: []runtime.Object{
+                mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+                mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash, withOomKill(2*time.Minute)),
+                mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+            },
+            expectedDeletedPods: []string{"ingester-zone-b-0", "ingester-zone-b-1"},
+        },
     }

     for testName, testData := range tests {
@@ -618,7 +679,7 @@ func TestRolloutController_Reconcile(t *testing.T) {

             // Create the controller and start informers.
             reg := prometheus.NewPedanticRegistry()
-            c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
+            c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, time.Hour, reg, logger)
             require.NoError(t, c.Init())
             defer c.Stop()

@@ -855,7 +916,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleBoolean(t *testing.T

             // Create the controller and start informers.
             reg := prometheus.NewPedanticRegistry()
-            c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
+            c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, 0, reg, logger)
             require.NoError(t, c.Init())
             defer c.Stop()

@@ -1128,7 +1189,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

             // Create the controller and start informers.
             reg := prometheus.NewPedanticRegistry()
-            c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
+            c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, 0, reg, logger)
             require.NoError(t, c.Init())
             defer c.Stop()

@@ -1231,7 +1292,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

             // Create the controller and start informers.
             reg := prometheus.NewPedanticRegistry()
-            c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
+            c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, 0, reg, logger)
             require.NoError(t, c.Init())
             defer c.Stop()

@@ -1395,6 +1456,25 @@ func withAnnotations(annotations map[string]string) func(sts *v1.StatefulSet) {
     }
 }

+func withOomKill(recency time.Duration) func(pod *corev1.Pod) {
+    return func(pod *corev1.Pod) {
+        pod.Status.ContainerStatuses = []corev1.ContainerStatus{
+            {
+                Ready:        true,
+                State:        corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
+                RestartCount: 1,
+                LastTerminationState: corev1.ContainerState{
+                    Terminated: &corev1.ContainerStateTerminated{
+                        ExitCode:   137, // 137 is the exit code for OOMKilled.
+                        Reason:     "OOMKilled",
+                        FinishedAt: metav1.NewTime(time.Now().Add(-recency)),
+                    },
+                },
+            },
+        }
+    }
+}
+
 func withMirrorReplicasAnnotations(name string, customResourceGVK schema.GroupVersionKind) func(sts *v1.StatefulSet) {
     return withAnnotations(map[string]string{
         "grafana.com/rollout-mirror-replicas-from-resource-name": name,

From bc93beb7fafb25de172b56aacf86ee14363a3f63 Mon Sep 17 00:00:00 2001
From: Yi Jin
Date: Wed, 11 Jun 2025 11:46:40 -0700
Subject: [PATCH 3/3] make oom kill code constant

Signed-off-by: Yi Jin
---
 pkg/controller/controller.go      | 6 +++++-
 pkg/controller/controller_test.go | 6 +++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 333c401ce..4488af943 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -41,6 +41,10 @@ const (
     // the operator reconciles even if no changes are made to the watched resources.
     informerSyncInterval = 5 * time.Minute
     lastAppConfAnnKey    = "kubectl.kubernetes.io/last-applied-configuration"
+    // OOMExitCode 137 is the exit code for OOM killed processes in Linux (128 + 9 for SIGNAL_KILL).
+    // see https://tldp.org/LDP/abs/html/exitcodes.html
+    // or https://discuss.kubernetes.io/t/how-can-we-tell-if-the-oomkilled-in-k8s-is-because-the-node-is-running-out-of-memory-and-thus-killing-the-pod-or-if-the-pod-itself-is-being-killed-because-the-memory-it-has-requested-exceeds-the-limt-declaration-limit/26303
+    OOMExitCode = 137
 )

 type httpClient interface {
@@ -554,7 +558,7 @@ func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) *corev1.Po
         for _, cs := range pod.Status.ContainerStatuses {
             term := cs.LastTerminationState.Terminated
             if cs.RestartCount > 0 && term != nil {
-                if term.ExitCode == 137 && term.FinishedAt.Time.After(oomCooldownTime) {
+                if term.ExitCode == OOMExitCode && term.StartedAt.Time.After(oomCooldownTime) {
                     return pod
                 }
             }
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 7b0b7c655..393a4e274 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -1465,9 +1465,9 @@ func withOomKill(recency time.Duration) func(pod *corev1.Pod) {
                 RestartCount: 1,
                 LastTerminationState: corev1.ContainerState{
                     Terminated: &corev1.ContainerStateTerminated{
-                        ExitCode:   137, // 137 is the exit code for OOMKilled.
-                        Reason:     "OOMKilled",
-                        FinishedAt: metav1.NewTime(time.Now().Add(-recency)),
+                        ExitCode:  OOMExitCode,
+                        Reason:    "OOMKilled",
+                        StartedAt: metav1.NewTime(time.Now().Add(-recency)),
                     },
                 },
             },
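
Reviewer note, not part of the patches: below is a minimal standalone sketch of the detection behaviour this series adds, useful for reasoning about the cooldown window and the exit-code check in isolation. It reuses the corev1/metav1 types seen in the diffs, but the package layout, the helper names recentlyOOMKilled and oomKilledPod, and the pod names in main are made up for illustration. The sketch compares FinishedAt (the time the container was killed), as patches 1 and 2 do, whereas patch 3 switches the operator to StartedAt.

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// OOMExitCode mirrors the constant introduced in patch 3: 137 = 128 + 9 (SIGKILL).
const OOMExitCode = 137

// recentlyOOMKilled returns the first pod whose last terminated container exited
// with the OOM exit code inside the cooldown window, or nil if none did or the
// feature is disabled (cooldown == 0).
func recentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) *corev1.Pod {
    if cooldown == 0 {
        return nil
    }
    cutoff := time.Now().Add(-cooldown)
    for _, pod := range pods {
        for _, cs := range pod.Status.ContainerStatuses {
            term := cs.LastTerminationState.Terminated
            if cs.RestartCount > 0 && term != nil &&
                term.ExitCode == OOMExitCode && term.FinishedAt.Time.After(cutoff) {
                return pod
            }
        }
    }
    return nil
}

// oomKilledPod builds a pod fixture whose only container was OOM killed `ago` ago,
// similar in spirit to the withOomKill test helper added in patch 2.
func oomKilledPod(name string, ago time.Duration) *corev1.Pod {
    return &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: name},
        Status: corev1.PodStatus{
            ContainerStatuses: []corev1.ContainerStatus{{
                RestartCount: 1,
                LastTerminationState: corev1.ContainerState{
                    Terminated: &corev1.ContainerStateTerminated{
                        ExitCode:   OOMExitCode,
                        Reason:     "OOMKilled",
                        FinishedAt: metav1.NewTime(time.Now().Add(-ago)),
                    },
                },
            }},
        },
    }
}

func main() {
    pods := []*corev1.Pod{
        oomKilledPod("ingester-zone-a-0", 10*time.Minute), // inside a 1h cooldown: blocks the rollout
        oomKilledPod("ingester-zone-a-1", 2*time.Hour),    // outside the cooldown: ignored
    }
    if p := recentlyOOMKilled(time.Hour, pods); p != nil {
        fmt.Println("rollout blocked by recently OOM killed pod:", p.Name)
    } else {
        fmt.Println("no recent OOM kills, rollout may proceed")
    }
}

Exit code 137 alone cannot distinguish an OOM kill from any other SIGKILL, which is presumably why the helper in the series also requires RestartCount > 0 and a populated LastTerminationState; checking Reason == "OOMKilled", which the test fixture already sets, would be a stricter alternative.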