diff --git a/cmd/main.go b/cmd/main.go
index 528702f1f..faf715458 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"net/http"
@@ -9,6 +10,7 @@ import (
 	"time"
 
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
 
 	"github.com/codeready-toolchain/host-operator/controllers/deactivation"
@@ -33,6 +35,7 @@ import (
 	"github.com/codeready-toolchain/host-operator/pkg/capacity"
 	"github.com/codeready-toolchain/host-operator/pkg/cluster"
 	"github.com/codeready-toolchain/host-operator/pkg/metrics"
+	"github.com/codeready-toolchain/host-operator/pkg/restart"
 	"github.com/codeready-toolchain/host-operator/pkg/segment"
 	"github.com/codeready-toolchain/host-operator/pkg/templates/assets"
 	"github.com/codeready-toolchain/host-operator/pkg/templates/nstemplatetiers"
@@ -202,257 +205,263 @@ func main() { // nolint:gocyclo
 			}
 		}()
 	}
-	// Webhook server will be created with default values (port 9443) as per doc - https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/manager/manager.go#L244-L247
-	// Cache Options design doc - https://github.com/kubernetes-sigs/controller-runtime/blob/main/designs/cache_options.md
-	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
-		Scheme: scheme,
-		Metrics: metricsserver.Options{
-			BindAddress: metricsAddr,
-		},
-		HealthProbeBindAddress: probeAddr,
-		LeaderElection: enableLeaderElection,
-		LeaderElectionID: "dc07038f.toolchain.host.operator",
-		Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{namespace: {}}},
-	})
-	if err != nil {
-		setupLog.Error(err, "unable to start manager")
-		os.Exit(1)
-	}
-	memberClusters, err := addMemberClusters(mgr, cl, namespace, true)
-	if err != nil {
-		setupLog.Error(err, "")
-		os.Exit(1)
-	}
-	// Setup all Controllers
-	if err = (&toolchainclusterresources.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-		Templates: &deploy.ToolchainClusterTemplateFS,
-	}).SetupWithManager(mgr, namespace); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "ToolchainClusterResources")
-		os.Exit(1)
-	}
-	if err = toolchainclustercache.NewReconciler(
-		mgr,
-		namespace,
-		memberClientTimeout,
-	).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "ToolchainClusterCache")
-		os.Exit(1)
-	}
+	startManager := &restart.StartManager{}
+	startManager.InitializeManager = func(ctx context.Context) (manager.Manager, error) {
+		// Webhook server will be created with default values (port 9443) as per doc - https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/manager/manager.go#L244-L247
+		// Cache Options design doc - https://github.com/kubernetes-sigs/controller-runtime/blob/main/designs/cache_options.md
+		mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+			Scheme: scheme,
+			Metrics: metricsserver.Options{
+				BindAddress: metricsAddr,
+			},
+			HealthProbeBindAddress: probeAddr,
+			LeaderElection: enableLeaderElection,
+			LeaderElectionID: "dc07038f.toolchain.host.operator",
+			Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{namespace: {}}},
+		})
+		if err != nil {
+			setupLog.Error(err, "unable to start manager")
+			os.Exit(1)
+		}
+		memberClusters, err := addMemberClusters(mgr, cl, namespace, true)
+		if err != nil {
+			setupLog.Error(err, "")
+			os.Exit(1)
+		}
 
-	if err := (&toolchaincluster.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-		RequeAfter: 10 * time.Second,
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "ToolchainCluster")
-		os.Exit(1)
-	}
+		// Setup all Controllers
+		if err = (&toolchainclusterresources.Reconciler{
+			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
+			Templates: &deploy.ToolchainClusterTemplateFS,
+		}).SetupWithManager(mgr, namespace); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "ToolchainClusterResources")
+			os.Exit(1)
+		}
+		if err = toolchainclustercache.NewReconciler(
+			mgr,
+			namespace,
+			memberClientTimeout,
+		).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "ToolchainClusterCache")
+			os.Exit(1)
+		}
 
-	if err := (&deactivation.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "Deactivation")
-		os.Exit(1)
-	}
-	if err := (&masteruserrecord.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-		Namespace: namespace,
-		MemberClusters: memberClusters,
-	}).SetupWithManager(mgr, memberClusters); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "MasterUserRecord")
-		os.Exit(1)
-	}
-	if err := (&notification.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-	}).SetupWithManager(mgr, crtConfig); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "Notification")
-		os.Exit(1)
-	}
-	if err := (&nstemplatetier.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "NSTemplateTier")
-		os.Exit(1)
-	}
-	if err = (&nstemplatetierrevisioncleanup.Reconciler{
-		Client: mgr.GetClient(),
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "NSTemplatTierRevisionCleanup")
-		os.Exit(1)
-	}
-	if err := (&toolchainconfig.Reconciler{
-		Client: mgr.GetClient(),
-		GetMembersFunc: commoncluster.GetMemberClusters,
-		Scheme: mgr.GetScheme(),
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "ToolchainConfig")
-		os.Exit(1)
-	}
+		if err := (&toolchaincluster.Reconciler{
+			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
+			RequeAfter: 10 * time.Second,
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "ToolchainCluster")
+			os.Exit(1)
+		}
 
-	if err := (&toolchainstatus.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-		HTTPClientImpl: &http.Client{},
-		VersionCheckManager: status.VersionCheckManager{GetGithubClientFunc: commonclient.NewGitHubClient},
-		GetMembersFunc: commoncluster.GetMemberClusters,
-		Namespace: namespace,
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "ToolchainStatus")
-		os.Exit(1)
-	}
-	if err := (&usersignup.Reconciler{
-		StatusUpdater: &usersignup.StatusUpdater{
+		if err := (&deactivation.Reconciler{
 			Client: mgr.GetClient(),
-		},
-		Namespace: namespace,
-		Scheme: mgr.GetScheme(),
-		SegmentClient: segmentClient,
-		ClusterManager: capacity.NewClusterManager(namespace, mgr.GetClient()),
-	}).SetupWithManager(ctx, mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "UserSignup")
-		os.Exit(1)
-	}
-	if err := (&usersignupcleanup.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "UserSignupCleanup")
-		os.Exit(1)
-	}
-	// init cluster scoped member cluster clients
-	clusterScopedMemberClusters, err := addMemberClusters(mgr, cl, namespace, false)
-	if err != nil {
-		setupLog.Error(err, "")
-		os.Exit(1)
-	}
-
-	// enable space request
-	if crtConfig.SpaceConfig().SpaceRequestIsEnabled() {
-		if err = (&spacerequest.Reconciler{
+			Scheme: mgr.GetScheme(),
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "Deactivation")
+			os.Exit(1)
+		}
+		if err := (&masteruserrecord.Reconciler{
 			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
 			Namespace: namespace,
-			MemberClusters: clusterScopedMemberClusters,
+			MemberClusters: memberClusters,
+		}).SetupWithManager(mgr, memberClusters); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "MasterUserRecord")
+			os.Exit(1)
+		}
+		if err := (&notification.Reconciler{
+			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
+		}).SetupWithManager(mgr, crtConfig); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "Notification")
+			os.Exit(1)
+		}
+		if err := (&nstemplatetier.Reconciler{
+			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "NSTemplateTier")
+			os.Exit(1)
+		}
+		if err = (&nstemplatetierrevisioncleanup.Reconciler{
+			Client: mgr.GetClient(),
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "NSTemplateTierRevisionCleanup")
+			os.Exit(1)
+		}
+		if err := (&toolchainconfig.Reconciler{
+			Client: mgr.GetClient(),
+			GetMembersFunc: commoncluster.GetMemberClusters,
 			Scheme: mgr.GetScheme(),
-		}).SetupWithManager(mgr, clusterScopedMemberClusters); err != nil {
-			setupLog.Error(err, "unable to create controller", "controller", "SpaceRequest")
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "ToolchainConfig")
 			os.Exit(1)
 		}
-	}
-	// enable space binding request
-	if crtConfig.SpaceConfig().SpaceBindingRequestIsEnabled() {
-		if err = (&spacebindingrequest.Reconciler{
-			Client: mgr.GetClient(),
+		if err := (&toolchainstatus.Reconciler{
+			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
+			HTTPClientImpl: &http.Client{},
+			VersionCheckManager: status.VersionCheckManager{GetGithubClientFunc: commonclient.NewGitHubClient},
+			GetMembersFunc: commoncluster.GetMemberClusters,
+			Namespace: namespace,
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "ToolchainStatus")
+			os.Exit(1)
+		}
+		if err := (&usersignup.Reconciler{
+			StatusUpdater: &usersignup.StatusUpdater{
+				Client: mgr.GetClient(),
+			},
 			Namespace: namespace,
-			MemberClusters: clusterScopedMemberClusters,
 			Scheme: mgr.GetScheme(),
-		}).SetupWithManager(mgr, clusterScopedMemberClusters); err != nil {
-			setupLog.Error(err, "unable to create controller", "controller", "SpaceBindingRequest")
+			SegmentClient: segmentClient,
+			ClusterManager: capacity.NewClusterManager(namespace, mgr.GetClient()),
+		}).SetupWithManager(ctx, mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "UserSignup")
 			os.Exit(1)
 		}
-	}
-	if err = (&space.Reconciler{
-		Client: mgr.GetClient(),
-		Namespace: namespace,
-		MemberClusters: memberClusters,
-	}).SetupWithManager(mgr, memberClusters); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "Space")
-		os.Exit(1)
-	}
-	if err = (&spacecompletion.Reconciler{
-		Client: mgr.GetClient(),
-		Namespace: namespace,
-		ClusterManager: capacity.NewClusterManager(namespace, mgr.GetClient()),
-	}).SetupWithManager(ctx, mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "SpaceCompletion")
-		os.Exit(1)
-	}
-	if err = (&spacebindingcleanup.Reconciler{
-		Client: mgr.GetClient(),
-		Scheme: mgr.GetScheme(),
-		Namespace: namespace,
-		MemberClusters: clusterScopedMemberClusters,
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "SpaceBindingCleanup")
-		os.Exit(1)
-	}
-	if err = (&spacecleanup.Reconciler{
-		Client: mgr.GetClient(),
-		Namespace: namespace,
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "SpaceCleanup")
-		os.Exit(1)
-	}
-	if err = (&socialevent.Reconciler{
-		Client: mgr.GetClient(),
-		Namespace: namespace,
-		StatusUpdater: &socialevent.StatusUpdater{
+		if err := (&usersignupcleanup.Reconciler{
 			Client: mgr.GetClient(),
-		},
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "SocialEvent")
-		os.Exit(1)
-	}
-	if err = (&spaceprovisionerconfig.Reconciler{
-		Client: mgr.GetClient(),
-	}).SetupWithManager(mgr); err != nil {
-		setupLog.Error(err, "unable to create controller", "controller", "SpaceProvisionerConfig")
-		os.Exit(1)
-	}
-	//+kubebuilder:scaffold:builder
-
-	go func() {
-		setupLog.Info("Starting cluster health checker & creating/updating the NSTemplateTier resources once cache is sync'd")
-		if !mgr.GetCache().WaitForCacheSync(ctx) {
-			setupLog.Error(errors.New("timed out waiting for caches to sync"), "")
+			Scheme: mgr.GetScheme(),
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "UserSignupCleanup")
 			os.Exit(1)
 		}
-
-		// create or update Toolchain status during the operator deployment
-		setupLog.Info("Creating/updating the ToolchainStatus resource")
-		if err := toolchainstatus.CreateOrUpdateResources(ctx, mgr.GetClient(), namespace, toolchainconfig.ToolchainStatusName); err != nil {
-			setupLog.Error(err, "cannot create/update ToolchainStatus resource")
+		// init cluster scoped member cluster clients
+		clusterScopedMemberClusters, err := addMemberClusters(mgr, cl, namespace, false)
+		if err != nil {
+			setupLog.Error(err, "")
 			os.Exit(1)
 		}
-		setupLog.Info("Created/updated the ToolchainStatus resource")
-
-		// create or update all NSTemplateTiers on the cluster at startup
-		setupLog.Info("Creating/updating the NSTemplateTier resources")
-		nstemplatetierAssets := assets.NewAssets(nstemplatetiers.AssetNames, nstemplatetiers.Asset)
-		if err := nstemplatetiers.CreateOrUpdateResources(ctx, mgr.GetScheme(), mgr.GetClient(), namespace, nstemplatetierAssets); err != nil {
-			setupLog.Error(err, "")
+
+		// enable space request
+		if crtConfig.SpaceConfig().SpaceRequestIsEnabled() {
+			if err = (&spacerequest.Reconciler{
+				Client: mgr.GetClient(),
+				Namespace: namespace,
+				MemberClusters: clusterScopedMemberClusters,
+				Scheme: mgr.GetScheme(),
+			}).SetupWithManager(mgr, clusterScopedMemberClusters); err != nil {
+				setupLog.Error(err, "unable to create controller", "controller", "SpaceRequest")
+				os.Exit(1)
+			}
+		}
+
+		// enable space binding request
+		if crtConfig.SpaceConfig().SpaceBindingRequestIsEnabled() {
+			if err = (&spacebindingrequest.Reconciler{
+				Client: mgr.GetClient(),
+				Namespace: namespace,
+				MemberClusters: clusterScopedMemberClusters,
+				Scheme: mgr.GetScheme(),
+			}).SetupWithManager(mgr, clusterScopedMemberClusters); err != nil {
+				setupLog.Error(err, "unable to create controller", "controller", "SpaceBindingRequest")
+				os.Exit(1)
+			}
+		}
+		if err = (&space.Reconciler{
+			Client: mgr.GetClient(),
+			Namespace: namespace,
+			MemberClusters: memberClusters,
+		}).SetupWithManager(mgr, memberClusters); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "Space")
+			os.Exit(1)
+		}
+		if err = (&spacecompletion.Reconciler{
+			Client: mgr.GetClient(),
+			Namespace: namespace,
+			ClusterManager: capacity.NewClusterManager(namespace, mgr.GetClient()),
+		}).SetupWithManager(ctx, mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "SpaceCompletion")
+			os.Exit(1)
+		}
+		if err = (&spacebindingcleanup.Reconciler{
+			Client: mgr.GetClient(),
+			Scheme: mgr.GetScheme(),
+			Namespace: namespace,
+			MemberClusters: clusterScopedMemberClusters,
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "SpaceBindingCleanup")
+			os.Exit(1)
+		}
+		if err = (&spacecleanup.Reconciler{
+			Client: mgr.GetClient(),
+			Namespace: namespace,
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "SpaceCleanup")
+			os.Exit(1)
+		}
+		if err = (&socialevent.Reconciler{
+			Client: mgr.GetClient(),
+			Namespace: namespace,
+			StatusUpdater: &socialevent.StatusUpdater{
+				Client: mgr.GetClient(),
+			},
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "SocialEvent")
 			os.Exit(1)
 		}
-		setupLog.Info("Created/updated the NSTemplateTier resources")
+		if err = (&spaceprovisionerconfig.Reconciler{
+			Client: mgr.GetClient(),
+		}).SetupWithManager(mgr); err != nil {
+			setupLog.Error(err, "unable to create controller", "controller", "SpaceProvisionerConfig")
+			os.Exit(1)
+		}
+		//+kubebuilder:scaffold:builder
 
-		// create or update all UserTiers on the cluster at startup
-		setupLog.Info("Creating/updating the UserTier resources")
-		usertierAssets := assets.NewAssets(usertiers.AssetNames, usertiers.Asset)
-		if err := usertiers.CreateOrUpdateResources(ctx, mgr.GetScheme(), mgr.GetClient(), namespace, usertierAssets); err != nil {
-			setupLog.Error(err, "")
+		go func() {
+			setupLog.Info("Starting cluster health checker & creating/updating the NSTemplateTier resources once cache is sync'd")
+			if !mgr.GetCache().WaitForCacheSync(ctx) {
+				setupLog.Error(errors.New("timed out waiting for caches to sync"), "")
+				os.Exit(1)
+			}
+
+			// create or update Toolchain status during the operator deployment
+			setupLog.Info("Creating/updating the ToolchainStatus resource")
+			if err := toolchainstatus.CreateOrUpdateResources(ctx, mgr.GetClient(), namespace, toolchainconfig.ToolchainStatusName); err != nil {
+				setupLog.Error(err, "cannot create/update ToolchainStatus resource")
+				os.Exit(1)
+			}
+			setupLog.Info("Created/updated the ToolchainStatus resource")
+
+			// create or update all NSTemplateTiers on the cluster at startup
+			setupLog.Info("Creating/updating the NSTemplateTier resources")
+			nstemplatetierAssets := assets.NewAssets(nstemplatetiers.AssetNames, nstemplatetiers.Asset)
+			if err := nstemplatetiers.CreateOrUpdateResources(ctx, mgr.GetScheme(), mgr.GetClient(), namespace, nstemplatetierAssets); err != nil {
+				setupLog.Error(err, "")
+				os.Exit(1)
+			}
+			setupLog.Info("Created/updated the NSTemplateTier resources")
+
+			// create or update all UserTiers on the cluster at startup
+			setupLog.Info("Creating/updating the UserTier resources")
+			usertierAssets := assets.NewAssets(usertiers.AssetNames, usertiers.Asset)
+			if err := usertiers.CreateOrUpdateResources(ctx, mgr.GetScheme(), mgr.GetClient(), namespace, usertierAssets); err != nil {
+				setupLog.Error(err, "")
+				os.Exit(1)
+			}
+			setupLog.Info("Created/updated the UserTier resources")
+		}()
+
+		if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
+			setupLog.Error(err, "unable to set up health check")
+			os.Exit(1)
+		}
+		if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
+			setupLog.Error(err, "unable to set up ready check")
 			os.Exit(1)
 		}
-		setupLog.Info("Created/updated the UserTier resources")
-	}()
-	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up health check")
-		os.Exit(1)
-	}
-	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
-		setupLog.Error(err, "unable to set up ready check")
-		os.Exit(1)
+		setupLog.Info("starting manager")
+		return mgr, nil
 	}
-	setupLog.Info("starting manager")
-	if err := mgr.Start(ctx); err != nil {
+	if err := startManager.Start(ctx); err != nil {
 		setupLog.Error(err, "problem running manager")
 		os.Exit(1)
 	}
diff --git a/pkg/restart/restart.go b/pkg/restart/restart.go
new file mode 100644
index 000000000..81988b31c
--- /dev/null
+++ b/pkg/restart/restart.go
@@ -0,0 +1,173 @@
+package restart
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+var (
+	errRestartNeeded  = errors.New("restart needed called")
+	errAlreadyRunning = errors.New("controller manager already running")
+
+	_ Restarter = (*StartManager)(nil)
+)
+
+type InitializeManagerFunc func(context.Context) (manager.Manager, error)
+
+// Restarter is a component that can be told that a restart is needed.
+// StartManager is the sole implementation at runtime, but this interface
+// should be used at call sites to make it easy to unit-test that a restart was requested.
+type Restarter interface {
+	RestartNeeded()
+}
+
+// StartManager manages the lifecycle of the controller manager.
+// It is supplied a function that initializes the manager (and which MUST NOT start it).
+// StartManager can then be used to start the controller manager and can also
+// be asked to restart it.
+type StartManager struct {
+	InitializeManager InitializeManagerFunc
+
+	running        bool
+	lock           sync.Mutex
+	runningContext context.Context
+	cancelFunc     context.CancelCauseFunc
+}
+
+func debug(message string) {
+	// fmt.Println(message)
+}
+
+func (s *StartManager) Start(ctx context.Context) error {
+	for {
+		debug("Start: start loop enter")
+		startErr, finished := s.doStart(ctx)
+
+		if s.runningContext == nil {
+			err := <-startErr
+			debug("Start: start failed before initializing the running context, quitting")
+			return err
+		}
+
+		debug("Start: selecting on cancellation and error channels")
+		select {
+		case <-s.runningContext.Done():
+			{
+				debug("Start: context cancelled, waiting for finish signal")
+				// The context is cancelled, so the manager is shutting down. Let's wait for it to actually finish
+				// running before allowing ourselves to continue and eventually allowing another round of the manager run.
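+				// NOTE: mgr.Start() only returns once the manager has fully stopped, so receiving from "finished"
+				// below is what guarantees that two manager instances never run at the same time.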
+				<-finished
+
+				debug("Start: context cancelled, finish signal received")
+
+				// The manager is not running anymore, now we can be sure of it. Let's record that fact in our state
+				// to keep the records straight.
+				debug("Start: context cancelled, about to set running=false")
+				s.lock.Lock()
+				debug("Start: locked")
+				s.running = false
+				s.lock.Unlock()
+				debug("Start: unlocked")
+				debug("Start: context cancelled, running=false is set")
+
+				// now, let's see what caused the cancellation
+
+				if !errors.Is(context.Cause(s.runningContext), errRestartNeeded) {
+					// This can only happen if the passed-in context is cancellable and is itself cancelled.
+					// In this case, the error, if any, is retrievable from the context and therefore should NOT be
+					// returned from this function.
+					debug("Start: context cancelled, cancellation from above, quitting")
+					return nil
+				}
+
+				// The context has been cancelled and therefore the controller manager gracefully shut down.
+				// We actually waited for the start function to finish and cleared our running state.
+				// We also detected that the restart is needed by inspecting the cancellation cause.
+				//
+				// All the conditions for the restart are satisfied, so we can just start another loop that will start
+				// the manager anew.
+				debug("Start: context cancelled, restart request detected, entering another start loop")
+			}
+		case err := <-startErr:
+			debug("Start: error received, quitting")
+			return err
+		}
+	}
+}
+
+func (s *StartManager) doStart(ctx context.Context) (<-chan error, <-chan struct{}) {
+	debug("doStart: enter")
+	s.lock.Lock()
+	defer func() {
+		s.lock.Unlock()
+		debug("doStart: unlocked")
+	}()
+
+	debug("doStart: locked")
+
+	s.runningContext = nil
+	s.cancelFunc = nil
+
+	// We cannot guarantee the order in which the caller will wait for these channels vs. when the channels are pushed
+	// to in this method (because we start the manager in a goroutine, or, actually, because the error may be sent
+	// before control flow returns to the caller).
+	// We CAN guarantee that the caller will eventually expect a message from these channels.
+	// Therefore, make the channels with a buffer of size 1, so that we can send a message here before the caller
+	// starts listening.
+	err := make(chan error, 1)
+	finished := make(chan struct{}, 1)
+
+	if s.running {
+		debug("doStart: already running")
+		err <- errAlreadyRunning
+		return err, finished
+	}
+
+	s.runningContext, s.cancelFunc = context.WithCancelCause(ctx)
+
+	debug("doStart: calling InitializeManager")
+	mgr, initErr := s.InitializeManager(s.runningContext)
+	if initErr != nil {
+		debug("doStart: InitializeManager failed")
+		err <- initErr
+		return err, finished
+	}
+	go func() {
+		debug("doStart: about to invoke mgr.Start()")
+		if startErr := mgr.Start(s.runningContext); startErr != nil {
+			debug("doStart: mgr.Start() returned error. Sending it.")
+			err <- startErr
+		}
+		debug("doStart: sending finished message")
+		finished <- struct{}{}
+	}()
+
+	debug("doStart: setting running=true")
+	s.running = true
+	return err, finished
+}
+
+func (s *StartManager) RestartNeeded() {
+	go func() {
+		debug("restartNeeded: enter")
+		s.lock.Lock()
+		defer func() {
+			s.lock.Unlock()
+			debug("restartNeeded: unlocked")
+		}()
+
+		debug("restartNeeded: locked")
+
+		if s.cancelFunc == nil {
+			// we're not running yet
+			debug("restartNeeded: not running yet")
+			return
+		}
+
+		debug("restartNeeded: calling restart function")
+		s.cancelFunc(errRestartNeeded)
+	}()
+}
diff --git a/pkg/restart/restart_test.go b/pkg/restart/restart_test.go
new file mode 100644
index 000000000..f3fb90669
--- /dev/null
+++ b/pkg/restart/restart_test.go
@@ -0,0 +1,226 @@
+package restart
+
+import (
+	"context"
+	"errors"
+	"net/http"
+	"testing"
+
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/record"
+	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/config"
+	"sigs.k8s.io/controller-runtime/pkg/healthz"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/webhook"
+)
+
+type MockManager struct {
+	startFn                   func()
+	afterRun                  func()
+	afterContextCancelledFn   func()
+	waitUntilContextCancelled bool
+	errorToReturn             error
+}
+
+func TestStartManager(t *testing.T) {
+	t.Run("returns no error when manager returns with no error after context cancellation", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		startManager := &StartManager{InitializeManager: func(context.Context) (manager.Manager, error) {
+			return &MockManager{waitUntilContextCancelled: true}, nil
+		}}
+
+		doneCh := make(chan struct{})
+		returnedError := errors.New("unexpected error")
+		go func() {
+			returnedError = startManager.Start(ctx)
+			doneCh <- struct{}{}
+		}()
+
+		cancel()
+
+		<-doneCh
+
+		assert.NoError(t, returnedError)
+	})
+
+	t.Run("returns error when manager fails to start", func(t *testing.T) {
+		errToReturn := errors.New("an error")
+		startManager := &StartManager{InitializeManager: func(context.Context) (manager.Manager, error) {
+			return &MockManager{errorToReturn: errToReturn}, nil
+		}}
+
+		assert.Same(t, errToReturn, startManager.Start(context.TODO()))
+	})
+
+	t.Run("manager can be restarted", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		doneCh := make(chan struct{})
+		startCh := make(chan struct{})
+		startManager := &StartManager{InitializeManager: func(context.Context) (manager.Manager, error) {
+			return &MockManager{
+				waitUntilContextCancelled: true,
+				startFn: func() {
+					startCh <- struct{}{}
+				},
+				afterContextCancelledFn: func() {
+					doneCh <- struct{}{}
+				},
+			}, nil
+		}}
+
+		waitForResult := make(chan struct{})
+		returnedError := errors.New("unexpected error")
+		go func() {
+			returnedError = startManager.Start(ctx)
+			waitForResult <- struct{}{}
+		}()
+
+		// wait until we know the manager is running
+		<-startCh
+
+		// We check that the manager restarted by virtue of receiving a message on the startCh channel and checking
+		// that doneCh receives a message due to the cancellation of the context.
+		startManager.RestartNeeded()
+		<-doneCh
+		<-startCh
+
+		startManager.RestartNeeded()
+		<-doneCh
+		<-startCh
+
+		startManager.RestartNeeded()
+		<-doneCh
+		<-startCh
+
+		assert.Empty(t, doneCh)
+
+		cancel()
+		<-doneCh
+
+		<-waitForResult
+		assert.Empty(t, doneCh)
+		assert.Empty(t, startCh)
+		assert.NoError(t, returnedError)
+	})
+}
+
+// Add implements manager.Manager.
+func (m *MockManager) Add(manager.Runnable) error {
+	return nil
+}
+
+// AddHealthzCheck implements manager.Manager.
+func (m *MockManager) AddHealthzCheck(name string, check healthz.Checker) error {
+	return nil
+}
+
+// AddMetricsExtraHandler implements manager.Manager.
+func (m *MockManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
+	return nil
+}
+
+// AddReadyzCheck implements manager.Manager.
+func (m *MockManager) AddReadyzCheck(name string, check healthz.Checker) error {
+	return nil
+}
+
+// Elected implements manager.Manager.
+func (m *MockManager) Elected() <-chan struct{} {
+	return nil
+}
+
+// GetAPIReader implements manager.Manager.
+func (m *MockManager) GetAPIReader() client.Reader {
+	return nil
+}
+
+// GetCache implements manager.Manager.
+func (m *MockManager) GetCache() cache.Cache {
+	return nil
+}
+
+// GetClient implements manager.Manager.
+func (m *MockManager) GetClient() client.Client {
+	return nil
+}
+
+// GetConfig implements manager.Manager.
+func (m *MockManager) GetConfig() *rest.Config {
+	return nil
+}
+
+// GetControllerOptions implements manager.Manager.
+func (m *MockManager) GetControllerOptions() config.Controller {
+	return config.Controller{}
+}
+
+// GetEventRecorderFor implements manager.Manager.
+func (m *MockManager) GetEventRecorderFor(name string) record.EventRecorder {
+	return nil
+}
+
+// GetFieldIndexer implements manager.Manager.
+func (m *MockManager) GetFieldIndexer() client.FieldIndexer {
+	return nil
+}
+
+// GetHTTPClient implements manager.Manager.
+func (m *MockManager) GetHTTPClient() *http.Client {
+	return nil
+}
+
+// GetLogger implements manager.Manager.
+func (m *MockManager) GetLogger() logr.Logger {
+	return logr.Logger{}
+}
+
+// GetRESTMapper implements manager.Manager.
+func (m *MockManager) GetRESTMapper() meta.RESTMapper {
+	return nil
+}
+
+// GetScheme implements manager.Manager.
+func (m *MockManager) GetScheme() *runtime.Scheme {
+	return nil
+}
+
+// GetWebhookServer implements manager.Manager.
+func (m *MockManager) GetWebhookServer() webhook.Server {
+	return nil
+}
+
+// AddMetricsServerExtraHandler implements manager.Manager.
+func (m *MockManager) AddMetricsServerExtraHandler(path string, handler http.Handler) error {
+	return nil
+}
+
+// Start implements manager.Manager.
+func (m *MockManager) Start(ctx context.Context) error {
+	defer func() {
+		if m.afterRun != nil {
+			m.afterRun()
+		}
+	}()
+
+	if m.startFn != nil {
+		m.startFn()
+	}
+
+	if m.waitUntilContextCancelled {
+		<-ctx.Done()
+		if m.afterContextCancelledFn != nil {
+			m.afterContextCancelledFn()
+		}
+		// the controller manager never returns an error after context cancellation
+		return nil
+	}
+	return m.errorToReturn
+}
+
+var _ manager.Manager = (*MockManager)(nil)
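For illustration only (not part of the diff above): a minimal sketch of how a controller could consume the Restarter interface, e.g. to request a manager restart when it observes a change that cannot be applied in place. The ConfigWatcher type and its change-detection placeholder are hypothetical, not code from this repository.

package example

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/codeready-toolchain/host-operator/pkg/restart"
)

// ConfigWatcher is a hypothetical reconciler that requests a full manager
// restart instead of applying a change in place.
type ConfigWatcher struct {
	// Depending on the restart.Restarter interface rather than on the concrete
	// *restart.StartManager lets unit tests substitute a recording fake.
	Restarter restart.Restarter
}

func (w *ConfigWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	needsRestart := false // placeholder for real change-detection logic

	if needsRestart {
		// RestartNeeded is asynchronous: it cancels the running manager's context
		// with errRestartNeeded as the cause, after which StartManager.Start
		// re-runs InitializeManager and starts the freshly built manager.
		w.Restarter.RestartNeeded()
	}
	return ctrl.Result{}, nil
}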