From 0a8994047a85866a3aa06b7e0edab0bf7bc83747 Mon Sep 17 00:00:00 2001 From: Mazen Selim Date: Sat, 27 Sep 2025 19:03:49 +0000 Subject: [PATCH] fix: make reboot test more deterministic --- test/cases/disruptive/graceful_reboot_test.go | 131 ++++++++++-------- 1 file changed, 70 insertions(+), 61 deletions(-) diff --git a/test/cases/disruptive/graceful_reboot_test.go b/test/cases/disruptive/graceful_reboot_test.go index 34c87b88b..e05841ccc 100644 --- a/test/cases/disruptive/graceful_reboot_test.go +++ b/test/cases/disruptive/graceful_reboot_test.go @@ -3,29 +3,27 @@ package disruptive import ( - "bytes" "context" "fmt" "strings" "testing" "time" + "github.com/aws/aws-k8s-tester/internal/awssdk" fwext "github.com/aws/aws-k8s-tester/internal/e2e" - "github.com/aws/aws-k8s-tester/internal/awssdk" "github.com/aws/aws-sdk-go-v2/service/ec2" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/util/exec" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/e2e-framework/klient/wait" "sigs.k8s.io/e2e-framework/pkg/envconf" "sigs.k8s.io/e2e-framework/pkg/features" ) -func getSleepPodTemplate(name string, targetNodeName string, duration string) corev1.Pod { +func getSleepPodTemplate(name string) corev1.Pod { return corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -36,47 +34,26 @@ func getSleepPodTemplate(name string, targetNodeName string, duration string) co { Name: name, Image: "public.ecr.aws/amazonlinux/amazonlinux:2023", - Command: []string{"sleep", duration}, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("250m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("250m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - }, + Command: []string{"sleep", "infinity"}, }, }, RestartPolicy: corev1.RestartPolicyNever, - NodeName: 
targetNodeName, - Resources: &corev1.ResourceRequirements{ - // set high pod limits to make sure the pod does not get - // OOMKilled, and make requests equal to qualify the pod - // for the Guaranteed Quality of Service class - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("250m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("250m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - }, }, } } func TestGracefulReboot(t *testing.T) { terminationCanaryPodName := fmt.Sprintf("termination-canary-%d", time.Now().Unix()) - canaryPod := getSleepPodTemplate(terminationCanaryPodName, "", "infinity") + canaryPod := getSleepPodTemplate(terminationCanaryPodName) bootIndicatorPodName := fmt.Sprintf("boot-detection-%d", time.Now().Unix()) - bootIndicatorPod := getSleepPodTemplate(bootIndicatorPodName, "", "infinity") + bootIndicatorPod := getSleepPodTemplate(bootIndicatorPodName) feat := features.New("graceful-reboot"). WithLabel("suite", "disruptive"). Assess("Node gracefully reboots", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + // Create an initial pod to allow the default scheduler to do the work of identifying a healthy node. + // Starting with a healthy node is essential to the test, as the only expectation is for the node to + // return to its same initial state after the reboot. 
if err := cfg.Client().Resources().Create(ctx, &canaryPod); err != nil { t.Fatalf("Failed to create heartbeat pod: %v", err) } @@ -85,48 +62,84 @@ func TestGracefulReboot(t *testing.T) { wait.WithContext(ctx), wait.WithTimeout(5*time.Minute), ); err != nil { - t.Fatalf("Failed to wait for pod to go into running status %s: %v", terminationCanaryPodName, err) + t.Fatalf("Failed to wait for pod %s to go into running status: %v", terminationCanaryPodName, err) } var targetNode corev1.Node if err := cfg.Client().Resources().Get(ctx, canaryPod.Spec.NodeName, "", &targetNode); err != nil { - t.Fatalf("failed to get node %s: %v", canaryPod.Spec.NodeName, err) + t.Fatalf("Failed to get node %s: %v", canaryPod.Spec.NodeName, err) + } + + t.Logf("Pod %s is running on node %s", terminationCanaryPodName, targetNode.Name) + + client, err := kubernetes.NewForConfig(cfg.Client().RESTConfig()) + if err != nil { + t.Fatalf("Failed to initialize kubernetes client set: %v", err) } + // Do an initial check of the /healthz endpoint reachability to ensure we can rely on it later. + // This might fail even if the node is healthy if, for example, the node's security group rules + // do not allow ingress traffic from the control plane + nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes"). + Name(targetNode.Name).SubResource("proxy").Suffix("/healthz"). 
+ Do(ctx) + + if nodeHealthResponse.Error() != nil { + t.Fatalf("Unexpected error checking node %s /healthz endpoint: %v", targetNode.Name, nodeHealthResponse.Error()) + } + + t.Logf("Node %s is responding to /healthz", targetNode.Name) + providerIDParts := strings.Split(targetNode.Spec.ProviderID, "/") instanceID := providerIDParts[len(providerIDParts)-1] - t.Logf("Node %s corresponds to EC2 instance: %s", targetNode.Name, instanceID) + t.Logf("Rebooting underlying instance %s for node %s...", instanceID, targetNode.Name) ec2Client := ec2.NewFromConfig(awssdk.NewConfig()) - - // TODO: make sure the exec starts before the reboot to promote better determinism - t.Logf("Rebooting instance %s to test graceful reboot...", instanceID) - _, err := ec2Client.RebootInstances(ctx, &ec2.RebootInstancesInput{ + if _, err := ec2Client.RebootInstances(ctx, &ec2.RebootInstancesInput{ InstanceIds: []string{instanceID}, - }) - if err != nil { - t.Fatalf("Failed to reboot EC2 instance %s: %v", instanceID, err) + }); err != nil { + t.Fatalf("Failed to reboot instance %s: %v", instanceID, err) } - t.Logf("Successfully initiated reboot of instance %s, waiting for pod %s to terminate...", instanceID, canaryPod.Name) - t.Logf("Started exec into pod %s", terminationCanaryPodName) - // Attempt to execute a blocking command in the pod until we get a 143, which would indicate a SIGTERM. 
- // This a reliable way to check termination since it requires direct response from Kubelet - var execOut, execErr bytes.Buffer - err = cfg.Client().Resources().ExecInPod(ctx, "default", terminationCanaryPodName, terminationCanaryPodName, []string{"sleep", "infinity"}, &execOut, &execErr) - if err != nil { - if execErr, ok := err.(exec.CodeExitError); ok && execErr.Code == 143 { - t.Logf("Pod %s was terminated", terminationCanaryPodName) - } else { - t.Fatalf("Got unexpected error terminating pod: %v", err) + t.Logf("Successfully triggered reboot of instance %s, waiting for kubelet to become unresponsive...", instanceID) + + kubeletShutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + // Use kubelet health probes as the signal for instance shutdown. Since the health endpoint + // could previously be reached, a refused connection implies kubelet was killed. + var kubeletConnectionLost bool + for !kubeletConnectionLost { + select { + case <-kubeletShutdownCtx.Done(): + t.Fatalf("Failed to wait for kubelet to become unresponsive: %v", kubeletShutdownCtx.Err()) + case <-time.Tick(1 * time.Second): + nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes"). + Name(targetNode.Name).SubResource("proxy").Suffix("/healthz"). 
+ Do(ctx) + + if nodeHealthResponse.Error() != nil { + // TODO: match error against syscall.ECONNREFUSED instead; the k8s client doesn't + // currently wrap the underlying error properly enough to allow this, though. + if strings.Contains(nodeHealthResponse.Error().Error(), "connection refused") { + kubeletConnectionLost = true + } else { + t.Fatalf("Unexpected error while monitoring kubelet on node %s: %v", targetNode.Name, nodeHealthResponse.Error()) + } + } else { + t.Logf("Node %s still responding to /healthz", targetNode.Name) + } } } - t.Logf("Waiting up to 10 minutes for node %s to become schedulable again", targetNode.Name) + t.Logf("Node %s has become unresponsive, waiting for the node to become schedulable again...", targetNode.Name) - // Create a second pod, under the assumption that a new pod cannot be scheduled by a shutting down kubelet - // that has already evicted other pods, so this one should only schedule with a new kubelet after boot - bootIndicatorPod.Spec.NodeName = targetNode.Name + // Create a second pod; we will rely on this pod starting to run as an indication of a healthy state. + // Since kubelet was killed at this point, we know the reboot must complete and kubelet must start + // again for this pod to start running. 
+ bootIndicatorPod.Spec.NodeSelector = map[string]string{ + "kubernetes.io/hostname": targetNode.Name, + } if err := cfg.Client().Resources().Create(ctx, &bootIndicatorPod); err != nil { t.Fatalf("Failed to create boot indicator pod: %v", err) } @@ -144,14 +157,10 @@ func TestGracefulReboot(t *testing.T) { Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { if err := cfg.Client().Resources().Delete(ctx, &canaryPod); err != nil { t.Logf("Failed to delete pod %s: %v", terminationCanaryPodName, err) - } else { - t.Logf("Successfully cleaned up pod %s", terminationCanaryPodName) } if err := cfg.Client().Resources().Delete(ctx, &bootIndicatorPod); err != nil { t.Logf("Failed to delete pod %s: %v", bootIndicatorPodName, err) - } else { - t.Logf("Successfully cleaned up pod %s", bootIndicatorPodName) } return ctx }).