diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 941376cc..0ad84bf1 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -2,6 +2,7 @@ package controller
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"sort"
@@ -39,6 +40,7 @@ const (
 	// How frequently informers should resync. This is also the frequency at which
 	// the operator reconciles even if no changes are made to the watched resources.
 	informerSyncInterval = 5 * time.Minute
+	lastAppConfAnnKey    = "kubectl.kubernetes.io/last-applied-configuration"
 )
 
 type httpClient interface {
@@ -68,14 +70,17 @@ type RolloutController struct {
 	stopCh chan struct{}
 
 	// Metrics.
-	groupReconcileTotal        *prometheus.CounterVec
-	groupReconcileFailed       *prometheus.CounterVec
-	groupReconcileDuration     *prometheus.HistogramVec
-	groupReconcileLastSuccess  *prometheus.GaugeVec
-	desiredReplicas            *prometheus.GaugeVec
-	scaleDownBoolean           *prometheus.GaugeVec
-	downscaleProbeTotal        *prometheus.CounterVec
-	downscaleProbeFailureTotal *prometheus.CounterVec
+	groupReconcileTotal                 *prometheus.CounterVec
+	groupReconcileFailed                *prometheus.CounterVec
+	groupReconcileDuration              *prometheus.HistogramVec
+	groupReconcileLastSuccess           *prometheus.GaugeVec
+	desiredReplicas                     *prometheus.GaugeVec
+	downscaleProbeTotal                 *prometheus.CounterVec
+	removeLastAppliedReplicasTotal      *prometheus.CounterVec
+	removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
+	removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
+	lastAppliedReplicasRemovedTotal     *prometheus.CounterVec
+	downscaleState                      *prometheus.GaugeVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
@@ -135,18 +140,30 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_statefulset_desired_replicas",
 			Help: "Desired replicas of a Statefulset parsed from CRD.",
 		}, []string{"statefulset_name"}),
-		scaleDownBoolean: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "rollout_operator_scale_down_boolean",
-			Help: "Boolean for whether an ingester pod is ready to scale down.",
-		}, []string{"scale_down_pod_name"}),
 		downscaleProbeTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_downscale_probe_total",
 			Help: "Total number of downscale probes.",
-		}, []string{"scale_down_pod_name"}),
-		downscaleProbeFailureTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "rollout_operator_downscale_probe_failure_total",
-			Help: "Total number of failed downscale probes.",
-		}, []string{"scale_down_pod_name"}),
+		}, []string{"scale_down_pod_name", "status"}),
+		removeLastAppliedReplicasTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_total",
+			Help: "Total number of attempts to remove the .spec.replicas field from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		removeLastAppliedReplicasEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_empty_total",
+			Help: "Total number of times the .spec.replicas field was already absent from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		removeLastAppliedReplicasErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_error_total",
+			Help: "Total number of errors while removing the .spec.replicas field from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name", "error"}),
+		lastAppliedReplicasRemovedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_last_applied_replicas_removed_total",
+			Help: "Total number of .spec.replicas fields removed from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "rollout_operator_downscale_state",
+			Help: "State of the downscale operation (0=idle, 1=waiting).",
+		}, []string{"statefulset_name"}),
 	}
 
 	return c
@@ -230,7 +247,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
 	defer span.Finish()
 
-	level.Info(c.logger).Log("msg", "reconcile started")
+	level.Info(c.logger).Log("msg", "================ RECONCILE START ================")
 
 	sets, err := c.listStatefulSetsWithRolloutGroup()
 	if err != nil {
@@ -252,7 +269,8 @@
 
 	c.deleteMetricsForDecommissionedGroups(groups)
 
-	level.Info(c.logger).Log("msg", "reconcile done")
+	level.Info(c.logger).Log("msg", "================ RECONCILE DONE ================")
+
 	return nil
 }
 
@@ -276,6 +294,12 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 	// Sort StatefulSets to provide a deterministic behaviour.
 	util.SortStatefulSets(sets)
 
+	for _, s := range sets {
+		if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
+			level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
+		}
+	}
+
 	// Adjust the number of replicas for each StatefulSet in the group if desired. If the number of
 	// replicas of any StatefulSet was adjusted, return early in order to guarantee each STS model is
 	// up-to-date.
@@ -517,7 +541,7 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
 }
 
 func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
-	level.Debug(c.logger).Log("msg", "reconciling StatefulSet==============", "statefulset", sts.Name)
+	level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)
 
 	podsToUpdate, err := c.podsNotMatchingUpdateRevision(sts)
 	if err != nil {
@@ -678,3 +702,74 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st
 	_, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
 	return err
 }
+
+// removeReplicasFromLastApplied deletes .spec.replicas from the
+// kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet.
+func (c *RolloutController) removeReplicasFromLastApplied(
+	ctx context.Context,
+	sts *v1.StatefulSet,
+) error {
+	const noAnnotationErr = "NoAnnotationErr"
+	const lastAppliedNotFoundErr = "LastAppliedNotFoundErr"
+	const specNotFoundErr = "SpecNotFoundErr"
+	const jsonDecodeErr = "JsonDecodeErr"
+	const jsonEncodeErr = "JsonEncodeErr"
+	const stsPatchErr = "StsPatchErr"
+
+	c.removeLastAppliedReplicasTotal.WithLabelValues(sts.GetName()).Inc()
+	anns := sts.GetAnnotations()
+	if anns == nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
+		return fmt.Errorf("no annotation found on statefulset %s", sts.GetName())
+	}
+	raw, ok := anns[lastAppConfAnnKey]
+	if !ok || raw == "" {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
+		return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName())
+	}
+
+	// Decode annotation JSON.
+	var obj map[string]any
+	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
+		return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err)
+	}
+
+	// Remove spec.replicas.
+	if spec, ok := obj["spec"].(map[string]any); ok {
+		if _, ok := spec["replicas"]; !ok {
+			c.removeLastAppliedReplicasEmptyTotal.WithLabelValues(sts.GetName()).Inc()
+			return nil
+		}
+		delete(spec, "replicas")
+		if len(spec) == 0 {
+			delete(obj, "spec")
+		}
+	} else {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
+		return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName())
+	}
+
+	// Encode updated annotation.
+	newRaw, err := json.Marshal(obj)
+	if err != nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
+		return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err)
+	}
+
+	// Patch StatefulSet with the new annotation.
+	patch := fmt.Sprintf(
+		`{"metadata":{"annotations":{"%s":%q}}}`,
+		lastAppConfAnnKey,
+		newRaw,
+	)
+	_, err = c.kubeClient.AppsV1().
+		StatefulSets(c.namespace).
+		Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+	if err != nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
+		return err
+	}
+	c.lastAppliedReplicasRemovedTotal.WithLabelValues(sts.GetName()).Inc()
+	return nil
+}
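// ---------------------------------------------------------------------------
// Reviewer note (illustrative sketch, not part of this diff): the function
// above only rewrites the kubectl.kubernetes.io/last-applied-configuration
// value. The standalone snippet below mirrors its unmarshal / delete
// spec.replicas / marshal steps on a sample payload so the before/after is
// easy to see. The stripReplicas helper and the sample JSON are hypothetical
// names used only for this example.
// ---------------------------------------------------------------------------
package main

import (
	"encoding/json"
	"fmt"
)

func stripReplicas(lastApplied string) (string, error) {
	var obj map[string]any
	if err := json.Unmarshal([]byte(lastApplied), &obj); err != nil {
		return "", err
	}
	if spec, ok := obj["spec"].(map[string]any); ok {
		// Same behaviour as removeReplicasFromLastApplied: drop replicas and,
		// if spec becomes empty, drop spec itself.
		delete(spec, "replicas")
		if len(spec) == 0 {
			delete(obj, "spec")
		}
	}
	out, err := json.Marshal(obj)
	return string(out), err
}

func main() {
	in := `{"apiVersion":"apps/v1","kind":"StatefulSet","spec":{"replicas":3,"serviceName":"ingester"}}`
	out, _ := stripReplicas(in)
	fmt.Println(out)
	// Output: {"apiVersion":"apps/v1","kind":"StatefulSet","spec":{"serviceName":"ingester"}}
}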
diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index b77c98e0..f72e1327 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -19,6 +19,11 @@ import (
 	"github.com/grafana/rollout-operator/pkg/config"
 )
 
+const (
+	idle = iota
+	waiting
+)
+
 func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
 	// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
 	// works with up-to-date models.
@@ -50,16 +55,16 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		var desiredReplicas int32
 		if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "boolean" {
 			level.Debug(c.logger).Log("msg", "boolean scaling logic")
-			desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.scaleDownBoolean, c.downscaleProbeTotal, c.downscaleProbeFailureTotal)
+			desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.downscaleProbeTotal)
 		} else {
 			desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
 		}
 		if err != nil {
-			level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
+			level.Info(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
 				"group", groupName,
 				"name", sts.GetName(),
 				"currentReplicas", currentReplicas,
-				"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
+				"desiredReplicas", referenceResourceDesiredReplicas,
 				"err", err,
 			)
 
@@ -69,6 +74,11 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		}
 
 		logMsg := ""
+		if desiredReplicas == referenceResourceDesiredReplicas {
+			c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(idle))
+		} else {
+			c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(waiting))
+		}
 		if desiredReplicas > currentReplicas {
 			logMsg = "scaling up statefulset to match replicas in the reference resource"
 		} else if desiredReplicas < currentReplicas {
diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go
index 1cff1dda..657d32e7 100644
--- a/pkg/controller/delay.go
+++ b/pkg/controller/delay.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"strconv"
 	"sync"
 	"time"
 
@@ -44,7 +45,7 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
 	callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
 }
 
-func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
+func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, downscaleProbeTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
 	if desiredReplicas >= currentReplicas {
 		return desiredReplicas, nil
 	}
@@ -54,7 +55,7 @@ func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.Statefu
 		return currentReplicas, err
 	}
 	downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
-	scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, scaleDownBooleanMetric, downscaleProbeTotal, downscaleProbeFailureTotal)
+	scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, downscaleProbeTotal)
 	if err != nil {
 		return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
 	}
@@ -160,7 +161,7 @@ func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Durat
 	delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
 	urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]
 
-	if delayStr == "" || urlStr == "" {
+	if delayStr == "" || delayStr == "boolean" || urlStr == "" {
 		return 0, nil, nil
 	}
 
@@ -218,7 +219,7 @@ func createPrepareDownscaleEndpoints(namespace, statefulsetName, serviceName str
 	return eps
 }
 
-func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (map[int]bool, error) {
+func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, downscaleProbeTotal *prometheus.CounterVec) (map[int]bool, error) {
 	if len(endpoints) == 0 {
 		return nil, fmt.Errorf("no endpoints")
 	}
@@ -238,33 +239,31 @@ func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logge
 		epLogger := log.With(logger, "pod", ep.podName, "url", target)
 
 		req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
-		downscaleProbeTotal.WithLabelValues(ep.podName).Inc()
 		if err != nil {
 			level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
-			downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
+			downscaleProbeTotal.WithLabelValues(ep.podName, "error creating HTTP POST request to endpoint").Inc()
 			return err
 		}
 
 		resp, err := client.Do(req)
 		if err != nil {
 			level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
+			downscaleProbeTotal.WithLabelValues(ep.podName, "error sending HTTP POST request to endpoint").Inc()
 			return err
 		}
 
 		defer resp.Body.Close()
 
 		scalableMu.Lock()
+		downscaleProbeTotal.WithLabelValues(ep.podName, strconv.Itoa(resp.StatusCode)).Inc()
 		if resp.StatusCode == 200 {
 			scalable[ep.replica] = true
-			scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(1)
 		} else {
 			if resp.StatusCode != 425 { // 425 too early
-				downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
 				level.Error(epLogger).Log("msg", "downscale POST got unexpected status", resp.StatusCode)
 			}
 			scalable[ep.replica] = false
-			scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(0)
 		}
 		scalableMu.Unlock()
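// ---------------------------------------------------------------------------
// Reviewer note (illustrative sketch, not part of this diff): the new
// rollout_operator_downscale_state gauge exports the iota constants declared
// in custom_resource_replicas.go, i.e. 0 (idle, the desired replicas already
// match the reference resource) and 1 (waiting, a downscale is being held
// back by the delay or readiness probe). A minimal sketch of that mapping,
// assuming the same constants; the downscaleStateValue helper is a
// hypothetical name used only for this example.
// ---------------------------------------------------------------------------
package main

import "fmt"

const (
	idle = iota // statefulset already matches the reference resource
	waiting     // downscale requested but not yet allowed
)

func downscaleStateValue(desiredReplicas, referenceDesiredReplicas int32) float64 {
	if desiredReplicas == referenceDesiredReplicas {
		return float64(idle)
	}
	return float64(waiting)
}

func main() {
	fmt.Println(downscaleStateValue(5, 5)) // 0 -> idle
	fmt.Println(downscaleStateValue(3, 5)) // 1 -> waiting
}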