131 changes: 70 additions & 61 deletions test/cases/disruptive/graceful_reboot_test.go
@@ -3,29 +3,27 @@
package disruptive

import (
"bytes"
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-k8s-tester/internal/awssdk"
fwext "github.com/aws/aws-k8s-tester/internal/e2e"

"github.com/aws/aws-k8s-tester/internal/awssdk"
"github.com/aws/aws-sdk-go-v2/service/ec2"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/exec"
"k8s.io/client-go/kubernetes"

"sigs.k8s.io/e2e-framework/klient/wait"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"
)

func getSleepPodTemplate(name string, targetNodeName string, duration string) corev1.Pod {
func getSleepPodTemplate(name string) corev1.Pod {
return corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -36,47 +34,26 @@ func getSleepPodTemplate(name string, targetNodeName string, duration string) co
{
Name: name,
Image: "public.ecr.aws/amazonlinux/amazonlinux:2023",
Command: []string{"sleep", duration},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("64Mi"),
},
},
Command: []string{"sleep", "infinity"},
},
},
RestartPolicy: corev1.RestartPolicyNever,
NodeName: targetNodeName,
Resources: &corev1.ResourceRequirements{
// set high pod limits to make sure the pod does not get
// OOMKilled, and make requests equal to qualify the pod
// for the Guaranteed Quality of Service class
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("64Mi"),
},
},
},
}
}

func TestGracefulReboot(t *testing.T) {
terminationCanaryPodName := fmt.Sprintf("termination-canary-%d", time.Now().Unix())
canaryPod := getSleepPodTemplate(terminationCanaryPodName, "", "infinity")
canaryPod := getSleepPodTemplate(terminationCanaryPodName)
bootIndicatorPodName := fmt.Sprintf("boot-detection-%d", time.Now().Unix())
bootIndicatorPod := getSleepPodTemplate(bootIndicatorPodName, "", "infinity")
bootIndicatorPod := getSleepPodTemplate(bootIndicatorPodName)

feat := features.New("graceful-reboot").
WithLabel("suite", "disruptive").
Assess("Node gracefully reboots", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
// Create an initial pod to allow the default scheduler to do the work of identifying a healthy node.
// Starting with a healthy node is essential to the test, as the only expectation is for the node to
// return to its same initial state after the reboot.
if err := cfg.Client().Resources().Create(ctx, &canaryPod); err != nil {
t.Fatalf("Failed to create heartbeat pod: %v", err)
}
@@ -85,48 +62,84 @@ func TestGracefulReboot(t *testing.T) {
wait.WithContext(ctx),
wait.WithTimeout(5*time.Minute),
); err != nil {
t.Fatalf("Failed to wait for pod to go into running status %s: %v", terminationCanaryPodName, err)
t.Fatalf("Failed to wait for pod %s to go into running status: %v", terminationCanaryPodName, err)
}

var targetNode corev1.Node
if err := cfg.Client().Resources().Get(ctx, canaryPod.Spec.NodeName, "", &targetNode); err != nil {
t.Fatalf("failed to get node %s: %v", canaryPod.Spec.NodeName, err)
t.Fatalf("Failed to get node %s: %v", canaryPod.Spec.NodeName, err)
}

t.Logf("Pod %s is running on node %s", terminationCanaryPodName, targetNode.Name)

client, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
if err != nil {
t.Fatalf("Failed to initialize kubernetes client set: %v", err)
}
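// A typed clientset is built from the same REST config because the kubelet's
// /healthz endpoint is reached below through the node proxy subresource, which
// the framework's resources client does not expose directly.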

// Do an initial check of the /healthz endpoint reachability to ensure we can rely on it later.
// This might fail even if the node is healthy if, for example, the node's security group rules
// do not allow ingress traffic from the control plane
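// This builds GET /api/v1/nodes/<node-name>/proxy/healthz, which the API server
// proxies through to the kubelet's health endpoint on the node.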
nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
Name(targetNode.Name).SubResource("proxy").Suffix("/healthz").
Do(ctx)

if nodeHealthResponse.Error() != nil {
t.Fatalf("Unexpected error checking node %s /healthz endpoint: %v", targetNode.Name, err)
}

t.Logf("Node %s is responding to /healthz", targetNode.Name)

providerIDParts := strings.Split(targetNode.Spec.ProviderID, "/")
instanceID := providerIDParts[len(providerIDParts)-1]
t.Logf("Node %s corresponds to EC2 instance: %s", targetNode.Name, instanceID)
t.Logf("Rebooting underlying instance %s for node %s...", instanceID, targetNode.Name)

ec2Client := ec2.NewFromConfig(awssdk.NewConfig())

// TODO: make sure the exec starts before the reboot to promote better determinism
t.Logf("Rebooting instance %s to test graceful reboot...", instanceID)
_, err := ec2Client.RebootInstances(ctx, &ec2.RebootInstancesInput{
if _, err := ec2Client.RebootInstances(ctx, &ec2.RebootInstancesInput{
InstanceIds: []string{instanceID},
})
if err != nil {
t.Fatalf("Failed to reboot EC2 instance %s: %v", instanceID, err)
}); err != nil {
t.Fatalf("Failed to reboot instance %s: %v", instanceID, err)
}
t.Logf("Successfully initiated reboot of instance %s, waiting for pod %s to terminate...", instanceID, canaryPod.Name)

t.Logf("Started exec into pod %s", terminationCanaryPodName)
// Attempt to execute a blocking command in the pod until we get a 143, which would indicate a SIGTERM.
// This is a reliable way to check termination since it requires a direct response from the kubelet
var execOut, execErr bytes.Buffer
err = cfg.Client().Resources().ExecInPod(ctx, "default", terminationCanaryPodName, terminationCanaryPodName, []string{"sleep", "infinity"}, &execOut, &execErr)
if err != nil {
if execErr, ok := err.(exec.CodeExitError); ok && execErr.Code == 143 {
t.Logf("Pod %s was terminated", terminationCanaryPodName)
} else {
t.Fatalf("Got unexpected error terminating pod: %v", err)
t.Logf("Successfully triggered reboot of instance %s, waiting for kubelet to become unresponsive...", instanceID)

kubeletShutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

// Use kubelet health probes as the signal for instance shutdown. Since the health endpoint
// could previously be reached, a refused connection implies kubelet was killed.
var kubeletConnectionLost bool
for !kubeletConnectionLost {
select {
case <-kubeletShutdownCtx.Done():
t.Fatalf("Failed to wait for kubelet to become unresponsive: %v", ctx.Err())
case <-time.Tick(1 * time.Second):
nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
Name(targetNode.Name).SubResource("proxy").Suffix("/healthz").
Do(ctx)

if nodeHealthResponse.Error() != nil {
// TODO: match error against syscall.ECONNREFUSED instead, the k8s client doesn't
// currently properly wrap the underlying error to allow this though
if strings.Contains(nodeHealthResponse.Error().Error(), "connection refused") {
kubeletConnectionLost = true
} else {
t.Fatalf("Unpexected error while monitoring kubelet on node %s: %v", targetNode.Name, nodeHealthResponse.Error())
}
} else {
t.Logf("Node %s still responding to /healthz", targetNode.Name)
}
}
}
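// A possible shape for the TODO above (illustrative only, not part of this
// change): once the client wraps the underlying dial error, the string match
// could become a typed check using the errors and syscall packages, e.g.
//
//	if errors.Is(nodeHealthResponse.Error(), syscall.ECONNREFUSED) {
//		kubeletConnectionLost = true
//	}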

t.Logf("Waiting up to 10 minutes for node %s to become schedulable again", targetNode.Name)
t.Logf("Node %s has become unresponsive, waiting for the node to become schedulable again...", targetNode.Name)

// Create a second pod, under the assumption that a new pod cannot be scheduled by a shutting down kubelet
// that has already evicted other pods, so this one should only schedule with a new kubelet after boot
bootIndicatorPod.Spec.NodeName = targetNode.Name
// Create a second pod; we will rely on it starting to run as an indication of a healthy state.
// Since kubelet was killed at this point, we know the reboot must complete and kubelet must start
// again for this pod to start running.
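// Unlike assigning NodeName directly, a nodeSelector on the well-known
// kubernetes.io/hostname label keeps the default scheduler in the placement
// path while still pinning the pod to the rebooted node.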
bootIndicatorPod.Spec.NodeSelector = map[string]string{
"kubernetes.io/hostname": targetNode.Name,
}
if err := cfg.Client().Resources().Create(ctx, &bootIndicatorPod); err != nil {
t.Fatalf("Failed to create boot indicator pod: %v", err)
}
@@ -144,14 +157,10 @@
Teardown(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
if err := cfg.Client().Resources().Delete(ctx, &canaryPod); err != nil {
t.Logf("Failed to delete pod %s: %v", terminationCanaryPodName, err)
} else {
t.Logf("Successfully cleaned up pod %s", terminationCanaryPodName)
}

if err := cfg.Client().Resources().Delete(ctx, &bootIndicatorPod); err != nil {
t.Logf("Failed to delete pod %s: %v", bootIndicatorPodName, err)
} else {
t.Logf("Successfully cleaned up pod %s", bootIndicatorPodName)
}
return ctx
}).