From f034ec7fe00fb02c9c65e4f3b97085981bf2072c Mon Sep 17 00:00:00 2001 From: Danil-Grigorev Date: Fri, 7 Feb 2025 13:06:57 +0100 Subject: [PATCH] Implement Prioriy and Serial settings for discovered handlers Signed-off-by: Danil-Grigorev --- ...ime.cluster.x-k8s.io_extensionconfigs.yaml | 9 +++ .../api/v1alpha1/extensionconfig_types.go | 8 ++ .../hooks/api/v1alpha1/discovery_types.go | 8 ++ .../api/v1alpha1/zz_generated.openapi.go | 14 ++++ .../extensionconfig_controller_test.go | 6 ++ exp/runtime/server/server.go | 8 ++ internal/runtime/client/client.go | 18 +++++ internal/runtime/client/client_test.go | 79 ++++++++++++++++++- internal/runtime/registry/registry.go | 15 ++++ test/e2e/cluster_upgrade.go | 10 ++- test/e2e/cluster_upgrade_runtimesdk.go | 33 ++++++-- test/extension/handlers/lifecycle/handlers.go | 25 ++++++ test/extension/main.go | 11 +++ test/framework/cluster_topology_helpers.go | 6 +- 14 files changed, 237 insertions(+), 13 deletions(-) diff --git a/config/crd/bases/runtime.cluster.x-k8s.io_extensionconfigs.yaml b/config/crd/bases/runtime.cluster.x-k8s.io_extensionconfigs.yaml index f56847de914e..145d4ba7817d 100644 --- a/config/crd/bases/runtime.cluster.x-k8s.io_extensionconfigs.yaml +++ b/config/crd/bases/runtime.cluster.x-k8s.io_extensionconfigs.yaml @@ -228,6 +228,11 @@ spec: name: description: name is the unique name of the ExtensionHandler. type: string + priority: + description: Priority defines the order in which this handler + will be invoked. Hooks are executed in the descending order. + format: int32 + type: integer requestHook: description: requestHook defines the versioned runtime hook which this ExtensionHandler serves. @@ -243,6 +248,10 @@ spec: - apiVersion - hook type: object + serial: + description: Serial defines if the blocked hook is allowed to + run in parallel with others. + type: boolean timeoutSeconds: description: |- timeoutSeconds defines the timeout duration for client calls to the ExtensionHandler. diff --git a/exp/runtime/api/v1alpha1/extensionconfig_types.go b/exp/runtime/api/v1alpha1/extensionconfig_types.go index 2a62ebf363ad..fe929a2a4426 100644 --- a/exp/runtime/api/v1alpha1/extensionconfig_types.go +++ b/exp/runtime/api/v1alpha1/extensionconfig_types.go @@ -131,6 +131,14 @@ type ExtensionHandler struct { // Defaults to Fail if not set. // +optional FailurePolicy *FailurePolicy `json:"failurePolicy,omitempty"` + + // Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order. + // +optional + Priority int32 `json:"priority,omitempty"` + + // Serial defines if the blocked hook is allowed to run in parallel with others. + // +optional + Serial bool `json:"serial,omitempty"` } // GroupVersionHook defines the runtime hook when the ExtensionHandler is called. diff --git a/exp/runtime/hooks/api/v1alpha1/discovery_types.go b/exp/runtime/hooks/api/v1alpha1/discovery_types.go index 61d0ac395505..963ce1619463 100644 --- a/exp/runtime/hooks/api/v1alpha1/discovery_types.go +++ b/exp/runtime/hooks/api/v1alpha1/discovery_types.go @@ -63,6 +63,14 @@ type ExtensionHandler struct { // failurePolicy defines how failures in calls to the ExtensionHandler should be handled by a client. // This is defaulted to FailurePolicyFail if not defined. FailurePolicy *FailurePolicy `json:"failurePolicy,omitempty"` + + // Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order. + // +optional + Priority int32 `json:"priority,omitempty"` + + // Serial defines if the blocked hook is allowed to run in parallel with others. + // +optional + Serial bool `json:"serial,omitempty"` } // GroupVersionHook defines the runtime hook when the ExtensionHandler is called. diff --git a/exp/runtime/hooks/api/v1alpha1/zz_generated.openapi.go b/exp/runtime/hooks/api/v1alpha1/zz_generated.openapi.go index 69066c1b0658..fd88257e76b9 100644 --- a/exp/runtime/hooks/api/v1alpha1/zz_generated.openapi.go +++ b/exp/runtime/hooks/api/v1alpha1/zz_generated.openapi.go @@ -1317,6 +1317,20 @@ func schema_runtime_hooks_api_v1alpha1_ExtensionHandler(ref common.ReferenceCall Format: "", }, }, + "priority": { + SchemaProps: spec.SchemaProps{ + Description: "Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "serial": { + SchemaProps: spec.SchemaProps{ + Description: "Serial defines if the blocked hook is allowed to run in parallel with others.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, Required: []string{"name", "requestHook"}, }, diff --git a/exp/runtime/internal/controllers/extensionconfig_controller_test.go b/exp/runtime/internal/controllers/extensionconfig_controller_test.go index c6d93fb4b67e..47229b7762ee 100644 --- a/exp/runtime/internal/controllers/extensionconfig_controller_test.go +++ b/exp/runtime/internal/controllers/extensionconfig_controller_test.go @@ -229,6 +229,10 @@ func TestExtensionReconciler_discoverExtensionConfig(t *testing.T) { handlers := discoveredExtensionConfig.Status.Handlers g.Expect(handlers).To(HaveLen(1)) g.Expect(handlers[0].Name).To(Equal("first.ext1")) + g.Expect(handlers[0].RequestHook.Hook).To(Equal("FakeHook")) + g.Expect(handlers[0].RequestHook.APIVersion).To(Equal("test.runtime.cluster.x-k8s.io/v1alpha1")) + g.Expect(handlers[0].Serial).To(BeTrue()) + g.Expect(handlers[0].Priority).To(Equal(int32(100))) // Expect exactly one condition and expect the condition to have type RuntimeExtensionDiscoveredCondition and // Status true. @@ -345,6 +349,8 @@ func discoveryHandler(handlerList ...string) func(http.ResponseWriter, *http.Req Hook: "FakeHook", APIVersion: fakev1alpha1.GroupVersion.String(), }, + Serial: true, + Priority: 100, }) } response := &runtimehooksv1.DiscoveryResponse{ diff --git a/exp/runtime/server/server.go b/exp/runtime/server/server.go index 540636bc04d3..043f3251ff89 100644 --- a/exp/runtime/server/server.go +++ b/exp/runtime/server/server.go @@ -150,6 +150,12 @@ type ExtensionHandler struct { // If left undefined, this will be defaulted to FailurePolicyFail when processing the answer to the discovery // call for this server. FailurePolicy *runtimehooksv1.FailurePolicy + + // Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order. + Priority int32 + + // Serial defines if the blocked hook is allowed to run in parallel with others. + Serial bool } // AddExtensionHandler adds an extension handler to the server. @@ -268,6 +274,8 @@ func discoveryHandler(handlers map[string]ExtensionHandler) func(context.Context }, TimeoutSeconds: handler.TimeoutSeconds, FailurePolicy: handler.FailurePolicy, + Priority: handler.Priority, + Serial: handler.Serial, }) } diff --git a/internal/runtime/client/client.go b/internal/runtime/client/client.go index cb98d45bdd4a..c0640b00f9dc 100644 --- a/internal/runtime/client/client.go +++ b/internal/runtime/client/client.go @@ -143,6 +143,8 @@ func (c *client) Discover(ctx context.Context, extensionConfig *runtimev1.Extens }, TimeoutSeconds: handler.TimeoutSeconds, FailurePolicy: (*runtimev1.FailurePolicy)(handler.FailurePolicy), + Priority: handler.Priority, + Serial: handler.Serial, }, ) } @@ -193,6 +195,7 @@ func (c *client) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook log.V(4).Info(fmt.Sprintf("Calling all extensions of hook %q", hookName)) responses := []runtimehooksv1.ResponseObject{} + retry := false for _, registration := range registrations { // Creates a new instance of the response parameter. responseObject, err := c.catalog.NewResponse(gvh) @@ -212,13 +215,28 @@ func (c *client) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook continue } + if registration.Serial && retry { + log.V(5).Info(fmt.Sprintf("Serial handler %q waits for blocking response to complete", registration.Name)) + break + } + err = c.CallExtension(ctx, hook, forObject, registration.Name, request, tmpResponse) // If one of the extension handlers fails lets short-circuit here and return early. if err != nil { log.Error(err, "failed to call extension handlers") return errors.Wrapf(err, "failed to call extension handlers for hook %q", gvh.GroupHook()) } + + if retryResponse, isRetry := tmpResponse.(runtimehooksv1.RetryResponseObject); isRetry && !retry && retryResponse.GetRetryAfterSeconds() > 0 { + retry = isRetry + } + responses = append(responses, tmpResponse) + + if registration.Serial && retry { + log.V(5).Info(fmt.Sprintf("Serial handler %q is blocking hook until it is completed", registration.Name)) + break + } } // Aggregate all responses into a single response. diff --git a/internal/runtime/client/client_test.go b/internal/runtime/client/client_test.go index 72be50e66fac..008b3ac8ac83 100644 --- a/internal/runtime/client/client_test.go +++ b/internal/runtime/client/client_test.go @@ -984,6 +984,17 @@ func TestClient_CallAllExtensions(t *testing.T) { }, } + secondBlockingConfig := extensionConfig.DeepCopy() + secondBlockingConfig.Status.Handlers[0].Priority = 2 + secondBlockingConfig.Status.Handlers[1].Priority = 1 + secondBlockingConfig.Status.Handlers[1].Serial = true + secondBlockingConfig.Status.Handlers[0].RequestHook.Hook = "RetryableFakeHook" + secondBlockingConfig.Status.Handlers[1].RequestHook.Hook = "RetryableFakeHook" + secondBlockingConfig.Status.Handlers[2].RequestHook.Hook = "RetryableFakeHook" + + secondBlockingWithPriorityConfig := secondBlockingConfig.DeepCopy() + secondBlockingWithPriorityConfig.Status.Handlers[1].Priority = 3 + type args struct { hook runtimecatalog.Hook request runtimehooksv1.RequestObject @@ -1072,6 +1083,55 @@ func TestClient_CallAllExtensions(t *testing.T) { }, wantErr: true, }, + { + name: "should succeed and wait on previous blocking responses for serial handler", + registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingConfig}, + testServer: testServerConfig{ + start: true, + responses: map[string]testServerResponse{ + "/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/first-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1), + // second and third extension has no handler. + }, + }, + args: args{ + hook: fakev1alpha1.RetryableFakeHook, + request: &fakev1alpha1.RetryableFakeRequest{}, + response: &fakev1alpha1.RetryableFakeResponse{}, + }, + }, + { + name: "should succeed and wait on blocking serial handler", + registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingConfig}, + testServer: testServerConfig{ + start: true, + responses: map[string]testServerResponse{ + "/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/first-extension.*": response(runtimehooksv1.ResponseStatusSuccess), + "/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/second-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1), + // third-extension has no handler. + }, + }, + args: args{ + hook: fakev1alpha1.RetryableFakeHook, + request: &fakev1alpha1.RetryableFakeRequest{}, + response: &fakev1alpha1.RetryableFakeResponse{}, + }, + }, + { + name: "should succeed and wait on blocking serial handler, which is called with priority", + registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingWithPriorityConfig}, + testServer: testServerConfig{ + start: true, + responses: map[string]testServerResponse{ + "/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/second-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1), + // second and third extension has no handler. + }, + }, + args: args{ + hook: fakev1alpha1.RetryableFakeHook, + request: &fakev1alpha1.RetryableFakeRequest{}, + response: &fakev1alpha1.RetryableFakeResponse{}, + }, + }, { name: "should fail when one of the ExtensionHandlers returns 404", registeredExtensionConfigs: []runtimev1.ExtensionConfig{extensionConfig}, @@ -1317,6 +1377,20 @@ func response(status runtimehooksv1.ResponseStatus) testServerResponse { } } +func retryResponse(status runtimehooksv1.ResponseStatus, retrySeconds int32) testServerResponse { + return testServerResponse{ + response: &fakev1alpha1.RetryableFakeResponse{ + CommonRetryResponse: runtimehooksv1.CommonRetryResponse{ + RetryAfterSeconds: retrySeconds, + CommonResponse: runtimehooksv1.CommonResponse{ + Status: status, + }, + }, + }, + responseStatusCode: http.StatusOK, + } +} + func createSecureTestServer(server testServerConfig, callbacks ...func()) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -1335,7 +1409,10 @@ func createSecureTestServer(server testServerConfig, callbacks ...func()) *httpt panic(err) } w.WriteHeader(resp.responseStatusCode) - _, _ = w.Write(respBody) + _, err = w.Write(respBody) + if err != nil { + panic(err) + } return } diff --git a/internal/runtime/registry/registry.go b/internal/runtime/registry/registry.go index 1de4c1eebcfa..36f9315c7c56 100644 --- a/internal/runtime/registry/registry.go +++ b/internal/runtime/registry/registry.go @@ -17,6 +17,7 @@ limitations under the License. package registry import ( + "sort" "sync" "github.com/pkg/errors" @@ -84,6 +85,13 @@ type ExtensionRegistration struct { // Settings captures additional information sent in call to the RuntimeExtensions. Settings map[string]string + + // Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order. + Priority int32 + + // Serial defines if the hook should be executed serially. This ensures that previously pending hooks are finished + // first, as well as current hook reached completion, before moving to the next one. + Serial bool } // extensionRegistry is an implementation of ExtensionRegistry. @@ -210,6 +218,11 @@ func (r *extensionRegistry) List(gh runtimecatalog.GroupHook) ([]*ExtensionRegis l = append(l, registration) } } + + sort.SliceStable(l, func(i, j int) bool { + return l[i].Priority > l[j].Priority + }) + return l, nil } @@ -263,6 +276,8 @@ func (r *extensionRegistry) add(extensionConfig *runtimev1.ExtensionConfig) erro TimeoutSeconds: e.TimeoutSeconds, FailurePolicy: e.FailurePolicy, Settings: extensionConfig.Spec.Settings, + Priority: e.Priority, + Serial: e.Serial, }) } diff --git a/test/e2e/cluster_upgrade.go b/test/e2e/cluster_upgrade.go index 8f307c66ca48..f14ac32e3f61 100644 --- a/test/e2e/cluster_upgrade.go +++ b/test/e2e/cluster_upgrade.go @@ -184,10 +184,12 @@ func ClusterUpgradeConformanceSpec(ctx context.Context, inputGetter func() Clust WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), - PreWaitForControlPlaneToBeUpgraded: func() { - if input.PreWaitForControlPlaneToBeUpgraded != nil { - input.PreWaitForControlPlaneToBeUpgraded(input.BootstrapClusterProxy, namespace.Name, clusterName) - } + PreWaitForControlPlaneToBeUpgraded: []func(){ + func() { + if input.PreWaitForControlPlaneToBeUpgraded != nil { + input.PreWaitForControlPlaneToBeUpgraded(input.BootstrapClusterProxy, namespace.Name, clusterName) + } + }, }, }) } else { diff --git a/test/e2e/cluster_upgrade_runtimesdk.go b/test/e2e/cluster_upgrade_runtimesdk.go index 401145fa90e0..f18b67e02ded 100644 --- a/test/e2e/cluster_upgrade_runtimesdk.go +++ b/test/e2e/cluster_upgrade_runtimesdk.go @@ -277,12 +277,20 @@ func ClusterUpgradeWithRuntimeSDKSpec(ctx context.Context, inputGetter func() Cl WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"), - PreWaitForControlPlaneToBeUpgraded: func() { - beforeClusterUpgradeTestHandler(ctx, - input.BootstrapClusterProxy.GetClient(), - clusterRef, - input.E2EConfig.MustGetVariable(KubernetesVersionUpgradeTo), - input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade")) + PreWaitForControlPlaneToBeUpgraded: []func(){ + func() { + justBeforeClusterUpgradeTestHandler(ctx, + input.BootstrapClusterProxy.GetClient(), + clusterRef, + input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade")) + }, + func() { + beforeClusterUpgradeTestHandler(ctx, + input.BootstrapClusterProxy.GetClient(), + clusterRef, + input.E2EConfig.MustGetVariable(KubernetesVersionUpgradeTo), + input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade")) + }, }, PreWaitForWorkersToBeUpgraded: func() { machineSetPreflightChecksTestHandler(ctx, @@ -580,6 +588,19 @@ func beforeClusterUpgradeTestHandler(ctx context.Context, c client.Client, clust }, intervals) } +// justBeforeClusterUpgradeTestHandler calls runtimeHookTestHandler with a blocking function which returns false if +// JustBeforeClusterUpgrade hook passed, and BeforeClusterUpgrade hook is pending. +func justBeforeClusterUpgradeTestHandler(ctx context.Context, c client.Client, cluster types.NamespacedName, intervals []interface{}) { + hookName := "JustBeforeClusterUpgrade" + runtimeHookTestHandler(ctx, c, cluster, hookName, true, func() bool { + // Wait for JustBeforeClusterUpgrade to pass and BeforeClusterUpgrade pending + return checkLifecycleHookResponses(ctx, c, cluster, map[string]string{ + "JustBeforeClusterUpgrade": "Status: Success, RetryAfterSeconds: 0", + "BeforeClusterUpgrade": "Status: Success, RetryAfterSeconds: 5", + }) != nil + }, intervals) +} + // afterControlPlaneUpgradeTestHandler calls runtimeHookTestHandler with a blocking function which returns false if any // MachineDeployment in the Cluster has upgraded to the target Kubernetes version. func afterControlPlaneUpgradeTestHandler(ctx context.Context, c client.Client, cluster types.NamespacedName, version string, intervals []interface{}) { diff --git a/test/extension/handlers/lifecycle/handlers.go b/test/extension/handlers/lifecycle/handlers.go index 9b22b4887037..1f71734b7b8b 100644 --- a/test/extension/handlers/lifecycle/handlers.go +++ b/test/extension/handlers/lifecycle/handlers.go @@ -115,6 +115,30 @@ func (m *ExtensionHandlers) DoAfterControlPlaneInitialized(ctx context.Context, } } +// JustBeforeClusterUpgrade is the hook that will be called before the control plane is upgraded for the first time and before the BeforeClusterUpgrade hook. +func JustBeforeClusterUpgrade(*runtimehooksv1.BeforeClusterUpgradeRequest, *runtimehooksv1.BeforeClusterUpgradeResponse) { +} + +// DoJustBeforeClusterUpgrade implements the HandlerFunc for the DoJustBeforeClusterUpgrade hook. +// The hook answers with the response stored in a well know config map, thus allowing E2E tests to +// control the hook behaviour during a test. +// NOTE: custom RuntimeExtension, must implement the body of this func according to the specific use case. +func (m *ExtensionHandlers) DoJustBeforeClusterUpgrade(ctx context.Context, request *runtimehooksv1.BeforeClusterUpgradeRequest, response *runtimehooksv1.BeforeClusterUpgradeResponse) { + log := ctrl.LoggerFrom(ctx) + log.Info("DoJustBeforeClusterUpgrade is called") + + if err := m.readResponseFromConfigMap(ctx, &request.Cluster, JustBeforeClusterUpgrade, request.GetSettings(), response); err != nil { + response.Status = runtimehooksv1.ResponseStatusFailure + response.Message = err.Error() + return + } + + if err := m.recordCallInConfigMap(ctx, &request.Cluster, JustBeforeClusterUpgrade, response); err != nil { + response.Status = runtimehooksv1.ResponseStatusFailure + response.Message = err.Error() + } +} + // DoAfterControlPlaneUpgrade implements the HandlerFunc for the AfterControlPlaneUpgrade hook. // The hook answers with the response stored in a well know config map, thus allowing E2E tests to // control the hook behaviour during a test. @@ -223,6 +247,7 @@ func responsesConfigMap(cluster *clusterv1.Cluster, defaultAllHandlersToBlocking Data: map[string]string{ // Blocking hooks are set to return RetryAfterSeconds initially. These will be changed during the test. "BeforeClusterCreate-preloadedResponse": fmt.Sprintf(`{"Status": "Success", "RetryAfterSeconds": %d}`, retryAfterSeconds), + "JustBeforeClusterUpgrade-preloadedResponse": fmt.Sprintf(`{"Status": "Success", "RetryAfterSeconds": %d, "Message": "JustBeforeClusterUpgrade message"}`, retryAfterSeconds), "BeforeClusterUpgrade-preloadedResponse": fmt.Sprintf(`{"Status": "Success", "RetryAfterSeconds": %d}`, retryAfterSeconds), "AfterControlPlaneUpgrade-preloadedResponse": fmt.Sprintf(`{"Status": "Success", "RetryAfterSeconds": %d}`, retryAfterSeconds), "BeforeClusterDelete-preloadedResponse": fmt.Sprintf(`{"Status": "Success", "RetryAfterSeconds": %d}`, retryAfterSeconds), diff --git a/test/extension/main.go b/test/extension/main.go index 62a058f88800..5f79737ee0d8 100644 --- a/test/extension/main.go +++ b/test/extension/main.go @@ -341,6 +341,17 @@ func setupLifecycleHookHandlers(mgr ctrl.Manager, runtimeExtensionWebhookServer os.Exit(1) } + if err := runtimeExtensionWebhookServer.AddExtensionHandler(server.ExtensionHandler{ + Hook: lifecycle.JustBeforeClusterUpgrade, + Name: "just-before-cluster-upgrade", + HandlerFunc: lifecycleExtensionHandlers.DoJustBeforeClusterUpgrade, + Priority: 10, + Serial: true, + }); err != nil { + setupLog.Error(err, "Error adding handler") + os.Exit(1) + } + if err := runtimeExtensionWebhookServer.AddExtensionHandler(server.ExtensionHandler{ Hook: runtimehooksv1.BeforeClusterUpgrade, Name: "before-cluster-upgrade", diff --git a/test/framework/cluster_topology_helpers.go b/test/framework/cluster_topology_helpers.go index 2c83da78d80c..0681aec8e5ed 100644 --- a/test/framework/cluster_topology_helpers.go +++ b/test/framework/cluster_topology_helpers.go @@ -73,7 +73,7 @@ type UpgradeClusterTopologyAndWaitForUpgradeInput struct { WaitForKubeProxyUpgrade []interface{} WaitForDNSUpgrade []interface{} WaitForEtcdUpgrade []interface{} - PreWaitForControlPlaneToBeUpgraded func() + PreWaitForControlPlaneToBeUpgraded []func() PreWaitForWorkersToBeUpgraded func() SkipKubeProxyCheck bool } @@ -113,7 +113,9 @@ func UpgradeClusterTopologyAndWaitForUpgrade(ctx context.Context, input UpgradeC // and blocking correctly. if input.PreWaitForControlPlaneToBeUpgraded != nil { log.Logf("Calling PreWaitForControlPlaneToBeUpgraded") - input.PreWaitForControlPlaneToBeUpgraded() + for _, f := range input.PreWaitForControlPlaneToBeUpgraded { + f() + } } log.Logf("Waiting for control-plane machines to have the upgraded Kubernetes version")