Skip to content

remove last applied replicas when statefulset is changed #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 110 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
Expand Down Expand Up @@ -39,6 +40,7 @@ const (
// How frequently informers should resync. This is also the frequency at which
// the operator reconciles even if no changes are made to the watched resources.
informerSyncInterval = 5 * time.Minute
lastAppConfAnnKey = "kubectl.kubernetes.io/last-applied-configuration"
)

type httpClient interface {
Expand Down Expand Up @@ -68,12 +70,17 @@ type RolloutController struct {
stopCh chan struct{}

// Metrics.
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
groupReconcileDuration *prometheus.HistogramVec
groupReconcileLastSuccess *prometheus.GaugeVec
desiredReplicas *prometheus.GaugeVec
downscaleProbeTotal *prometheus.CounterVec
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
groupReconcileDuration *prometheus.HistogramVec
groupReconcileLastSuccess *prometheus.GaugeVec
desiredReplicas *prometheus.GaugeVec
downscaleProbeTotal *prometheus.CounterVec
removeLastAppliedReplicasTotal *prometheus.CounterVec
removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
lastAppliedReplicasRemovedTotal *prometheus.CounterVec
downscaleState *prometheus.GaugeVec

// Keep track of discovered rollout groups. We use this information to delete metrics
// related to rollout groups that have been decommissioned.
Expand Down Expand Up @@ -137,6 +144,26 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
Name: "rollout_operator_downscale_probe_total",
Help: "Total number of downscale probes.",
}, []string{"scale_down_pod_name", "status"}),
removeLastAppliedReplicasTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_remove_last_applied_replicas_total",
Help: "Total number of removal of .spec.replicas field from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
removeLastAppliedReplicasEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_remove_last_applied_replicas_empty_total",
Help: "Total number of empty .spec.replicas field from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
removeLastAppliedReplicasErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_remove_last_applied_replicas_error_total",
Help: "Total number of errors while removing .spec.replicas field from last-applied-configuration annotation.",
}, []string{"statefulset_name", "error"}),
lastAppliedReplicasRemovedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_last_applied_replicas_removed_total",
Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "rollout_operator_downscale_state",
Help: "State of the downscale operation.",
}, []string{"statefulset_name"}),
}

return c
Expand Down Expand Up @@ -267,6 +294,12 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
// Sort StatefulSets to provide a deterministic behaviour.
util.SortStatefulSets(sets)

for _, s := range sets {
if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
}
}

// Adjust the number of replicas for each StatefulSet in the group if desired. If the number of
// replicas of any StatefulSet was adjusted, return early in order to guarantee each STS model is
// up-to-date.
Expand Down Expand Up @@ -669,3 +702,74 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st
_, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
return err
}

// removeReplicasFromLastApplied deletes .spec.replicas from the
// kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet.
func (c *RolloutController) removeReplicasFromLastApplied(
ctx context.Context,
sts *v1.StatefulSet,
) error {
const noAnnotationErr = "NoAnnotationErr"
const lastAppliedNotFoundErr = "LastAppliedNotFoundErr"
const specNotFoundErr = "SpecNotFoundErr"
const jsonDecodeErr = "JsonDecodeErr"
const jsonEncodeErr = "JsonEncodeErr"
const stsPatchErr = "StsPatchErr"

c.removeLastAppliedReplicasTotal.WithLabelValues(sts.GetName()).Inc()
anns := sts.GetAnnotations()
if anns == nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
return fmt.Errorf("no annotation found on statefulset %s", sts.GetName())
}
raw, ok := anns[lastAppConfAnnKey]
if !ok || raw == "" {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName())
}

// Decode annotation JSON.
var obj map[string]any
if err := json.Unmarshal([]byte(raw), &obj); err != nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err)
}

// Remove spec.replicas.
if spec, ok := obj["spec"].(map[string]any); ok {
if _, ok := spec["replicas"]; !ok {
c.removeLastAppliedReplicasEmptyTotal.WithLabelValues(sts.GetName()).Inc()
return nil
}
delete(spec, "replicas")
if len(spec) == 0 {
delete(obj, "spec")
}
} else {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName())
}

// Encode updated annotation.
newRaw, err := json.Marshal(obj)
if err != nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err)
}

// Patch StatefulSet with the new annotation.
patch := fmt.Sprintf(
`{"metadata":{"annotations":{"%s":%q}}}`,
lastAppConfAnnKey,
newRaw,
)
_, err = c.kubeClient.AppsV1().
StatefulSets(c.namespace).
Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
return err
}
c.lastAppliedReplicasRemovedTotal.WithLabelValues(sts.GetName()).Inc()
return nil
}
10 changes: 10 additions & 0 deletions pkg/controller/custom_resource_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"github.com/grafana/rollout-operator/pkg/config"
)

const (
idle = iota
waiting
)

func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
// works with up-to-date models.
Expand Down Expand Up @@ -69,6 +74,11 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
}

logMsg := ""
if desiredReplicas == referenceResourceDesiredReplicas {
c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(idle))
} else {
c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(waiting))
}
if desiredReplicas > currentReplicas {
logMsg = "scaling up statefulset to match replicas in the reference resource"
} else if desiredReplicas < currentReplicas {
Expand Down