diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go
index 913e12485..61fb10211 100644
--- a/cmd/rollout-operator/main.go
+++ b/cmd/rollout-operator/main.go
@@ -46,6 +46,7 @@ type config struct {
 	kubeConfigFile    string
 	kubeNamespace     string
 	reconcileInterval time.Duration
+	oomCooldown       time.Duration
 
 	serverTLSEnabled bool
 	serverTLSPort    int
@@ -71,6 +72,7 @@ func (cfg *config) register(fs *flag.FlagSet) {
 	fs.StringVar(&cfg.kubeConfigFile, "kubernetes.config-file", "", "The Kubernetes config file path. If not specified, it will be auto-detected when running within a Kubernetes cluster.")
 	fs.StringVar(&cfg.kubeNamespace, "kubernetes.namespace", "", "The Kubernetes namespace for which this operator is running.")
 	fs.DurationVar(&cfg.reconcileInterval, "reconcile.interval", 5*time.Second, "The minimum interval of reconciliation.")
+	fs.DurationVar(&cfg.oomCooldown, "oom.cooldown", 0*time.Minute, "If pods were OOM killed within this cooldown duration, the rollout does not proceed. 0 means disabled.")
 
 	fs.BoolVar(&cfg.serverTLSEnabled, "server-tls.enabled", false, "Enable TLS server for webhook connections.")
 	fs.IntVar(&cfg.serverTLSPort, "server-tls.port", 8443, "Port to use for exposing TLS server for webhook connections (if enabled).")
@@ -171,7 +173,7 @@ func main() {
 	maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)
 
 	// Init the controller.
-	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
+	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.oomCooldown, reg, logger)
 	check(errors.Wrap(c.Init(), "failed to init controller"))
 
 	// Listen to sigterm, as well as for restart (like for certificate renewal).
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 0ad84bf15..4488af943 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -41,6 +41,10 @@ const (
 	// the operator reconciles even if no changes are made to the watched resources.
 	informerSyncInterval = 5 * time.Minute
 	lastAppConfAnnKey    = "kubectl.kubernetes.io/last-applied-configuration"
+	// OOMExitCode 137 is the exit code for OOM killed processes in Linux (128 + 9 for SIGKILL).
+	// See https://tldp.org/LDP/abs/html/exitcodes.html
+	// or https://discuss.kubernetes.io/t/how-can-we-tell-if-the-oomkilled-in-k8s-is-because-the-node-is-running-out-of-memory-and-thus-killing-the-pod-or-if-the-pod-itself-is-being-killed-because-the-memory-it-has-requested-exceeds-the-limt-declaration-limit/26303
+	OOMExitCode = 137
 )
 
 type httpClient interface {
@@ -51,6 +55,7 @@ type RolloutController struct {
 	kubeClient           kubernetes.Interface
 	namespace            string
 	reconcileInterval    time.Duration
+	oomCooldown          time.Duration
 	statefulSetsFactory  informers.SharedInformerFactory
 	statefulSetLister    listersv1.StatefulSetLister
 	statefulSetsInformer cache.SharedIndexInformer
@@ -80,6 +85,7 @@ type RolloutController struct {
 	removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
 	removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
 	lastAppliedReplicasRemovedTotal     *prometheus.CounterVec
+	oomDetectedTotal                    *prometheus.CounterVec
 	downscaleState                      *prometheus.GaugeVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
@@ -87,7 +93,7 @@ type RolloutController struct {
 	discoveredGroups map[string]struct{}
 }
 
-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, oomCooldown time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
 	namespaceOpt := informers.WithNamespace(namespace)
 
 	// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -106,6 +112,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 		kubeClient:           kubeClient,
 		namespace:            namespace,
 		reconcileInterval:    reconcileInterval,
+		oomCooldown:          oomCooldown,
 		statefulSetsFactory:  statefulSetsFactory,
 		statefulSetLister:    statefulSetsInformer.Lister(),
 		statefulSetsInformer: statefulSetsInformer.Informer(),
@@ -160,6 +167,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_last_applied_replicas_removed_total",
 			Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.",
 		}, []string{"statefulset_name"}),
+		oomDetectedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_oom_detected_total",
+			Help: "Total number of OOM killed pods detected in a StatefulSet.",
+		}, []string{"statefulset_name"}),
 		downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "rollout_operator_downscale_state",
 			Help: "State of the downscale operation.",
@@ -481,6 +492,12 @@ func (c *RolloutController) hasStatefulSetNotReadyPods(sts *v1.StatefulSet) (boo
 		return true, nil
 	}
 
+	if oomKilledPod := hasRecentlyOOMKilled(c.oomCooldown, pods); oomKilledPod != nil {
+		level.Warn(c.logger).Log("msg", "OOM killed pod detected", "statefulset", sts.Name, "oomCooldown", c.oomCooldown, "pod", oomKilledPod.Name)
+		c.oomDetectedTotal.WithLabelValues(sts.Name).Inc()
+		return true, nil
+	}
+
 	// The number of ready replicas reported by the StatefulSet matches the total number of
 	// replicas. However, there's still no guarantee that all pods are running. For example,
 	// a terminating pod (which we don't consider "ready") may have not yet failed the
@@ -529,6 +546,27 @@ func notRunningAndReady(pods []*corev1.Pod) []*corev1.Pod {
 	return notReady
 }
 
+// hasRecentlyOOMKilled returns the first pod whose last termination was an OOM kill within the cooldown period; older OOM kills are ignored.
+func hasRecentlyOOMKilled(cooldown time.Duration, pods []*corev1.Pod) *corev1.Pod {
+	if cooldown == 0 {
+		// Feature is disabled.
+		return nil
+	}
+
+	oomCooldownTime := time.Now().Add(-cooldown)
+	for _, pod := range pods {
+		for _, cs := range pod.Status.ContainerStatuses {
+			term := cs.LastTerminationState.Terminated
+			if cs.RestartCount > 0 && term != nil {
+				if term.ExitCode == OOMExitCode && term.StartedAt.Time.After(oomCooldownTime) {
+					return pod
+				}
+			}
+		}
+	}
+	return nil
+}
+
 // listPods returns pods matching the provided labels selector. Please remember to call
 // DeepCopy() on the returned pods before doing any change.
 func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error) {
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 05b798aa4..393a4e274 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -42,6 +42,11 @@ const (
 	testLastRevisionHash = "last-hash"
 )
 
+// Swap in the commented-out logger below to enable debug logging.
+var logger = log.NewNopLogger()
+
+//var logger = log.NewLogfmtLogger(os.Stdout)
+
 func TestRolloutController_Reconcile(t *testing.T) {
 	customResourceGVK := schema.GroupVersionKind{Group: "my.group", Version: "v1", Kind: "CustomResource"}
 
@@ -50,6 +55,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 		pods                              []runtime.Object
 		customResourceScaleSpecReplicas   int
 		customResourceScaleStatusReplicas int
+		oomCooldown                       time.Duration
 		kubePatchErr                      error
 		kubeDeleteErr                     error
 		kubeUpdateErr                     error
@@ -567,6 +573,61 @@ func TestRolloutController_Reconcile(t *testing.T) {
 			expectedPatchedSets:      map[string][]string{"ingester-zone-d": {`{"spec":{"replicas":5}}`}},
 			expectedPatchedResources: map[string][]string{"my.group/v1/customresources/test/status": {`{"status":{"replicas":5}}`}},
 		},
+		"should NOT rollout if pods were oom killed recently": {
+			oomCooldown: time.Hour,
+			statefulSets: []runtime.Object{
+				mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+				mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+			},
+			pods: []runtime.Object{
+				mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash, withOomKill(10*time.Minute)),
+				mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash),
+				mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+			},
+		},
+		"should rollout if pods were oom killed a long time ago": {
+			oomCooldown: time.Hour,
+			statefulSets: []runtime.Object{
+				mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+				mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+			},
+			pods: []runtime.Object{
+				mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash, withOomKill(2*time.Hour)),
+				mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-b-0", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+			},
+			expectedDeletedPods: []string{"ingester-zone-b-1"},
+		},
+		"should rollout if the oom killed pod is in the statefulset being updated": {
+			oomCooldown: time.Hour,
+			statefulSets: []runtime.Object{
+				mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+				mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+			},
+			pods: []runtime.Object{
+				mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-b-0", testLastRevisionHash, withOomKill(2*time.Minute)),
+				mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+			},
+			expectedDeletedPods: []string{"ingester-zone-b-1"},
+		},
+		"should rollout if pods pending update were oom killed": {
+			oomCooldown: time.Hour,
+			statefulSets: []runtime.Object{
+				mockStatefulSet("ingester-zone-a", withReplicas(2, 2)),
+				mockStatefulSet("ingester-zone-b", withReplicas(2, 2), withPrevRevision()),
+			},
+			pods: []runtime.Object{
+				mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
+				mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash, withOomKill(2*time.Minute)),
+				mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
+			},
+			expectedDeletedPods: []string{"ingester-zone-b-0", "ingester-zone-b-1"},
+		},
 	}
 
 	for testName, testData := range tests {
@@ -618,7 +679,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, testData.oomCooldown, reg, logger)
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -855,7 +916,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleBoolean(t *testing.T
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, 0, reg, logger)
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -1128,7 +1189,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, 0, reg, logger)
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -1231,7 +1292,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG
 
 	// Create the controller and start informers.
 	reg := prometheus.NewPedanticRegistry()
-	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
+	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, 0, reg, logger)
 	require.NoError(t, c.Init())
 	defer c.Stop()
 
@@ -1395,6 +1456,25 @@ func withAnnotations(annotations map[string]string) func(sts *v1.StatefulSet) {
 	}
 }
 
+func withOomKill(recency time.Duration) func(pod *corev1.Pod) {
+	return func(pod *corev1.Pod) {
+		pod.Status.ContainerStatuses = []corev1.ContainerStatus{
+			{
+				Ready:        true,
+				State:        corev1.ContainerState{Running: &corev1.ContainerStateRunning{}},
+				RestartCount: 1,
+				LastTerminationState: corev1.ContainerState{
+					Terminated: &corev1.ContainerStateTerminated{
+						ExitCode:  OOMExitCode,
+						Reason:    "OOMKilled",
+						StartedAt: metav1.NewTime(time.Now().Add(-recency)),
+					},
+				},
+			},
+		}
+	}
+}
+
 func withMirrorReplicasAnnotations(name string, customResourceGVK schema.GroupVersionKind) func(sts *v1.StatefulSet) {
 	return withAnnotations(map[string]string{
 		"grafana.com/rollout-mirror-replicas-from-resource-name": name,
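
For reference, here is a minimal sketch (not part of the patch above) of a focused unit test for the new hasRecentlyOOMKilled helper. It assumes the file is placed in pkg/controller next to controller_test.go (package controller); the oomPod helper and test name are hypothetical and exist only for this sketch.

package controller

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// oomPod builds a pod whose only container restarted after an OOM kill that started `ago` before now.
func oomPod(name string, ago time.Duration) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Status: corev1.PodStatus{
			ContainerStatuses: []corev1.ContainerStatus{{
				RestartCount: 1,
				LastTerminationState: corev1.ContainerState{
					Terminated: &corev1.ContainerStateTerminated{
						ExitCode:  OOMExitCode,
						Reason:    "OOMKilled",
						StartedAt: metav1.NewTime(time.Now().Add(-ago)),
					},
				},
			}},
		},
	}
}

func TestHasRecentlyOOMKilled(t *testing.T) {
	recent := oomPod("recent", 10*time.Minute)
	old := oomPod("old", 2*time.Hour)

	// An OOM kill within the cooldown is reported.
	if got := hasRecentlyOOMKilled(time.Hour, []*corev1.Pod{old, recent}); got == nil || got.Name != "recent" {
		t.Fatalf("expected the recently OOM killed pod, got %v", got)
	}

	// OOM kills older than the cooldown are ignored.
	if got := hasRecentlyOOMKilled(time.Hour, []*corev1.Pod{old}); got != nil {
		t.Fatalf("expected no pod, got %s", got.Name)
	}

	// A cooldown of 0 disables the check entirely.
	if got := hasRecentlyOOMKilled(0, []*corev1.Pod{recent}); got != nil {
		t.Fatalf("expected no pod with the check disabled, got %s", got.Name)
	}
}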