diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 83fa41aa..0ad84bf1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/json" "fmt" "net/http" "sort" @@ -39,6 +40,7 @@ const ( // How frequently informers should resync. This is also the frequency at which // the operator reconciles even if no changes are made to the watched resources. informerSyncInterval = 5 * time.Minute + lastAppConfAnnKey = "kubectl.kubernetes.io/last-applied-configuration" ) type httpClient interface { @@ -68,12 +70,17 @@ type RolloutController struct { stopCh chan struct{} // Metrics. - groupReconcileTotal *prometheus.CounterVec - groupReconcileFailed *prometheus.CounterVec - groupReconcileDuration *prometheus.HistogramVec - groupReconcileLastSuccess *prometheus.GaugeVec - desiredReplicas *prometheus.GaugeVec - downscaleProbeTotal *prometheus.CounterVec + groupReconcileTotal *prometheus.CounterVec + groupReconcileFailed *prometheus.CounterVec + groupReconcileDuration *prometheus.HistogramVec + groupReconcileLastSuccess *prometheus.GaugeVec + desiredReplicas *prometheus.GaugeVec + downscaleProbeTotal *prometheus.CounterVec + removeLastAppliedReplicasTotal *prometheus.CounterVec + removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec + removeLastAppliedReplicasErrorTotal *prometheus.CounterVec + lastAppliedReplicasRemovedTotal *prometheus.CounterVec + downscaleState *prometheus.GaugeVec // Keep track of discovered rollout groups. We use this information to delete metrics // related to rollout groups that have been decommissioned. @@ -137,6 +144,26 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM Name: "rollout_operator_downscale_probe_total", Help: "Total number of downscale probes.", }, []string{"scale_down_pod_name", "status"}), + removeLastAppliedReplicasTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "rollout_operator_remove_last_applied_replicas_total", + Help: "Total number of removal of .spec.replicas field from last-applied-configuration annotation.", + }, []string{"statefulset_name"}), + removeLastAppliedReplicasEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "rollout_operator_remove_last_applied_replicas_empty_total", + Help: "Total number of empty .spec.replicas field from last-applied-configuration annotation.", + }, []string{"statefulset_name"}), + removeLastAppliedReplicasErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "rollout_operator_remove_last_applied_replicas_error_total", + Help: "Total number of errors while removing .spec.replicas field from last-applied-configuration annotation.", + }, []string{"statefulset_name", "error"}), + lastAppliedReplicasRemovedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "rollout_operator_last_applied_replicas_removed_total", + Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.", + }, []string{"statefulset_name"}), + downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "rollout_operator_downscale_state", + Help: "State of the downscale operation.", + }, []string{"statefulset_name"}), } return c @@ -267,6 +294,12 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou // Sort StatefulSets to provide a deterministic behaviour. util.SortStatefulSets(sets) + for _, s := range sets { + if err := c.removeReplicasFromLastApplied(ctx, s); err != nil { + level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err) + } + } + // Adjust the number of replicas for each StatefulSet in the group if desired. If the number of // replicas of any StatefulSet was adjusted, return early in order to guarantee each STS model is // up-to-date. @@ -669,3 +702,74 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) return err } + +// removeReplicasFromLastApplied deletes .spec.replicas from the +// kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet. +func (c *RolloutController) removeReplicasFromLastApplied( + ctx context.Context, + sts *v1.StatefulSet, +) error { + const noAnnotationErr = "NoAnnotationErr" + const lastAppliedNotFoundErr = "LastAppliedNotFoundErr" + const specNotFoundErr = "SpecNotFoundErr" + const jsonDecodeErr = "JsonDecodeErr" + const jsonEncodeErr = "JsonEncodeErr" + const stsPatchErr = "StsPatchErr" + + c.removeLastAppliedReplicasTotal.WithLabelValues(sts.GetName()).Inc() + anns := sts.GetAnnotations() + if anns == nil { + c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc() + return fmt.Errorf("no annotation found on statefulset %s", sts.GetName()) + } + raw, ok := anns[lastAppConfAnnKey] + if !ok || raw == "" { + c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc() + return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName()) + } + + // Decode annotation JSON. + var obj map[string]any + if err := json.Unmarshal([]byte(raw), &obj); err != nil { + c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc() + return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err) + } + + // Remove spec.replicas. + if spec, ok := obj["spec"].(map[string]any); ok { + if _, ok := spec["replicas"]; !ok { + c.removeLastAppliedReplicasEmptyTotal.WithLabelValues(sts.GetName()).Inc() + return nil + } + delete(spec, "replicas") + if len(spec) == 0 { + delete(obj, "spec") + } + } else { + c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc() + return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName()) + } + + // Encode updated annotation. + newRaw, err := json.Marshal(obj) + if err != nil { + c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc() + return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err) + } + + // Patch StatefulSet with the new annotation. + patch := fmt.Sprintf( + `{"metadata":{"annotations":{"%s":%q}}}`, + lastAppConfAnnKey, + newRaw, + ) + _, err = c.kubeClient.AppsV1(). + StatefulSets(c.namespace). + Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) + if err != nil { + c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc() + return err + } + c.lastAppliedReplicasRemovedTotal.WithLabelValues(sts.GetName()).Inc() + return nil +} diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index a09426cc..f72e1327 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -19,6 +19,11 @@ import ( "github.com/grafana/rollout-operator/pkg/config" ) +const ( + idle = iota + waiting +) + func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) { // Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator // works with up-to-date models. @@ -69,6 +74,11 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx } logMsg := "" + if desiredReplicas == referenceResourceDesiredReplicas { + c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(idle)) + } else { + c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(waiting)) + } if desiredReplicas > currentReplicas { logMsg = "scaling up statefulset to match replicas in the reference resource" } else if desiredReplicas < currentReplicas {