Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions internal/e2e/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package e2e

import (
	"context"
	"fmt"
	"net/http"
	"strings"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// KubeletIsResponsive returns true if the kubelet /healthz endpoint responds with a 200 status code, and propagates
// any non-connection specific errors
// KubeletIsResponsive reports whether the kubelet /healthz endpoint on the named
// node responds with a 200 status code, probing it through the API server's
// "nodes/proxy" subresource.
//
// Connection-level failures ("connection refused", "connection reset by peer",
// "http2: client connection lost") indicate the node is reachable in general but
// kubelet is not serving, so they yield (false, nil). Any other error — e.g. an
// i/o timeout caused by security group rules blocking control-plane traffic — is
// propagated to the caller wrapped with the node name.
func KubeletIsResponsive(ctx context.Context, cfg *rest.Config, nodeName string) (bool, error) {
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		// Wrap with %w (not %v) so callers can unwrap the underlying client error.
		return false, fmt.Errorf("failed to initialize client set: %w", err)
	}

	nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
		Name(nodeName).SubResource("proxy").Suffix("/healthz").
		Do(ctx)

	// Capture the error once instead of re-evaluating Result.Error() per check.
	if err := nodeHealthResponse.Error(); err != nil {
		errMsg := err.Error()
		// TODO: match errors against types, e.g. syscall.ECONNREFUSED instead, the k8s client doesn't
		// currently properly wrap the underlying error to allow this though
		if strings.Contains(errMsg, "connection refused") ||
			strings.Contains(errMsg, "connection reset by peer") ||
			strings.Contains(errMsg, "http2: client connection lost") {
			// these errors indicate reachability to the node in general but an unstable connection to kubelet
			return false, nil
		}

		// propagate other errors, e.g. i/o timeout, that may result from things unrelated to kubelet health,
		// e.g. security group rules on the instance restricting traffic from the CP
		return false, fmt.Errorf("could not reach /healthz endpoint for node %s: %w", nodeName, err)
	}

	var statusCode int
	nodeHealthResponse.StatusCode(&statusCode)
	return statusCode == http.StatusOK, nil
}
36 changes: 6 additions & 30 deletions test/cases/disruptive/graceful_reboot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"sigs.k8s.io/e2e-framework/klient/wait"
"sigs.k8s.io/e2e-framework/pkg/envconf"
Expand Down Expand Up @@ -72,24 +71,14 @@ func TestGracefulReboot(t *testing.T) {

t.Logf("Pod %s is running on node %s", terminationCanaryPodName, targetNode.Name)

client, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
if err != nil {
t.Fatalf("Failed to initialize kubernetes client set: %v", err)
}

// Do an initial check of the /healthz endpoint reachability to ensure we can rely on it later.
// This might fail even if the node is healthy if, for example, the node's security group rules
// do not allow ingress traffic from the control plane
nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
Name(targetNode.Name).SubResource("proxy").Suffix("/healthz").
Do(ctx)

if nodeHealthResponse.Error() != nil {
t.Fatalf("Unexpected error checking node %s /healthz endpoint: %v", targetNode.Name, err)
kubeletResponsive, err := fwext.KubeletIsResponsive(ctx, cfg.Client().RESTConfig(), targetNode.Name)
if err != nil || !kubeletResponsive {
t.Fatalf("Node %s is not responding to initial /healthz checks: %v", targetNode.Name, err)
}

t.Logf("Node %s is responding to /healthz", targetNode.Name)

providerIDParts := strings.Split(targetNode.Spec.ProviderID, "/")
instanceID := providerIDParts[len(providerIDParts)-1]
t.Logf("Rebooting underlying instance %s for node %s...", instanceID, targetNode.Name)
Expand All @@ -108,26 +97,13 @@ func TestGracefulReboot(t *testing.T) {

// Use kubelet health probes as the signal for instance shutdown. Since the health endpoint
// could previously be reached, a refused connection implies kubelet was killed.
var kubeletConnectionLost bool
for !kubeletConnectionLost {
for kubeletResponsive {
select {
case <-kubeletShutdownCtx.Done():
t.Fatalf("Failed to wait for kubelet to become unresponsive: %v", ctx.Err())
case <-time.Tick(1 * time.Second):
nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
Name(targetNode.Name).SubResource("proxy").Suffix("/healthz").
Do(ctx)

if nodeHealthResponse.Error() != nil {
// TODO: match error against syscall.ECONNREFUSED instead, the k8s client doesn't
// currently properly wrap the underlying error to allow this though
if strings.Contains(nodeHealthResponse.Error().Error(), "connection refused") {
kubeletConnectionLost = true
} else {
t.Fatalf("Unpexected error while monitoring kubelet on node %s: %v", targetNode.Name, nodeHealthResponse.Error())
}
} else {
t.Logf("Node %s still responding to /healthz", targetNode.Name)
if kubeletResponsive, err = fwext.KubeletIsResponsive(ctx, cfg.Client().RESTConfig(), targetNode.Name); err != nil {
t.Fatalf("Unpexected error while monitoring kubelet on node %s: %v", targetNode.Name, err)
}
}
}
Expand Down
Loading