[ENGMP-216] add oom cooldown to block rollout #18

Merged · 3 commits · Jun 11, 2025
4 changes: 3 additions & 1 deletion cmd/rollout-operator/main.go
@@ -46,6 +46,7 @@ type config struct {
kubeConfigFile string
kubeNamespace string
reconcileInterval time.Duration
oomCooldown time.Duration

serverTLSEnabled bool
serverTLSPort int
@@ -71,6 +72,7 @@ func (cfg *config) register(fs *flag.FlagSet) {
fs.StringVar(&cfg.kubeConfigFile, "kubernetes.config-file", "", "The Kubernetes config file path. If not specified, it will be auto-detected when running within a Kubernetes cluster.")
fs.StringVar(&cfg.kubeNamespace, "kubernetes.namespace", "", "The Kubernetes namespace for which this operator is running.")
fs.DurationVar(&cfg.reconcileInterval, "reconcile.interval", 5*time.Second, "The minimum interval of reconciliation.")
fs.DurationVar(&cfg.oomCooldown, "oom.cooldown", 0*time.Minute, "If a pod was OOM killed within this cooldown duration, don't proceed with the rollout. 0 means disabled.")

fs.BoolVar(&cfg.serverTLSEnabled, "server-tls.enabled", false, "Enable TLS server for webhook connections.")
fs.IntVar(&cfg.serverTLSPort, "server-tls.port", 8443, "Port to use for exposing TLS server for webhook connections (if enabled).")
@@ -171,7 +173,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.oomCooldown, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
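
As a quick illustration (not part of the PR, values below are made up): the new -oom.cooldown flag takes a Go duration, and the zero default keeps the check disabled. A minimal standalone sketch of the parsing semantics:

package main

import (
    "flag"
    "fmt"
    "time"
)

func main() {
    // Register the flag the same way the diff above does (same name and zero default).
    fs := flag.NewFlagSet("rollout-operator", flag.ExitOnError)
    var oomCooldown time.Duration
    fs.DurationVar(&oomCooldown, "oom.cooldown", 0, "OOM cooldown; 0 disables the check.")

    // Hypothetical invocation with a 15-minute cooldown.
    if err := fs.Parse([]string{"-oom.cooldown=15m"}); err != nil {
        panic(err)
    }

    if oomCooldown == 0 {
        fmt.Println("OOM cooldown disabled")
    } else {
        fmt.Printf("rollout blocked if a pod was OOM killed within the last %s\n", oomCooldown)
    }
}
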
40 changes: 39 additions & 1 deletion pkg/controller/controller.go
@@ -41,6 +41,10 @@ const (
// the operator reconciles even if no changes are made to the watched resources.
informerSyncInterval = 5 * time.Minute
lastAppConfAnnKey = "kubectl.kubernetes.io/last-applied-configuration"
// OOMExitCode 137 is the exit code for OOM killed processes on Linux (128 + 9 for SIGKILL).
// see https://tldp.org/LDP/abs/html/exitcodes.html
// or https://discuss.kubernetes.io/t/how-can-we-tell-if-the-oomkilled-in-k8s-is-because-the-node-is-running-out-of-memory-and-thus-killing-the-pod-or-if-the-pod-itself-is-being-killed-because-the-memory-it-has-requested-exceeds-the-limt-declaration-limit/26303
OOMExitCode = 137
)

type httpClient interface {
@@ -51,6 +55,7 @@ type RolloutController struct {
kubeClient kubernetes.Interface
namespace string
reconcileInterval time.Duration
oomCooldown time.Duration
statefulSetsFactory informers.SharedInformerFactory
statefulSetLister listersv1.StatefulSetLister
statefulSetsInformer cache.SharedIndexInformer
@@ -80,14 +85,15 @@ type RolloutController struct {
removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
lastAppliedReplicasRemovedTotal *prometheus.CounterVec
oomDetectedTotal *prometheus.CounterVec
downscaleState *prometheus.GaugeVec

// Keep track of discovered rollout groups. We use this information to delete metrics
// related to rollout groups that have been decommissioned.
discoveredGroups map[string]struct{}
}

func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, oomCooldown time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -106,6 +112,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
kubeClient: kubeClient,
namespace: namespace,
reconcileInterval: reconcileInterval,
oomCooldown: oomCooldown,
statefulSetsFactory: statefulSetsFactory,
statefulSetLister: statefulSetsInformer.Lister(),
statefulSetsInformer: statefulSetsInformer.Informer(),
@@ -160,6 +167,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
Name: "rollout_operator_last_applied_replicas_removed_total",
Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
oomDetectedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_oom_detected_total",
Help: "Total number of OOM killed pods detected in a StatefulSet.",
}, []string{"statefulset_name"}),
downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "rollout_operator_downscale_state",
Help: "State of the downscale operation.",
@@ -481,6 +492,12 @@ func (c *RolloutController) hasStatefulSetNotReadyPods(sts *v1.StatefulSet) (boo
return true, nil
}

if oomKilledPod := hasRecentlyOOMKilled(c.oomCooldown, pods); oomKilledPod != nil {
level.Warn(c.logger).Log("msg", "OOM killed pods detected", "statefulset", sts.Name, "oomCooldown", c.oomCooldown, "pod", oomKilledPod.Name)
c.oomDetectedTotal.WithLabelValues(sts.Name).Inc()
return true, nil
}

// The number of ready replicas reported by the StatefulSet matches the total number of
// replicas. However, there's still no guarantee that all pods are running. For example,
// a terminating pod (which we don't consider "ready") may have not yet failed the
@@ -529,6 +546,27 @@ func notRunningAndReady(pods []*corev1.Pod) []*corev1.Pod {
return notReady
}

func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) *corev1.Pod {
if cooldown == 0 {
// feature is disabled
return nil
}

// Return the offending pod if an OOM kill happened within the cooldown period; older OOM kills are ignored.
oomCooldownTime := time.Now().Add(-cooldown)
for _, pod := range pods {
for _, cs := range pod.Status.ContainerStatuses {
term := cs.LastTerminationState.Terminated
if cs.RestartCount > 0 && term != nil {
if term.ExitCode == OOMExitCode && term.StartedAt.Time.After(oomCooldownTime) {
return pod
}
}
}
}
return nil
}

// listPods returns pods matching the provided labels selector. Please remember to call
// DeepCopy() on the returned pods before doing any change.
func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error) {
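
To make the detection rule concrete, here is a self-contained sketch that restates the helper above outside the controller and runs it against a hypothetical pod (the pod name and timings are invented for the example, and only the fields the check reads are populated):

package main

import (
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// 137 = 128 + 9 (SIGKILL), mirroring the OOMExitCode constant above.
const OOMExitCode = 137

// Simplified restatement of hasRecentlyOOMKilled, for illustration only.
func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) *corev1.Pod {
    if cooldown == 0 {
        return nil // feature disabled
    }
    cutoff := time.Now().Add(-cooldown)
    for _, pod := range pods {
        for _, cs := range pod.Status.ContainerStatuses {
            term := cs.LastTerminationState.Terminated
            if cs.RestartCount > 0 && term != nil && term.ExitCode == OOMExitCode && term.StartedAt.Time.After(cutoff) {
                return pod
            }
        }
    }
    return nil
}

func main() {
    // Hypothetical pod whose container was OOM killed 10 minutes ago.
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "ingester-zone-a-0"},
        Status: corev1.PodStatus{
            ContainerStatuses: []corev1.ContainerStatus{{
                RestartCount: 1,
                LastTerminationState: corev1.ContainerState{
                    Terminated: &corev1.ContainerStateTerminated{
                        ExitCode:  OOMExitCode,
                        Reason:    "OOMKilled",
                        StartedAt: metav1.NewTime(time.Now().Add(-10 * time.Minute)),
                    },
                },
            }},
        },
    }

    // A 1h cooldown blocks the rollout; a 5m cooldown ignores the 10-minute-old OOM kill.
    fmt.Println(hasRecentlyOOMKilled(time.Hour, []*corev1.Pod{pod}) != nil)     // true
    fmt.Println(hasRecentlyOOMKilled(5*time.Minute, []*corev1.Pod{pod}) != nil) // false
}
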
88 changes: 84 additions & 4 deletions pkg/controller/controller_test.go
@@ -42,6 +42,11 @@ const (
testLastRevisionHash = "last-hash"
)

// Uncomment the logfmt logger below (and remove the nop logger) to enable debug logging.
var logger = log.NewNopLogger()

//var logger = log.NewLogfmtLogger(os.Stdout)

func TestRolloutController_Reconcile(t *testing.T) {
customResourceGVK := schema.GroupVersionKind{Group: "my.group", Version: "v1", Kind: "CustomResource"}

@@ -50,6 +55,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
pods []runtime.Object
customResourceScaleSpecReplicas int
customResourceScaleStatusReplicas int
oomCooldown time.Duration
kubePatchErr error
kubeDeleteErr error
kubeUpdateErr error
@@ -567,6 +573,61 @@ func TestRolloutController_Reconcile(t *testing.T) {
expectedPatchedSets: map[string][]string{"ingester-zone-d": {`{"spec":{"replicas":5}}`}},
expectedPatchedResources: map[string][]string{"my.group/v1/customresources/test/status": {`{"status":{"replicas":5}}`}},
},
"should NOT rollout if pods were oom killed recently": {
oomCooldown: time.Hour,
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
},
pods: []runtime.Object{
mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash, withOomKill(10*time.Minute)),
mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash),
mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
},
},
"should rollout if pods were oom killed long time ago": {
oomCooldown: time.Hour,
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
},
pods: []runtime.Object{
mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash, withOomKill(2*time.Hour)),
mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-0", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
},
expectedDeletedPods: []string{"ingester-zone-b-1"},
},
"should rollout if pods were oom killed for the same statefulset": {
oomCooldown: time.Hour,
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
},
pods: []runtime.Object{
mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-0", testLastRevisionHash, withOomKill(2*time.Minute)),
mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
},
expectedDeletedPods: []string{"ingester-zone-b-1"},
},
"should rollout if to update pods were oom killed": {
oomCooldown: time.Hour,
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
},
pods: []runtime.Object{
mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash, withOomKill(2*time.Minute)),
mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
},
expectedDeletedPods: []string{"ingester-zone-b-0", "ingester-zone-b-1"},
},
}

for testName, testData := range tests {
@@ -618,7 +679,7 @@ func TestRolloutController_Reconcile(t *testing.T) {

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, testData.oomCooldown, reg, logger)
require.NoError(t, c.Init())
defer c.Stop()

@@ -855,7 +916,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleBoolean(t *testing.T

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, 0, reg, logger)
require.NoError(t, c.Init())
defer c.Stop()

@@ -1128,7 +1189,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, 0, reg, logger)
require.NoError(t, c.Init())
defer c.Stop()

@@ -1231,7 +1292,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, 0, reg, logger)
require.NoError(t, c.Init())
defer c.Stop()

@@ -1395,6 +1456,25 @@ func withAnnotations(annotations map[string]string) func(sts *v1.StatefulSet) {
}
}

func withOomKill(recency time.Duration) func(pod *corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Status.ContainerStatuses = []corev1.ContainerStatus{
{
Ready: true,
State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
RestartCount: 1,
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: OOMExitCode,
Reason: "OOMKilled",
StartedAt: metav1.NewTime(time.Now().Add(-recency)),
},
},
},
}
}
}

func withMirrorReplicasAnnotations(name string, customResourceGVK schema.GroupVersionKind) func(sts *v1.StatefulSet) {
return withAnnotations(map[string]string{
"grafana.com/rollout-mirror-replicas-from-resource-name": name,
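
Finally, a small self-contained sketch (not from the PR) of the new rollout_operator_oom_detected_total counter: the metric name, help string, and label mirror the diff, while the registry wiring and the zone label value are illustrative.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    reg := prometheus.NewPedanticRegistry()

    // Stand-in for the controller's oomDetectedTotal counter vec.
    oomDetectedTotal := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
        Name: "rollout_operator_oom_detected_total",
        Help: "Total number of OOM killed pods detected in a StatefulSet.",
    }, []string{"statefulset_name"})

    // Each time the cooldown check trips, the controller increments the series for that StatefulSet.
    oomDetectedTotal.WithLabelValues("ingester-zone-a").Inc()

    fmt.Println(testutil.ToFloat64(oomDetectedTotal.WithLabelValues("ingester-zone-a"))) // 1
}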