From 6128456daeb5faa6b2324091490e377799e06c91 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Wed, 19 Mar 2025 19:36:38 -0700
Subject: [PATCH 01/10] polish logging and metrics

---
 pkg/controller/controller.go               | 31 ++++++++--------------
 pkg/controller/custom_resource_replicas.go |  6 ++---
 pkg/controller/delay.go                    | 14 +++++-----
 3 files changed, 20 insertions(+), 31 deletions(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 941376cc..83fa41aa 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -68,14 +68,12 @@ type RolloutController struct {
 	stopCh chan struct{}
 
 	// Metrics.
-	groupReconcileTotal *prometheus.CounterVec
-	groupReconcileFailed *prometheus.CounterVec
-	groupReconcileDuration *prometheus.HistogramVec
-	groupReconcileLastSuccess *prometheus.GaugeVec
-	desiredReplicas *prometheus.GaugeVec
-	scaleDownBoolean *prometheus.GaugeVec
-	downscaleProbeTotal *prometheus.CounterVec
-	downscaleProbeFailureTotal *prometheus.CounterVec
+	groupReconcileTotal *prometheus.CounterVec
+	groupReconcileFailed *prometheus.CounterVec
+	groupReconcileDuration *prometheus.HistogramVec
+	groupReconcileLastSuccess *prometheus.GaugeVec
+	desiredReplicas *prometheus.GaugeVec
+	downscaleProbeTotal *prometheus.CounterVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
@@ -135,18 +133,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_statefulset_desired_replicas",
 			Help: "Desired replicas of a Statefulset parsed from CRD.",
 		}, []string{"statefulset_name"}),
-		scaleDownBoolean: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "rollout_operator_scale_down_boolean",
-			Help: "Boolean for whether an ingester pod is ready to scale down.",
-		}, []string{"scale_down_pod_name"}),
 		downscaleProbeTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_downscale_probe_total",
 			Help: "Total number of downscale probes.",
-		}, []string{"scale_down_pod_name"}),
-		downscaleProbeFailureTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "rollout_operator_downscale_probe_failure_total",
-			Help: "Total number of failed downscale probes.",
-		}, []string{"scale_down_pod_name"}),
+		}, []string{"scale_down_pod_name", "status"}),
 	}
 
 	return c
@@ -230,7 +220,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
 	defer span.Finish()
 
-	level.Info(c.logger).Log("msg", "reconcile started")
+	level.Info(c.logger).Log("msg", "================ RECONCILE START ================")
 
 	sets, err := c.listStatefulSetsWithRolloutGroup()
 	if err != nil {
@@ -252,7 +242,8 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
 
 	c.deleteMetricsForDecommissionedGroups(groups)
 
-	level.Info(c.logger).Log("msg", "reconcile done")
+	level.Info(c.logger).Log("msg", "================ RECONCILE DONE ================")
+
 	return nil
 }
 
@@ -517,7 +508,7 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
 }
 
 func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
-	level.Debug(c.logger).Log("msg", "reconciling StatefulSet==============", "statefulset", sts.Name)
+	level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)
 
 	podsToUpdate, err := c.podsNotMatchingUpdateRevision(sts)
 	if err != nil {
diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index b77c98e0..a09426cc 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -50,16 +50,16 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		var desiredReplicas int32
 		if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "boolean" {
 			level.Debug(c.logger).Log("msg", "boolean scaling logic")
-			desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.scaleDownBoolean, c.downscaleProbeTotal, c.downscaleProbeFailureTotal)
+			desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.downscaleProbeTotal)
 		} else {
 			desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
 		}
 		if err != nil {
-			level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
+			level.Info(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
 				"group", groupName,
 				"name", sts.GetName(),
 				"currentReplicas", currentReplicas,
-				"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
+				"desiredReplicas", referenceResourceDesiredReplicas,
 				"err", err,
 			)
 
diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go
index 1cff1dda..7815b9e8 100644
--- a/pkg/controller/delay.go
+++ b/pkg/controller/delay.go
@@ -44,7 +44,7 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
 	callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
 }
 
-func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
+func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, downscaleProbeTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
 	if desiredReplicas >= currentReplicas {
 		return desiredReplicas, nil
 	}
@@ -54,7 +54,7 @@ func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.Statefu
 		return currentReplicas, err
 	}
 	downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
-	scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, scaleDownBooleanMetric, downscaleProbeTotal, downscaleProbeFailureTotal)
+	scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, downscaleProbeTotal)
 	if err != nil {
 		return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
 	}
@@ -218,7 +218,7 @@ func createPrepareDownscaleEndpoints(namespace, statefulsetName, serviceName str
 	return eps
 }
 
-func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (map[int]bool, error) {
+func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, downscaleProbeTotal *prometheus.CounterVec) (map[int]bool, error) {
 	if len(endpoints) == 0 {
 		return nil, fmt.Errorf("no endpoints")
 	}
@@ -238,33 +238,31 @@ func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logge
 
 			epLogger := log.With(logger, "pod", ep.podName, "url", target)
 
 			req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
-			downscaleProbeTotal.WithLabelValues(ep.podName).Inc()
 			if err != nil {
 				level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
-				downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
+				downscaleProbeTotal.WithLabelValues(ep.podName, "error creating HTTP POST request to endpoint").Inc()
 				return err
 			}
 
 			resp, err := client.Do(req)
 			if err != nil {
 				level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
+				downscaleProbeTotal.WithLabelValues(ep.podName, "error sending HTTP POST request to endpoint").Inc()
 				return err
 			}
 
 			defer resp.Body.Close()
 
 			scalableMu.Lock()
+			downscaleProbeTotal.WithLabelValues(ep.podName, resp.Status).Inc()
 			if resp.StatusCode == 200 {
 				scalable[ep.replica] = true
-				scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(1)
 			} else {
 				if resp.StatusCode != 425 { // 425 too early
-					downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
 					level.Error(epLogger).Log("msg", "downscale POST got unexpected status", resp.StatusCode)
 				}
 				scalable[ep.replica] = false
-				scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(0)
 			}
 			scalableMu.Unlock()
 

From 842feea40b57c728be2f0ad841f31c79864d7b72 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 20 Mar 2025 11:34:34 -0700
Subject: [PATCH 02/10] use status code as downscale metrics

---
 pkg/controller/delay.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go
index 7815b9e8..5f6b090e 100644
--- a/pkg/controller/delay.go
+++ b/pkg/controller/delay.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"strconv"
 	"sync"
 	"time"
 
@@ -254,7 +255,7 @@ func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logge
 			defer resp.Body.Close()
 
 			scalableMu.Lock()
-			downscaleProbeTotal.WithLabelValues(ep.podName, resp.Status).Inc()
+			downscaleProbeTotal.WithLabelValues(ep.podName, strconv.Itoa(resp.StatusCode)).Inc()
 			if resp.StatusCode == 200 {
 				scalable[ep.replica] = true
 			} else {

From 9e1be1ea63b97fffd031ea8c7c2903141a2c195f Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 20 Mar 2025 11:38:12 -0700
Subject: [PATCH 03/10] do not cancel downscale

---
 pkg/controller/custom_resource_replicas.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index a09426cc..d5f94024 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -39,7 +39,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		c.desiredReplicas.WithLabelValues(sts.GetName()).Set(float64(referenceResourceDesiredReplicas))
 		if currentReplicas == referenceResourceDesiredReplicas {
 			updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas)
-			cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas)
+			// cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas)
 			// No change in the number of replicas: don't log because this will be the result most of the time.
 			continue
 		}

From d322a2704cab73de8524cfddd055e55eb84378d7 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 20 Mar 2025 11:40:48 -0700
Subject: [PATCH 04/10] do not cancel downscale

---
 pkg/controller/custom_resource_replicas.go | 2 +-
 pkg/controller/delay.go                    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index d5f94024..a09426cc 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -39,7 +39,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		c.desiredReplicas.WithLabelValues(sts.GetName()).Set(float64(referenceResourceDesiredReplicas))
 		if currentReplicas == referenceResourceDesiredReplicas {
 			updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas)
-			// cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas)
+			cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas)
 			// No change in the number of replicas: don't log because this will be the result most of the time.
 			continue
 		}
diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go
index 5f6b090e..657d32e7 100644
--- a/pkg/controller/delay.go
+++ b/pkg/controller/delay.go
@@ -161,7 +161,7 @@ func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Durat
 	delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
 	urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]
 
-	if delayStr == "" || urlStr == "" {
+	if delayStr == "" || delayStr == "boolean" || urlStr == "" {
 		return 0, nil, nil
 	}
 

From a71da17fc1efbec9d16611e3e8bcee79f498a67c Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 15 May 2025 12:06:27 -0700
Subject: [PATCH 05/10] add a flag to remove last applied replicas

---
 cmd/rollout-operator/main.go      |   6 +-
 pkg/controller/controller.go      | 201 ++++++++++++++++++++++++------
 pkg/controller/controller_test.go |   8 +-
 3 files changed, 173 insertions(+), 42 deletions(-)

diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go
index 913e1248..2cbba3e5 100644
--- a/cmd/rollout-operator/main.go
+++ b/cmd/rollout-operator/main.go
@@ -62,6 +62,8 @@ type config struct {
 
 	useZoneTracker bool
 	zoneTrackerConfigMapName string
+
+	removeLastAppliedReplicas bool
 }
 
 func (cfg *config) register(fs *flag.FlagSet) {
@@ -88,6 +90,8 @@ func (cfg *config) register(fs *flag.FlagSet) {
 
 	fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
 	fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
+
+	fs.BoolVar(&cfg.removeLastAppliedReplicas, "remove-last-applied-replicas", false, "Remove the .spec.replicas field from statefulsets last applied config, so omitting this field when kubectl apply doesn't reset replicas to 1")
 }
 
 func (cfg config) validate() error {
@@ -171,7 +175,7 @@ func main() {
 	maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)
 
 	// Init the controller.
-	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
+	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.removeLastAppliedReplicas, reg, logger)
 	check(errors.Wrap(c.Init(), "failed to init controller"))
 
 	// Listen to sigterm, as well as for restart (like for certificate renewal).
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 83fa41aa..f4e44d21 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -2,6 +2,7 @@ package controller
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"sort"
@@ -39,6 +40,7 @@ const (
 	// How frequently informers should resync. This is also the frequency at which
 	// the operator reconciles even if no changes are made to the watched resources.
 	informerSyncInterval = 5 * time.Minute
+	lastAppConfAnnKey = "kubectl.kubernetes.io/last-applied-configuration"
 )
 
 type httpClient interface {
@@ -46,20 +48,21 @@ type httpClient interface {
 }
 
 type RolloutController struct {
-	kubeClient kubernetes.Interface
-	namespace string
-	reconcileInterval time.Duration
-	statefulSetsFactory informers.SharedInformerFactory
-	statefulSetLister listersv1.StatefulSetLister
-	statefulSetsInformer cache.SharedIndexInformer
-	podsFactory informers.SharedInformerFactory
-	podLister corelisters.PodLister
-	podsInformer cache.SharedIndexInformer
-	restMapper meta.RESTMapper
-	scaleClient scale.ScalesGetter
-	dynamicClient dynamic.Interface
-	httpClient httpClient
-	logger log.Logger
+	kubeClient kubernetes.Interface
+	namespace string
+	reconcileInterval time.Duration
+	removeLastAppliedReplicas bool
+	statefulSetsFactory informers.SharedInformerFactory
+	statefulSetLister listersv1.StatefulSetLister
+	statefulSetsInformer cache.SharedIndexInformer
+	podsFactory informers.SharedInformerFactory
+	podLister corelisters.PodLister
+	podsInformer cache.SharedIndexInformer
+	restMapper meta.RESTMapper
+	scaleClient scale.ScalesGetter
+	dynamicClient dynamic.Interface
+	httpClient httpClient
+	logger log.Logger
 
 	// This bool is true if we should trigger a reconcile.
 	shouldReconcile atomic.Bool
@@ -68,19 +71,22 @@ type RolloutController struct {
 	stopCh chan struct{}
 
 	// Metrics.
-	groupReconcileTotal *prometheus.CounterVec
-	groupReconcileFailed *prometheus.CounterVec
-	groupReconcileDuration *prometheus.HistogramVec
-	groupReconcileLastSuccess *prometheus.GaugeVec
-	desiredReplicas *prometheus.GaugeVec
-	downscaleProbeTotal *prometheus.CounterVec
+	groupReconcileTotal *prometheus.CounterVec
+	groupReconcileFailed *prometheus.CounterVec
+	groupReconcileDuration *prometheus.HistogramVec
+	groupReconcileLastSuccess *prometheus.GaugeVec
+	desiredReplicas *prometheus.GaugeVec
+	downscaleProbeTotal *prometheus.CounterVec
+	removeLastAppliedReplicaTotal *prometheus.CounterVec
+	removeLastAppliedReplicaEmptyTotal *prometheus.CounterVec
+	removeLastAppliedReplicaErrorTotal *prometheus.CounterVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
 	discoveredGroups map[string]struct{}
 }
 
-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, removeLastAppliedReplicas bool, reg prometheus.Registerer, logger log.Logger) *RolloutController {
 	namespaceOpt := informers.WithNamespace(namespace)
 
 	// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -96,22 +102,23 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 	podsInformer := podsFactory.Core().V1().Pods()
 
 	c := &RolloutController{
-		kubeClient: kubeClient,
-		namespace: namespace,
-		reconcileInterval: reconcileInterval,
-		statefulSetsFactory: statefulSetsFactory,
-		statefulSetLister: statefulSetsInformer.Lister(),
-		statefulSetsInformer: statefulSetsInformer.Informer(),
-		podsFactory: podsFactory,
-		podLister: podsInformer.Lister(),
-		podsInformer: podsInformer.Informer(),
-		restMapper: restMapper,
-		scaleClient: scaleClient,
-		dynamicClient: dynamic,
-		httpClient: client,
-		logger: logger,
-		stopCh: make(chan struct{}),
-		discoveredGroups: map[string]struct{}{},
+		kubeClient: kubeClient,
+		namespace: namespace,
+		reconcileInterval: reconcileInterval,
+		removeLastAppliedReplicas: removeLastAppliedReplicas,
+		statefulSetsFactory: statefulSetsFactory,
+		statefulSetLister: statefulSetsInformer.Lister(),
+		statefulSetsInformer: statefulSetsInformer.Informer(),
+		podsFactory: podsFactory,
+		podLister: podsInformer.Lister(),
+		podsInformer: podsInformer.Informer(),
+		restMapper: restMapper,
+		scaleClient: scaleClient,
+		dynamicClient: dynamic,
+		httpClient: client,
+		logger: logger,
+		stopCh: make(chan struct{}),
+		discoveredGroups: map[string]struct{}{},
 		groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_group_reconciles_total",
 			Help: "Total number of reconciles started for a specific rollout group.",
@@ -137,6 +144,18 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_downscale_probe_total",
 			Help: "Total number of downscale probes.",
 		}, []string{"scale_down_pod_name", "status"}),
+		removeLastAppliedReplicaTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_total",
+			Help: "Total number of removal of .spec.replicas field from last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		removeLastAppliedReplicaEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_empty_total",
+			Help: "Total number of empty .spec.replicas field from last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		removeLastAppliedReplicaErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_error_total",
+			Help: "Total number of errors while removing .spec.replicas field from last-applied-configuration annotation.",
+		}, []string{"statefulset_name", "error"}),
 	}
 
 	return c
@@ -267,6 +286,14 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 	// Sort StatefulSets to provide a deterministic behaviour.
 	util.SortStatefulSets(sets)
 
+	if c.removeLastAppliedReplicas {
+		for _, s := range sets {
+			if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
+				return errors.Wrapf(err, "failed to remove last-applied-configuration annotation from StatefulSet %s", s.GetName())
+			}
+		}
+	}
+
 	// Adjust the number of replicas for each StatefulSet in the group if desired. If the number of
 	// replicas of any StatefulSet was adjusted, return early in order to guarantee each STS model is
 	// up-to-date.
@@ -669,3 +696,103 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st
 	_, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
 	return err
 }
+
+// replicasAbsentLastAppConf returns true if .spec.replicas is NOT present in the last applied configuration
+func replicasAbsentLastAppConf(sts *v1.StatefulSet) (bool, error) {
+	raw, ok := sts.Annotations[lastAppConfAnnKey]
+	if !ok || raw == "" {
+		return true, nil // nothing to check
+	}
+
+	var obj map[string]any
+	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
+		return false, err
+	}
+	spec, ok := obj["spec"].(map[string]any)
+	if !ok {
+		return true, nil
+	}
+	_, has := spec["replicas"]
+	return !has, nil
+}
+
+// removeReplicasFromLastApplied deletes .spec.replicas from the
+// kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet.
+func (c *RolloutController) removeReplicasFromLastApplied(
+	ctx context.Context,
+	sts *v1.StatefulSet,
+) error {
+	const noAnnotationErr = "NoAnnotationErr"
+	const lastAppliedNotFoundErr = "LastAppliedNotFoundErr"
+	const specNotFoundErr = "SpecNotFoundErr"
+	const jsonDecodeErr = "JsonDecodeErr"
+	const jsonEncodeErr = "JsonEncodeErr"
+	const stsPatchErr = "StsPatchErr"
+	const verifyErr = "VerifyErr"
+	const verifyFailed = "VerifyFailed"
+
+	c.removeLastAppliedReplicaTotal.WithLabelValues(sts.GetName()).Inc()
+	anns := sts.GetAnnotations()
+	if anns == nil {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
+		return fmt.Errorf("no annotation found on statefulset %s", sts.GetName())
+	}
+	raw, ok := anns[lastAppConfAnnKey]
+	if !ok || raw == "" {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
+		return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName())
+	}
+
+	// Decode annotation JSON.
+	var obj map[string]any
+	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
+		return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err)
+	}
+
+	// Remove spec.replicas.
+	if spec, ok := obj["spec"].(map[string]any); ok {
+		if _, ok := spec["replicas"]; !ok {
+			c.removeLastAppliedReplicaEmptyTotal.WithLabelValues(sts.GetName()).Inc()
+			return nil
+		}
+		delete(spec, "replicas")
+		if len(spec) == 0 {
+			delete(obj, "spec")
+		}
+	} else {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
+		return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName())
+	}
+
+	// Encode updated annotation.
+	newRaw, err := json.Marshal(obj)
+	if err != nil {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
+		return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err)
+	}
+
+	// Patch StatefulSet with the new annotation.
+	patch := fmt.Sprintf(
+		`{"metadata":{"annotations":{"%s":%q}}}`,
+		lastAppConfAnnKey,
+		newRaw,
+	)
+	_, err = c.kubeClient.AppsV1().
+		StatefulSets(c.namespace).
+		Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+	if err != nil {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
+		return err
+	}
+	ok, err = replicasAbsentLastAppConf(sts)
+	if err != nil {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), verifyErr).Inc()
+		return fmt.Errorf("verify %s: %w", sts.GetName(), err)
+	}
+	if !ok {
+		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), verifyFailed).Inc()
+		return fmt.Errorf("verify %s: replicas still present in last applied annotation", sts.GetName())
+	}
+	return nil
+}
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 05b798aa..9e9e6bfb 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -618,7 +618,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, false, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -855,7 +855,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleBoolean(t *testing.T
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, false, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -1128,7 +1128,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, false, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -1231,7 +1231,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG
 
 	// Create the controller and start informers.
 	reg := prometheus.NewPedanticRegistry()
-	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
+	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, false, reg, log.NewNopLogger())
 	require.NoError(t, c.Init())
 	defer c.Stop()
 

From 8cdc861eae9aced1d6be2e6c664fe3b9b00c2705 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 15 May 2025 13:34:21 -0700
Subject: [PATCH 06/10] print err

---
 pkg/controller/controller.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index f4e44d21..80bddbca 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -289,7 +289,7 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 	if c.removeLastAppliedReplicas {
 		for _, s := range sets {
 			if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
-				return errors.Wrapf(err, "failed to remove last-applied-configuration annotation from StatefulSet %s", s.GetName())
+				level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
 			}
 		}
 	}

From 7ab8bc0ae084358364b73b978f54dbac70547167 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 15 May 2025 14:06:37 -0700
Subject: [PATCH 07/10] add a gauge metric

---
 pkg/controller/controller.go               |  5 +++++
 pkg/controller/custom_resource_replicas.go | 10 ++++++++++
 2 files changed, 15 insertions(+)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 80bddbca..5033d523 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -80,6 +80,7 @@ type RolloutController struct {
 	removeLastAppliedReplicaTotal *prometheus.CounterVec
 	removeLastAppliedReplicaEmptyTotal *prometheus.CounterVec
 	removeLastAppliedReplicaErrorTotal *prometheus.CounterVec
+	downscaleState *prometheus.GaugeVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
@@ -156,6 +157,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_remove_last_applied_replicas_error_total",
 			Help: "Total number of errors while removing .spec.replicas field from last-applied-configuration annotation.",
 		}, []string{"statefulset_name", "error"}),
+		downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "rollout_operator_downscale_state",
+			Help: "State of the downscale operation.",
+		}, []string{"statefulset_name"}),
 	}
 
 	return c
diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index a09426cc..f72e1327 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -19,6 +19,11 @@ import (
 	"github.com/grafana/rollout-operator/pkg/config"
 )
 
+const (
+	idle = iota
+	waiting
+)
+
 func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
 	// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
 	// works with up-to-date models.
@@ -69,6 +74,11 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		}
 
 		logMsg := ""
+		if desiredReplicas == referenceResourceDesiredReplicas {
+			c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(idle))
+		} else {
+			c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(waiting))
+		}
 		if desiredReplicas > currentReplicas {
 			logMsg = "scaling up statefulset to match replicas in the reference resource"
 		} else if desiredReplicas < currentReplicas {

From b2594be9b17775cc9cc86ad6ce5312d0b9b9bb6e Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 15 May 2025 14:24:36 -0700
Subject: [PATCH 08/10] remove flag

---
 cmd/rollout-operator/main.go      |  6 +--
 pkg/controller/controller.go      | 72 +++++++++++++++----------------
 pkg/controller/controller_test.go |  8 ++--
 3 files changed, 39 insertions(+), 47 deletions(-)

diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go
index 2cbba3e5..913e1248 100644
--- a/cmd/rollout-operator/main.go
+++ b/cmd/rollout-operator/main.go
@@ -62,8 +62,6 @@ type config struct {
 
 	useZoneTracker bool
 	zoneTrackerConfigMapName string
-
-	removeLastAppliedReplicas bool
 }
 
 func (cfg *config) register(fs *flag.FlagSet) {
@@ -90,8 +88,6 @@ func (cfg *config) register(fs *flag.FlagSet) {
 
 	fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
 	fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
-
-	fs.BoolVar(&cfg.removeLastAppliedReplicas, "remove-last-applied-replicas", false, "Remove the .spec.replicas field from statefulsets last applied config, so omitting this field when kubectl apply doesn't reset replicas to 1")
 }
 
 func (cfg config) validate() error {
@@ -175,7 +171,7 @@ func main() {
 	maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)
 
 	// Init the controller.
-	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.removeLastAppliedReplicas, reg, logger)
+	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
 	check(errors.Wrap(c.Init(), "failed to init controller"))
 
 	// Listen to sigterm, as well as for restart (like for certificate renewal).
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 5033d523..1e6f16ab 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -48,21 +48,20 @@ type httpClient interface {
 }
 
 type RolloutController struct {
-	kubeClient kubernetes.Interface
-	namespace string
-	reconcileInterval time.Duration
-	removeLastAppliedReplicas bool
-	statefulSetsFactory informers.SharedInformerFactory
-	statefulSetLister listersv1.StatefulSetLister
-	statefulSetsInformer cache.SharedIndexInformer
-	podsFactory informers.SharedInformerFactory
-	podLister corelisters.PodLister
-	podsInformer cache.SharedIndexInformer
-	restMapper meta.RESTMapper
-	scaleClient scale.ScalesGetter
-	dynamicClient dynamic.Interface
-	httpClient httpClient
-	logger log.Logger
+	kubeClient kubernetes.Interface
+	namespace string
+	reconcileInterval time.Duration
+	statefulSetsFactory informers.SharedInformerFactory
+	statefulSetLister listersv1.StatefulSetLister
+	statefulSetsInformer cache.SharedIndexInformer
+	podsFactory informers.SharedInformerFactory
+	podLister corelisters.PodLister
+	podsInformer cache.SharedIndexInformer
+	restMapper meta.RESTMapper
+	scaleClient scale.ScalesGetter
+	dynamicClient dynamic.Interface
+	httpClient httpClient
+	logger log.Logger
 
 	// This bool is true if we should trigger a reconcile.
 	shouldReconcile atomic.Bool
@@ -87,7 +86,7 @@ type RolloutController struct {
 	discoveredGroups map[string]struct{}
 }
 
-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, removeLastAppliedReplicas bool, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
 	namespaceOpt := informers.WithNamespace(namespace)
 
 	// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -103,23 +102,22 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 	podsInformer := podsFactory.Core().V1().Pods()
 
 	c := &RolloutController{
-		kubeClient: kubeClient,
-		namespace: namespace,
-		reconcileInterval: reconcileInterval,
-		removeLastAppliedReplicas: removeLastAppliedReplicas,
-		statefulSetsFactory: statefulSetsFactory,
-		statefulSetLister: statefulSetsInformer.Lister(),
-		statefulSetsInformer: statefulSetsInformer.Informer(),
-		podsFactory: podsFactory,
-		podLister: podsInformer.Lister(),
-		podsInformer: podsInformer.Informer(),
-		restMapper: restMapper,
-		scaleClient: scaleClient,
-		dynamicClient: dynamic,
-		httpClient: client,
-		logger: logger,
-		stopCh: make(chan struct{}),
-		discoveredGroups: map[string]struct{}{},
+		kubeClient: kubeClient,
+		namespace: namespace,
+		reconcileInterval: reconcileInterval,
+		statefulSetsFactory: statefulSetsFactory,
+		statefulSetLister: statefulSetsInformer.Lister(),
+		statefulSetsInformer: statefulSetsInformer.Informer(),
+		podsFactory: podsFactory,
+		podLister: podsInformer.Lister(),
+		podsInformer: podsInformer.Informer(),
+		restMapper: restMapper,
+		scaleClient: scaleClient,
+		dynamicClient: dynamic,
+		httpClient: client,
+		logger: logger,
+		stopCh: make(chan struct{}),
+		discoveredGroups: map[string]struct{}{},
 		groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_group_reconciles_total",
 			Help: "Total number of reconciles started for a specific rollout group.",
@@ -291,11 +289,9 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 	// Sort StatefulSets to provide a deterministic behaviour.
 	util.SortStatefulSets(sets)
 
-	if c.removeLastAppliedReplicas {
-		for _, s := range sets {
-			if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
-				level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
-			}
+	for _, s := range sets {
+		if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
+			level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
 		}
 	}
 
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 9e9e6bfb..05b798aa 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -618,7 +618,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, false, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -855,7 +855,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleBoolean(t *testing.T
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, false, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -1128,7 +1128,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, false, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -1231,7 +1231,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG
 
 	// Create the controller and start informers.
 	reg := prometheus.NewPedanticRegistry()
-	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, false, reg, log.NewNopLogger())
+	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
 	require.NoError(t, c.Init())
 	defer c.Stop()
 

From ab93933d99fbad671d6026405fd7cf0654ce7c36 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 15 May 2025 15:24:01 -0700
Subject: [PATCH 09/10] add another gauge

---
 pkg/controller/controller.go | 37 +++++++-----------------------------
 1 file changed, 7 insertions(+), 30 deletions(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 1e6f16ab..a6b3c59f 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -80,6 +80,7 @@ type RolloutController struct {
 	removeLastAppliedReplicaEmptyTotal *prometheus.CounterVec
 	removeLastAppliedReplicaErrorTotal *prometheus.CounterVec
 	downscaleState *prometheus.GaugeVec
+	lastAppliedReplicasPresent *prometheus.GaugeVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
@@ -159,6 +160,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_downscale_state",
 			Help: "State of the downscale operation.",
 		}, []string{"statefulset_name"}),
+		lastAppliedReplicasPresent: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "rollout_operator_last_applied_replicas_present",
+			Help: "Whether the last-applied-configuration annotation contains .spec.replicas field.",
+		}, []string{"statefulset_name"}),
 	}
 
 	return c
@@ -698,25 +703,6 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st
 	return err
 }
 
-// replicasAbsentLastAppConf returns true if .spec.replicas is NOT present in the last applied configuration
-func replicasAbsentLastAppConf(sts *v1.StatefulSet) (bool, error) {
-	raw, ok := sts.Annotations[lastAppConfAnnKey]
-	if !ok || raw == "" {
-		return true, nil // nothing to check
-	}
-
-	var obj map[string]any
-	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
-		return false, err
-	}
-	spec, ok := obj["spec"].(map[string]any)
-	if !ok {
-		return true, nil
-	}
-	_, has := spec["replicas"]
-	return !has, nil
-}
-
 // removeReplicasFromLastApplied deletes .spec.replicas from the
 // kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet.
 func (c *RolloutController) removeReplicasFromLastApplied(
@@ -729,8 +715,6 @@ func (c *RolloutController) removeReplicasFromLastApplied(
 	const jsonDecodeErr = "JsonDecodeErr"
 	const jsonEncodeErr = "JsonEncodeErr"
 	const stsPatchErr = "StsPatchErr"
-	const verifyErr = "VerifyErr"
-	const verifyFailed = "VerifyFailed"
 
 	c.removeLastAppliedReplicaTotal.WithLabelValues(sts.GetName()).Inc()
 	anns := sts.GetAnnotations()
@@ -754,9 +738,11 @@ func (c *RolloutController) removeReplicasFromLastApplied(
 	// Remove spec.replicas.
 	if spec, ok := obj["spec"].(map[string]any); ok {
 		if _, ok := spec["replicas"]; !ok {
+			c.lastAppliedReplicasPresent.WithLabelValues(sts.GetName()).Set(0)
 			c.removeLastAppliedReplicaEmptyTotal.WithLabelValues(sts.GetName()).Inc()
 			return nil
 		}
+		c.lastAppliedReplicasPresent.WithLabelValues(sts.GetName()).Set(1)
 		delete(spec, "replicas")
 		if len(spec) == 0 {
 			delete(obj, "spec")
@@ -786,14 +772,5 @@ func (c *RolloutController) removeReplicasFromLastApplied(
 		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
 		return err
 	}
-	ok, err = replicasAbsentLastAppConf(sts)
-	if err != nil {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), verifyErr).Inc()
-		return fmt.Errorf("verify %s: %w", sts.GetName(), err)
-	}
-	if !ok {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), verifyFailed).Inc()
-		return fmt.Errorf("verify %s: replicas still present in last applied annotation", sts.GetName())
-	}
 	return nil
 }

From aec33801b6e1be992bf31372202f84fcabfa5ed3 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Thu, 15 May 2025 15:30:33 -0700
Subject: [PATCH 10/10] update metrics

---
 pkg/controller/controller.go | 55 ++++++++++++++++++------------------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index a6b3c59f..0ad84bf1 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -70,17 +70,17 @@ type RolloutController struct {
 	stopCh chan struct{}
 
 	// Metrics.
-	groupReconcileTotal *prometheus.CounterVec
-	groupReconcileFailed *prometheus.CounterVec
-	groupReconcileDuration *prometheus.HistogramVec
-	groupReconcileLastSuccess *prometheus.GaugeVec
-	desiredReplicas *prometheus.GaugeVec
-	downscaleProbeTotal *prometheus.CounterVec
-	removeLastAppliedReplicaTotal *prometheus.CounterVec
-	removeLastAppliedReplicaEmptyTotal *prometheus.CounterVec
-	removeLastAppliedReplicaErrorTotal *prometheus.CounterVec
-	downscaleState *prometheus.GaugeVec
-	lastAppliedReplicasPresent *prometheus.GaugeVec
+	groupReconcileTotal *prometheus.CounterVec
+	groupReconcileFailed *prometheus.CounterVec
+	groupReconcileDuration *prometheus.HistogramVec
+	groupReconcileLastSuccess *prometheus.GaugeVec
+	desiredReplicas *prometheus.GaugeVec
+	downscaleProbeTotal *prometheus.CounterVec
+	removeLastAppliedReplicasTotal *prometheus.CounterVec
+	removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
+	removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
+	lastAppliedReplicasRemovedTotal *prometheus.CounterVec
+	downscaleState *prometheus.GaugeVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
@@ -144,26 +144,26 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_downscale_probe_total",
 			Help: "Total number of downscale probes.",
 		}, []string{"scale_down_pod_name", "status"}),
-		removeLastAppliedReplicaTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		removeLastAppliedReplicasTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_remove_last_applied_replicas_total",
 			Help: "Total number of removal of .spec.replicas field from last-applied-configuration annotation.",
 		}, []string{"statefulset_name"}),
-		removeLastAppliedReplicaEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		removeLastAppliedReplicasEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_remove_last_applied_replicas_empty_total",
 			Help: "Total number of empty .spec.replicas field from last-applied-configuration annotation.",
 		}, []string{"statefulset_name"}),
-		removeLastAppliedReplicaErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		removeLastAppliedReplicasErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_remove_last_applied_replicas_error_total",
 			Help: "Total number of errors while removing .spec.replicas field from last-applied-configuration annotation.",
 		}, []string{"statefulset_name", "error"}),
+		lastAppliedReplicasRemovedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_last_applied_replicas_removed_total",
+			Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
 		downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "rollout_operator_downscale_state",
 			Help: "State of the downscale operation.",
 		}, []string{"statefulset_name"}),
-		lastAppliedReplicasPresent: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "rollout_operator_last_applied_replicas_present",
-			Help: "Whether the last-applied-configuration annotation contains .spec.replicas field.",
-		}, []string{"statefulset_name"}),
 	}
 
 	return c
@@ -716,46 +716,44 @@ func (c *RolloutController) removeReplicasFromLastApplied(
 	const jsonEncodeErr = "JsonEncodeErr"
 	const stsPatchErr = "StsPatchErr"
 
-	c.removeLastAppliedReplicaTotal.WithLabelValues(sts.GetName()).Inc()
+	c.removeLastAppliedReplicasTotal.WithLabelValues(sts.GetName()).Inc()
 	anns := sts.GetAnnotations()
 	if anns == nil {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
 		return fmt.Errorf("no annotation found on statefulset %s", sts.GetName())
 	}
 	raw, ok := anns[lastAppConfAnnKey]
 	if !ok || raw == "" {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
 		return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName())
 	}
 
 	// Decode annotation JSON.
 	var obj map[string]any
 	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
 		return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err)
 	}
 
 	// Remove spec.replicas.
 	if spec, ok := obj["spec"].(map[string]any); ok {
 		if _, ok := spec["replicas"]; !ok {
-			c.lastAppliedReplicasPresent.WithLabelValues(sts.GetName()).Set(0)
-			c.removeLastAppliedReplicaEmptyTotal.WithLabelValues(sts.GetName()).Inc()
+			c.removeLastAppliedReplicasEmptyTotal.WithLabelValues(sts.GetName()).Inc()
 			return nil
 		}
-		c.lastAppliedReplicasPresent.WithLabelValues(sts.GetName()).Set(1)
 		delete(spec, "replicas")
 		if len(spec) == 0 {
 			delete(obj, "spec")
 		}
 	} else {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
 		return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName())
 	}
 
 	// Encode updated annotation.
 	newRaw, err := json.Marshal(obj)
 	if err != nil {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
 		return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err)
 	}
 
@@ -769,8 +767,9 @@ func (c *RolloutController) removeReplicasFromLastApplied(
 		StatefulSets(c.namespace).
 		Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
 	if err != nil {
-		c.removeLastAppliedReplicaErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
 		return err
 	}
+	c.lastAppliedReplicasRemovedTotal.WithLabelValues(sts.GetName()).Inc()
 	return nil
 }