diff --git a/integrationtests/agent/suite_test.go b/integrationtests/agent/suite_test.go
index f32f02ae73..c27d903c88 100644
--- a/integrationtests/agent/suite_test.go
+++ b/integrationtests/agent/suite_test.go
@@ -3,6 +3,7 @@ package agent_test
 import (
     "context"
     "fmt"
+    "os"
     "path/filepath"
     "strings"
     "testing"
@@ -31,6 +32,7 @@ import (
 
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/envtest"
+    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
     "sigs.k8s.io/controller-runtime/pkg/manager"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -74,9 +76,11 @@ var _ = BeforeSuite(func() {
     SetDefaultEventuallyTimeout(timeout)
 
     ctx, cancel = context.WithCancel(context.TODO())
+    existing := os.Getenv("CI_USE_EXISTING_CLUSTER") == "true"
     testEnv = &envtest.Environment{
         CRDDirectoryPaths:     []string{filepath.Join("..", "..", "charts", "fleet-crd", "templates", "crds.yaml")},
         ErrorIfCRDPathMissing: true,
+        UseExistingCluster:    &existing,
     }
 
     var err error
@@ -103,12 +107,28 @@ var _ = BeforeSuite(func() {
     })
     Expect(err).ToNot(HaveOccurred())
 
+    driftChan := make(chan event.GenericEvent)
+
     // Set up the bundledeployment reconciler
     Expect(k8sClient.Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: clusterNS}})).ToNot(HaveOccurred())
-    reconciler := newReconciler(ctx, k8sManager, newLookup(resources))
+    reconciler := newReconciler(ctx, k8sManager, newLookup(resources), driftChan)
     err = reconciler.SetupWithManager(k8sManager)
     Expect(err).ToNot(HaveOccurred(), "failed to set up manager")
 
+    // Set up the driftdetect reconciler
+    driftReconciler := &controller.DriftReconciler{
+        Client: k8sManager.GetClient(),
+        Scheme: k8sManager.GetScheme(),
+
+        Deployer:    reconciler.Deployer,
+        Monitor:     reconciler.Monitor,
+        DriftDetect: reconciler.DriftDetect,
+
+        DriftChan: driftChan,
+    }
+    err = driftReconciler.SetupWithManager(k8sManager)
+    Expect(err).ToNot(HaveOccurred(), "failed to set up manager")
+
     go func() {
         defer GinkgoRecover()
         err = k8sManager.Start(ctx)
@@ -124,7 +144,7 @@ var _ = AfterSuite(func() {
 // newReconciler creates a new BundleDeploymentReconciler that will watch for changes
 // in the test Fleet namespace, using configuration from the provided manager.
 // Resources are provided by the lookup parameter.
-func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup) *controller.BundleDeploymentReconciler {
+func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup, driftChan chan event.GenericEvent) *controller.BundleDeploymentReconciler {
     upstreamClient := mgr.GetClient()
     // re-use client, since this is a single cluster test
     localClient := upstreamClient
@@ -174,12 +194,11 @@ func newReconciler(ctx context.Context, mgr manager.Manager, lookup *lookup) *co
     trigger := trigger.New(ctx, localDynamic, mgr.GetRESTMapper())
     driftdetect := driftdetect.New(
         trigger,
-        upstreamClient,
-        mgr.GetAPIReader(),
         dsClient,
         defaultNamespace,
         defaultNamespace,
         agentScope,
+        driftChan,
     )
 
     // Build the clean up
diff --git a/internal/cmd/agent/controller/bundledeployment_controller.go b/internal/cmd/agent/controller/bundledeployment_controller.go
index 4d51622736..605c52aba2 100644
--- a/internal/cmd/agent/controller/bundledeployment_controller.go
+++ b/internal/cmd/agent/controller/bundledeployment_controller.go
@@ -85,15 +85,13 @@ func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
 //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments/finalizers,verbs=update
 
-// Reconcile is part of the main kubernetes reconciliation loop which aims to
-// move the current state of the cluster closer to the desired state.
-// The Reconcile function compares the state specified by
-// the BundleDeployment object against the actual cluster state, and then
-// performs operations to make the cluster state reflect the state specified by
-// the user.
+// Reconcile compares the state specified by the BundleDeployment object
+// against the actual state, and decides if the bundle should be deployed.
+// The deployed resources are then monitored for drift.
+// It also updates the status of the BundleDeployment object with the results.
 //
 // For more details, check Reconcile and its Result here:
-// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
+// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
 func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     logger := log.FromContext(ctx).WithName("bundledeployment")
     ctx = log.IntoContext(ctx, logger)
@@ -149,7 +147,7 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
     }
 
     if monitor.ShouldUpdateStatus(bd) {
-        // update the bundledeployment status and check if we deploy an agent, or if we need to trigger drift correction
+        // update the bundledeployment status and check if we deploy an agent
         status, err = r.Monitor.UpdateStatus(ctx, bd, resources)
         if err != nil {
             logger.Error(err, "Cannot monitor deployed bundle")
@@ -162,17 +160,6 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
         bd.Status = setCondition(status, nil, monitor.Cond(fleetv1.BundleDeploymentConditionMonitored))
     }
 
-    // Run drift correction
-    if len(status.ModifiedStatus) > 0 && bd.Spec.CorrectDrift != nil && bd.Spec.CorrectDrift.Enabled {
-        if release, err := r.Deployer.RemoveExternalChanges(ctx, bd); err != nil {
-            merr = append(merr, fmt.Errorf("failed reconciling drift: %w", err))
-            // Propagate drift correction error to bundle deployment status.
-            monitor.Cond(fleetv1.BundleDeploymentConditionReady).SetError(&status, "", err)
-        } else {
-            bd.Status.Release = release
-        }
-    }
-
     if len(bd.Status.ModifiedStatus) > 0 && monitor.ShouldRedeployAgent(bd) {
         bd.Status.AppliedDeploymentID = ""
         if err := r.Cleanup.OldAgent(ctx, status.ModifiedStatus); err != nil {
@@ -181,7 +168,7 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
         }
     }
 
-    // update our mini controller, which watches deployed resources for drift
+    // update our driftdetect mini controller, which watches deployed resources for drift
     err = r.DriftDetect.Refresh(ctx, req.String(), bd, resources)
     if err != nil {
         logger.V(1).Error(err, "Failed to refresh drift detection", "step", "drift")
diff --git a/internal/cmd/agent/controller/drift_controller.go b/internal/cmd/agent/controller/drift_controller.go
new file mode 100644
index 0000000000..18f39f4165
--- /dev/null
+++ b/internal/cmd/agent/controller/drift_controller.go
@@ -0,0 +1,129 @@
+package controller
+
+import (
+    "context"
+    "fmt"
+
+    "github.com/rancher/fleet/internal/cmd/agent/deployer"
+    "github.com/rancher/fleet/internal/cmd/agent/deployer/driftdetect"
+    "github.com/rancher/fleet/internal/cmd/agent/deployer/monitor"
+    fleetv1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
+
+    "github.com/rancher/wrangler/v3/pkg/condition"
+
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
+    errutil "k8s.io/apimachinery/pkg/util/errors"
+    "k8s.io/client-go/util/retry"
+    ctrl "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/event"
+    "sigs.k8s.io/controller-runtime/pkg/handler"
+    "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/source"
+)
+
+type DriftReconciler struct {
+    client.Client
+    Scheme *runtime.Scheme
+
+    Deployer    *deployer.Deployer
+    Monitor     *monitor.Monitor
+    DriftDetect *driftdetect.DriftDetect
+
+    DriftChan chan event.GenericEvent
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (r *DriftReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    src := source.Channel(r.DriftChan, &handler.EnqueueRequestForObject{})
+    return ctrl.NewControllerManagedBy(mgr).
+        Named("drift-reconciler").
+        WatchesRawSource(src).
+        Complete(r)
+
+}
+
+// Reconcile is triggered via a channel from the driftdetect mini controller,
+// which watches deployed resources for drift. Drift is detected by creating
+// a plan and comparing it to the current state.
+// It will update the status of the BundleDeployment and correct drift if enabled.
+//
+// For more details, check Reconcile and its Result here:
+// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
+func (r *DriftReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+    logger := log.FromContext(ctx).WithName("drift")
+    ctx = log.IntoContext(ctx, logger)
+
+    // get latest BundleDeployment from cluster
+    bd := &fleetv1.BundleDeployment{}
+    err := r.Get(ctx, req.NamespacedName, bd)
+    if apierrors.IsNotFound(err) {
+        return ctrl.Result{}, nil
+    } else if err != nil {
+        return ctrl.Result{}, err
+    }
+
+    if bd.Spec.Paused {
+        logger.V(1).Info("Bundle paused, clearing drift detection")
+        err := r.DriftDetect.Clear(req.String())
+
+        return ctrl.Result{}, err
+    }
+
+    merr := []error{}
+
+    // retrieve the resources from the helm history.
+    // if we can't retrieve the resources, requeue now without trying any of the other operations
+    resources, err := r.Deployer.Resources(bd.Name, bd.Status.Release)
+    if err != nil {
+        logger.V(1).Info("Failed to retrieve bundledeployment's resources")
+        return ctrl.Result{}, err
+    }
+
+    // return early if the bundledeployment is still being installed
+    if !monitor.ShouldUpdateStatus(bd) {
+        return ctrl.Result{}, nil
+    }
+
+    // update the bundledeployment status from the helm resource list
+    bd.Status, err = r.Monitor.UpdateStatus(ctx, bd, resources)
+    if err != nil {
+        logger.Error(err, "Cannot monitor deployed bundle")
+    }
+
+    // run drift correction
+    if len(bd.Status.ModifiedStatus) > 0 && bd.Spec.CorrectDrift != nil && bd.Spec.CorrectDrift.Enabled {
+        if release, err := r.Deployer.RemoveExternalChanges(ctx, bd); err != nil {
+            merr = append(merr, fmt.Errorf("failed reconciling drift: %w", err))
+            // Propagate drift correction error to bundle deployment status.
+            condition.Cond(fleetv1.BundleDeploymentConditionReady).SetError(&bd.Status, "", err)
+        } else {
+            bd.Status.Release = release
+        }
+    }
+
+    // final status update
+    logger.V(1).Info("Reconcile finished, updating the bundledeployment status")
+    err = r.updateStatus(ctx, req.NamespacedName, bd.Status)
+    if apierrors.IsNotFound(err) {
+        merr = append(merr, fmt.Errorf("bundledeployment has been deleted: %w", err))
+    } else if err != nil {
+        merr = append(merr, fmt.Errorf("failed final update to bundledeployment status: %w", err))
+    }
+
+    return ctrl.Result{}, errutil.NewAggregate(merr)
+}
+
+func (r *DriftReconciler) updateStatus(ctx context.Context, req types.NamespacedName, status fleetv1.BundleDeploymentStatus) error {
+    return retry.RetryOnConflict(DefaultRetry, func() error {
+        newBD := &fleetv1.BundleDeployment{}
+        err := r.Get(ctx, req, newBD)
+        if err != nil {
+            return err
+        }
+        newBD.Status = status
+        return r.Status().Update(ctx, newBD)
+    })
+}
diff --git a/internal/cmd/agent/deployer/driftdetect/driftdetect.go b/internal/cmd/agent/deployer/driftdetect/driftdetect.go
index 36c05dec3a..7b336b752c 100644
--- a/internal/cmd/agent/deployer/driftdetect/driftdetect.go
+++ b/internal/cmd/agent/deployer/driftdetect/driftdetect.go
@@ -3,17 +3,13 @@ package driftdetect
 import (
     "context"
 
-    "github.com/go-logr/logr"
-
     "github.com/rancher/fleet/internal/cmd/agent/deployer/desiredset"
     "github.com/rancher/fleet/internal/cmd/agent/trigger"
     "github.com/rancher/fleet/internal/helmdeployer"
     fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
 
-    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-    "k8s.io/client-go/util/retry"
-    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/log"
 )
 
@@ -21,32 +17,29 @@ type DriftDetect struct {
     // Trigger watches deployed resources on the local cluster.
     trigger *trigger.Trigger
 
-    upstreamClient client.Client
-    upstreamReader client.Reader
-
     desiredset       *desiredset.Client
     defaultNamespace string
     labelPrefix      string
     labelSuffix      string
+
+    driftChan chan event.GenericEvent
 }
 
 func New(
     trigger *trigger.Trigger,
-    upstreamClient client.Client,
-    upstreamReader client.Reader,
     desiredset *desiredset.Client,
     defaultNamespace string,
     labelPrefix string,
     labelSuffix string,
+    driftChan chan event.GenericEvent,
 ) *DriftDetect {
     return &DriftDetect{
         trigger:          trigger,
-        upstreamClient:   upstreamClient,
-        upstreamReader:   upstreamReader,
         desiredset:       desiredset,
         defaultNamespace: defaultNamespace,
         labelPrefix:      labelPrefix,
         labelSuffix:      labelSuffix,
+        driftChan:        driftChan,
     }
 }
 
@@ -56,7 +49,7 @@ func (d *DriftDetect) Clear(bdKey string) error {
 
 // Refresh triggers a sync of all resources of the provided bd which may have drifted from their desired state.
 func (d *DriftDetect) Refresh(ctx context.Context, bdKey string, bd *fleet.BundleDeployment, resources *helmdeployer.Resources) error {
-    logger := log.FromContext(ctx).WithName("drift-detect")
+    logger := log.FromContext(ctx).WithName("drift-detect").WithValues("initialResourceVersion", bd.ResourceVersion)
     logger.V(1).Info("Refreshing drift detection")
 
     resources, err := d.allResources(ctx, bd, resources)
@@ -68,57 +61,14 @@ func (d *DriftDetect) Refresh(ctx context.Context, bdKey string, bd *fleet.Bundl
         return nil
     }
 
-    logger.V(1).Info("Adding OnChange for bundledeployment's resource list")
-    logger = logger.WithValues("key", bdKey, "initialResourceVersion", bd.ResourceVersion)
-
-    handleID := int(bd.Generation)
     handler := func(key string) {
-        logger := logger.WithValues("handleID", handleID, "triggered by", key)
-        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-            // Can't enqueue directly, update bundledeployment instead
-            return d.requeueBD(logger, handleID, bd.Namespace, bd.Name)
-        })
-        if err != nil {
-            logger.Error(err, "Failed to trigger bundledeployment", "error", err)
-            return
-        }
+        logger.V(1).Info("Notifying driftdetect reconciler of a resource change", "triggeredBy", key)
+        d.driftChan <- event.GenericEvent{Object: bd}
     }
 
-    return d.trigger.OnChange(bdKey, resources.DefaultNamespace, handler, resources.Objects...)
-}
-
-func (d *DriftDetect) requeueBD(logger logr.Logger, handleID int, namespace string, name string) error {
-    bd := &fleet.BundleDeployment{}
-
-    err := d.upstreamReader.Get(context.Background(), client.ObjectKey{Name: name, Namespace: namespace}, bd)
-    if apierrors.IsNotFound(err) {
-        logger.Info("Bundledeployment is not found, can't trigger refresh")
-        return nil
-    }
-    if err != nil {
-        logger.Error(err, "Failed to get bundledeployment, can't trigger refresh")
-        return nil
-    }
-
-    logger = logger.WithValues("resourceVersion", bd.ResourceVersion)
-    logger.V(1).Info("Going to update bundledeployment to trigger re-sync")
-
-    // This mechanism of triggering requeues for changes is not ideal.
-    // It's a workaround since we can't enqueue directly from the trigger
-    // mini controller. Triggering via a status update is expensive.
-    // It's hard to compute a stable hash to make this idempotent, because
-    // the hash would need to be computed over the whole change. We can't
-    // just use the resource version of the bundle deployment. We would
-    // need to look at the deployed resources and compute a hash over them.
-    // However this status update happens for every changed resource, maybe
-    // multiple times per resource. It will also trigger on a resync.
-    bd.Status.SyncGeneration = &[]int64{int64(handleID)}[0]
-
-    err = d.upstreamClient.Status().Update(context.Background(), bd)
-    if err != nil {
-        logger.V(1).Info("Retry to update bundledeployment, couldn't update status to trigger re-sync", "conflict", apierrors.IsConflict(err), "error", err)
-    }
-    return err
+    // Adding bundledeployment's resource list to the trigger-controller's watch list
+    return d.trigger.OnChange(bdKey, resources.DefaultNamespace, handler, resources.Objects...)
 }
 
 // allResources returns the resources that are deployed by the bundle deployment,
diff --git a/internal/cmd/agent/deployer/normalizers/norm.go b/internal/cmd/agent/deployer/normalizers/norm.go
index 54f67825b2..b6a9050825 100644
--- a/internal/cmd/agent/deployer/normalizers/norm.go
+++ b/internal/cmd/agent/deployer/normalizers/norm.go
@@ -12,8 +12,8 @@ type Norm struct {
 }
 
 func (n Norm) Normalize(un *unstructured.Unstructured) error {
-    for _, normalizers := range n.normalizers {
-        if err := normalizers.Normalize(un); err != nil {
+    for _, normalizer := range n.normalizers {
+        if err := normalizer.Normalize(un); err != nil {
             return err
         }
     }
diff --git a/internal/cmd/agent/operator.go b/internal/cmd/agent/operator.go
index c5019d93a5..2eccc335e6 100644
--- a/internal/cmd/agent/operator.go
+++ b/internal/cmd/agent/operator.go
@@ -28,6 +28,7 @@ import (
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/cache"
     "sigs.k8s.io/controller-runtime/pkg/cluster"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/healthz"
     "sigs.k8s.io/controller-runtime/pkg/manager"
     metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -85,6 +86,8 @@ func start(ctx context.Context, localConfig *rest.Config, systemNamespace, agent
     localCtx, cancel := context.WithCancel(ctx)
     defer cancel()
 
+    driftChan := make(chan event.GenericEvent)
+
     reconciler, err := newReconciler(
         ctx,
         localCtx,
@@ -94,6 +97,7 @@ func start(ctx context.Context, localConfig *rest.Config, systemNamespace, agent
         fleetNamespace,
         agentScope,
         agentConfig,
+        driftChan,
     )
     if err != nil {
         setupLog.Error(err, "unable to set up bundledeployment reconciler")
@@ -107,6 +111,22 @@ func start(ctx context.Context, localConfig *rest.Config, systemNamespace, agent
     }
     //+kubebuilder:scaffold:builder
 
+    // The drift reconciler is fed by a raw channel source, which receives events from the driftdetect mini controller
+    driftReconciler := &controller.DriftReconciler{
+        Client: mgr.GetClient(),
+        Scheme: mgr.GetScheme(),
+
+        Deployer:    reconciler.Deployer,
+        Monitor:     reconciler.Monitor,
+        DriftDetect: reconciler.DriftDetect,
+
+        DriftChan: driftChan,
+    }
+    if err = driftReconciler.SetupWithManager(mgr); err != nil {
+        setupLog.Error(err, "unable to create controller", "controller", "BundleDeployment")
+        return err
+    }
+
     if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
         setupLog.Error(err, "unable to set up health check")
         return err
@@ -167,6 +187,7 @@ func newReconciler(
     fleetNamespace string,
     agentScope string,
     agentConfig config.Config,
+    driftChan chan event.GenericEvent,
 ) (*controller.BundleDeploymentReconciler, error) {
     upstreamClient := mgr.GetClient()
 
@@ -228,12 +249,11 @@ func newReconciler(
     trigger := trigger.New(ctx, localDynamic, localCluster.GetRESTMapper())
     driftdetect := driftdetect.New(
         trigger,
-        upstreamClient,
-        mgr.GetAPIReader(),
         ds,
         defaultNamespace,
         defaultNamespace,
         agentScope,
+        driftChan,
     )
 
     // Build the clean up, which deletes helm releases
diff --git a/pkg/durations/durations.go b/pkg/durations/durations.go
index 5b8f0e9886..518bcee5ce 100644
--- a/pkg/durations/durations.go
+++ b/pkg/durations/durations.go
@@ -26,8 +26,8 @@ const (
     RestConfigTimeout        = time.Second * 15
     ServiceTokenSleep        = time.Second * 2
     TokenClusterEnqueueDelay = time.Second * 2
-    // TriggerSleep is the delay before the mini controller starts watching
-    // deployed resources for changes
+    // TriggerSleep is the delay before the driftdetect mini controller
+    // starts watching deployed resources for changes
     TriggerSleep          = time.Second * 5
     DefaultCpuPprofPeriod = time.Minute
 )
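
Note on the wiring introduced above: instead of the old workaround of bumping the BundleDeployment status to force a requeue (removed from driftdetect.go), drift notifications now flow through a plain Go channel that is registered as a raw source on a dedicated controller. The sketch below condenses that pattern; it is not part of the diff, and the package, type, and function names (example, ExampleDriftReconciler, notifyDrift) are illustrative only. The calls it relies on (source.Channel, WatchesRawSource, handler.EnqueueRequestForObject, event.GenericEvent) are the same ones used in drift_controller.go and driftdetect.go above.

// Sketch of the channel-based drift wiring, under the assumptions stated above.
package example

import (
    "context"

    fleetv1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/source"
)

// ExampleDriftReconciler (illustrative name) mirrors the shape of the new
// DriftReconciler: it only needs a client and the channel it is fed from.
type ExampleDriftReconciler struct {
    client.Client
    DriftChan chan event.GenericEvent
}

// SetupWithManager registers the channel as a raw watch source.
// EnqueueRequestForObject turns each GenericEvent's object into a
// reconcile.Request for that same object.
func (r *ExampleDriftReconciler) SetupWithManager(mgr ctrl.Manager) error {
    src := source.Channel(r.DriftChan, &handler.EnqueueRequestForObject{})
    return ctrl.NewControllerManagedBy(mgr).
        Named("example-drift-reconciler").
        WatchesRawSource(src).
        Complete(r)
}

// Reconcile re-reads the BundleDeployment named in the request before acting
// on it, since the channel event may carry a stale copy of the object.
func (r *ExampleDriftReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    bd := &fleetv1.BundleDeployment{}
    if err := r.Get(ctx, req.NamespacedName, bd); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    log.FromContext(ctx).V(1).Info("drift event received", "bundledeployment", req.NamespacedName)
    return ctrl.Result{}, nil
}

// notifyDrift (illustrative) is the producer side: the driftdetect trigger
// handler wraps the BundleDeployment in a GenericEvent and sends it on the
// channel, which enqueues a request on the consuming controller.
func notifyDrift(driftChan chan<- event.GenericEvent, bd *fleetv1.BundleDeployment) {
    driftChan <- event.GenericEvent{Object: bd}
}

As the comment removed from driftdetect.go notes, the previous status-update mechanism was expensive and fired for every changed resource; the channel lets the trigger handler enqueue work directly without writing to the cluster.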