diff --git a/internal/e2e/health.go b/internal/e2e/health.go
new file mode 100644
index 000000000..76c04db4a
--- /dev/null
+++ b/internal/e2e/health.go
@@ -0,0 +1,43 @@
+package e2e
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+)
+
+// KubeletIsResponsive returns true if the kubelet /healthz endpoint responds with a 200 status code. Connection-level
+// failures are reported as (false, nil); any other error is propagated to the caller.
+func KubeletIsResponsive(ctx context.Context, cfg *rest.Config, nodeName string) (bool, error) {
+	client, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return false, fmt.Errorf("failed to initialize client set: %w", err)
+	}
+
+	nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
+		Name(nodeName).SubResource("proxy").Suffix("/healthz").
+		Do(ctx)
+
+	if nodeHealthResponse.Error() != nil {
+		errMsg := nodeHealthResponse.Error().Error()
+		// TODO: match against error types such as syscall.ECONNREFUSED instead; the k8s client doesn't
+		// currently wrap the underlying error in a way that allows this
+		if strings.Contains(errMsg, "connection refused") ||
+			strings.Contains(errMsg, "connection reset by peer") ||
+			strings.Contains(errMsg, "http2: client connection lost") {
+			// these errors indicate the node itself is reachable but the connection to kubelet is unstable or down
+			return false, nil
+		}
+
+		// propagate other errors, e.g. i/o timeout, that may result from things unrelated to kubelet health,
+		// e.g. security group rules on the instance restricting traffic from the control plane
+		return false, fmt.Errorf("could not reach /healthz endpoint for node %s: %w", nodeName, nodeHealthResponse.Error())
+	}
+
+	var statusCode int
+	nodeHealthResponse.StatusCode(&statusCode)
+	return statusCode == 200, nil
+}
diff --git a/test/cases/disruptive/graceful_reboot_test.go b/test/cases/disruptive/graceful_reboot_test.go
index e05841ccc..d9b5d139d 100644
--- a/test/cases/disruptive/graceful_reboot_test.go
+++ b/test/cases/disruptive/graceful_reboot_test.go
@@ -16,7 +16,6 @@ import (
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/e2e-framework/klient/wait"
 	"sigs.k8s.io/e2e-framework/pkg/envconf"
 
@@ -72,24 +71,14 @@ func TestGracefulReboot(t *testing.T) {
 
 	t.Logf("Pod %s is running on node %s", terminationCanaryPodName, targetNode.Name)
 
-	client, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
-	if err != nil {
-		t.Fatalf("Failed to initialize kubernetes client set: %v", err)
-	}
-
 	// Do an initial check of the /healthz endpoint reachability to ensure we can rely on it later.
 	// This might fail even if the node is healthy if, for example, the node's security group rules
 	// do not allow ingress traffic from the control plane
-	nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
-		Name(targetNode.Name).SubResource("proxy").Suffix("/healthz").
-		Do(ctx)
-
-	if nodeHealthResponse.Error() != nil {
-		t.Fatalf("Unexpected error checking node %s /healthz endpoint: %v", targetNode.Name, err)
+	kubeletResponsive, err := fwext.KubeletIsResponsive(ctx, cfg.Client().RESTConfig(), targetNode.Name)
+	if err != nil || !kubeletResponsive {
+		t.Fatalf("Node %s is not responding to initial /healthz checks: %v", targetNode.Name, err)
 	}
 
-	t.Logf("Node %s is responding to /healthz", targetNode.Name)
-
 	providerIDParts := strings.Split(targetNode.Spec.ProviderID, "/")
 	instanceID := providerIDParts[len(providerIDParts)-1]
 	t.Logf("Rebooting underlying instance %s for node %s...", instanceID, targetNode.Name)
@@ -108,26 +97,13 @@ func TestGracefulReboot(t *testing.T) {
 
 	// Use kubelet health probes as the signal for instance shutdown. Since the health endpoint
 	// could previously be reached, a refused connection implies kubelet was killed.
-	var kubeletConnectionLost bool
-	for !kubeletConnectionLost {
+	for kubeletResponsive {
 		select {
 		case <-kubeletShutdownCtx.Done():
 			t.Fatalf("Failed to wait for kubelet to become unresponsive: %v", ctx.Err())
 		case <-time.Tick(1 * time.Second):
-			nodeHealthResponse := client.CoreV1().RESTClient().Get().Resource("nodes").
-				Name(targetNode.Name).SubResource("proxy").Suffix("/healthz").
-				Do(ctx)
-
-			if nodeHealthResponse.Error() != nil {
-				// TODO: match error against syscall.ECONNREFUSED instead, the k8s client doesn't
-				// currently properly wrap the underlying error to allow this though
-				if strings.Contains(nodeHealthResponse.Error().Error(), "connection refused") {
-					kubeletConnectionLost = true
-				} else {
-					t.Fatalf("Unpexected error while monitoring kubelet on node %s: %v", targetNode.Name, nodeHealthResponse.Error())
-				}
-			} else {
-				t.Logf("Node %s still responding to /healthz", targetNode.Name)
+			if kubeletResponsive, err = fwext.KubeletIsResponsive(ctx, cfg.Client().RESTConfig(), targetNode.Name); err != nil {
+				t.Fatalf("Unexpected error while monitoring kubelet on node %s: %v", targetNode.Name, err)
			}
 		}
 	}
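
For reference, a minimal sketch of how the new helper might be driven by other tests, e.g. to wait for a kubelet to drop off after a reboot. This is an illustration under assumptions, not part of the change: the waitForKubeletDown name, the example package, the timeout parameter, and the import path are all hypothetical; only KubeletIsResponsive and its signature come from the diff above.

package example

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/rest"

	// hypothetical import path; substitute this repo's actual module path for internal/e2e
	e2e "example.com/project/internal/e2e"
)

// waitForKubeletDown polls KubeletIsResponsive until kubelet stops answering
// /healthz, or until the timeout elapses. Errors unrelated to kubelet health
// abort the wait, mirroring the helper's error contract.
func waitForKubeletDown(ctx context.Context, cfg *rest.Config, nodeName string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("kubelet on node %s never became unresponsive: %w", nodeName, ctx.Err())
		case <-ticker.C:
			responsive, err := e2e.KubeletIsResponsive(ctx, cfg, nodeName)
			if err != nil {
				// the helper propagates errors unrelated to kubelet health, e.g. i/o timeout
				return err
			}
			if !responsive {
				// connection-level failure or non-200 response: treat kubelet as down
				return nil
			}
		}
	}
}

Note that, unlike the test's loop, the sketch uses time.NewTicker with a deferred Stop; time.Tick offers no way to release its underlying ticker.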