From 56c1d7d7d54ad407bedfd13190fcae95c40b25c9 Mon Sep 17 00:00:00 2001 From: Salah Aldeen Al Saleh Date: Wed, 18 Jun 2025 13:36:51 -0700 Subject: [PATCH 1/3] Address CMX flakiness on reboot --- e2e/cluster/cmx/cluster.go | 112 +++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 43 deletions(-) diff --git a/e2e/cluster/cmx/cluster.go b/e2e/cluster/cmx/cluster.go index edc2bf1836..aca1b0f6a4 100644 --- a/e2e/cluster/cmx/cluster.go +++ b/e2e/cluster/cmx/cluster.go @@ -36,6 +36,7 @@ type Node struct { ID string `json:"id"` Name string `json:"name"` NetworkID string `json:"network_id"` + Status string `json:"status"` privateIP string `json:"-"` sshEndpoint string `json:"-"` @@ -100,10 +101,6 @@ func NewNodes(in *ClusterInput) ([]Node, error) { return nil, fmt.Errorf("unmarshal node: %v: %s", err, string(output)) } - // TODO (@salah): remove this once the bug is fixed in CMX - // note: the vm gets marked as ready before the services are actually running - time.Sleep(30 * time.Second) - for i := range nodes { in.T.Logf("%s: getting ssh endpoint for node ID: %s", time.Now().Format(time.RFC3339), nodes[i].ID) @@ -113,10 +110,6 @@ func NewNodes(in *ClusterInput) ([]Node, error) { } nodes[i].sshEndpoint = sshEndpoint - if err := waitForSSH(nodes[i], in.T); err != nil { - return nil, fmt.Errorf("wait for ssh to be available on node %d: %v", i, err) - } - privateIP, err := discoverPrivateIP(nodes[i]) if err != nil { return nil, fmt.Errorf("discover node private IP: %v", err) @@ -212,28 +205,6 @@ func getSSHEndpoint(nodeID string) (string, error) { return strings.TrimSpace(string(output)), nil } -func waitForSSH(node Node, t *testing.T) error { - timeout := time.After(5 * time.Minute) - tick := time.Tick(5 * time.Second) - var lastErr error - - for { - select { - case <-timeout: - return fmt.Errorf("timed out after 5 minutes: last error: %w", lastErr) - case <-tick: - t.Logf("%s: checking SSH connectivity to node ID: %s", time.Now().Format(time.RFC3339), node.ID) - stdout, stderr, err := runCommandOnNode(node, []string{"uptime"}) - t.Logf("%s: SSH attempt - stdout: %s, stderr: %s, err: %v", time.Now().Format(time.RFC3339), stdout, stderr, err) - if err == nil { - t.Logf("%s: SSH connection successful to node ID: %s", time.Now().Format(time.RFC3339), node.ID) - return nil - } - lastErr = fmt.Errorf("%w: stdout: %s: stderr: %s", err, stdout, stderr) - } - } -} - func (c *Cluster) Airgap() error { // Update network policy to airgap output, err := exec.Command("replicated", "network", "update", "policy", "--id", c.network.ID, "--policy=airgap").CombinedOutput() @@ -271,30 +242,85 @@ func (c *Cluster) waitUntilAirgapped(node int) error { } func (c *Cluster) WaitForReboot() { - time.Sleep(30 * time.Second) - for i := range c.Nodes { - c.waitForSSH(i) - c.waitForClockSync(i) + c.waitForRunning() + c.waitForClockSync() +} + +func (c *Cluster) waitForRunning() { + timeout := time.After(5 * time.Minute) + tick := time.Tick(10 * time.Second) + + for { + select { + case <-timeout: + c.t.Fatalf("timed out waiting for nodes to be running after 5 minutes") + + case <-tick: + if err := c.refreshNodes(); err != nil { + c.t.Logf("failed to refresh nodes: %v", err) + continue + } + + for i, node := range c.Nodes { + if node.Status != "running" { + c.t.Logf("waiting for node %d (%s) to be running (status: %s)", i, node.ID, node.Status) + continue + } + } + + c.t.Logf("all nodes are running") + return + } } } -func (c *Cluster) waitForSSH(node int) { - if err := waitForSSH(c.Nodes[node], c.t); err != nil { - c.t.Fatalf("failed to wait for ssh to be available on node %d: %v", node, err) +func (c *Cluster) refreshNodes() error { + output, err := exec.Command("replicated", "vm", "ls", "-ojson").Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + return fmt.Errorf("list nodes: %w: stderr: %s: stdout: %s", err, string(exitErr.Stderr), string(output)) + } + return fmt.Errorf("list nodes: %w: stdout: %s", err, string(output)) + } + + var allNodes []Node + if err := json.Unmarshal(output, &allNodes); err != nil { + return fmt.Errorf("unmarshal nodes: %v: %s", err, string(output)) } + + allNodesMap := make(map[string]Node) + for _, node := range allNodes { + allNodesMap[node.ID] = node + } + + for i, node := range c.Nodes { + if updated, ok := allNodesMap[node.ID]; ok { + c.Nodes[i] = updated + } + } + + return nil } -func (c *Cluster) waitForClockSync(node int) { +func (c *Cluster) waitForClockSync() { timeout := time.After(5 * time.Minute) - tick := time.Tick(time.Second) + tick := time.Tick(5 * time.Second) + for { select { case <-timeout: - stdout, stderr, err := c.RunCommandOnNode(node, []string{"timedatectl show -p NTP -p NTPSynchronized"}) - c.t.Fatalf("timeout waiting for clock sync on node %d: %v: %s: %s", node, err, stdout, stderr) + c.t.Fatalf("timeout waiting for clock sync on all nodes") + case <-tick: - status, _, _ := c.RunCommandOnNode(node, []string{"timedatectl show -p NTP -p NTPSynchronized"}) - if strings.Contains(status, "NTP=yes") && strings.Contains(status, "NTPSynchronized=yes") { + for i, _ := range c.Nodes { + status, _, _ := c.RunCommandOnNode(i, []string{"timedatectl show -p NTP -p NTPSynchronized"}) + + if !strings.Contains(status, "NTP=yes") || !strings.Contains(status, "NTPSynchronized=yes") { + c.t.Logf("waiting for NTP=yes and NTPSynchronized=yes on node %d (status: %s)", i, status) + continue + } + + c.t.Logf("clock sync on node %d (status: %s)", i, status) return } } From 19ca5277258064f53844aecba3c774aee91b78c6 Mon Sep 17 00:00:00 2001 From: Salah Aldeen Al Saleh Date: Wed, 18 Jun 2025 13:43:08 -0700 Subject: [PATCH 2/3] fix ci lint --- e2e/cluster/cmx/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/cluster/cmx/cluster.go b/e2e/cluster/cmx/cluster.go index aca1b0f6a4..3d42d5b709 100644 --- a/e2e/cluster/cmx/cluster.go +++ b/e2e/cluster/cmx/cluster.go @@ -312,7 +312,7 @@ func (c *Cluster) waitForClockSync() { c.t.Fatalf("timeout waiting for clock sync on all nodes") case <-tick: - for i, _ := range c.Nodes { + for i := range c.Nodes { status, _, _ := c.RunCommandOnNode(i, []string{"timedatectl show -p NTP -p NTPSynchronized"}) if !strings.Contains(status, "NTP=yes") || !strings.Contains(status, "NTPSynchronized=yes") { From a224f51ec94efb2afbd615fb29c549b0474a183f Mon Sep 17 00:00:00 2001 From: Salah Aldeen Al Saleh Date: Wed, 18 Jun 2025 14:40:04 -0700 Subject: [PATCH 3/3] f --- e2e/cluster/cmx/cluster.go | 41 +++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/e2e/cluster/cmx/cluster.go b/e2e/cluster/cmx/cluster.go index 3d42d5b709..b219e9747c 100644 --- a/e2e/cluster/cmx/cluster.go +++ b/e2e/cluster/cmx/cluster.go @@ -101,6 +101,10 @@ func NewNodes(in *ClusterInput) ([]Node, error) { return nil, fmt.Errorf("unmarshal node: %v: %s", err, string(output)) } + // TODO (@salah): remove this once the bug is fixed in CMX + // note: the vm gets marked as ready before the services are actually running + time.Sleep(30 * time.Second) + for i := range nodes { in.T.Logf("%s: getting ssh endpoint for node ID: %s", time.Now().Format(time.RFC3339), nodes[i].ID) @@ -110,6 +114,10 @@ func NewNodes(in *ClusterInput) ([]Node, error) { } nodes[i].sshEndpoint = sshEndpoint + if err := waitForSSH(nodes[i], in.T); err != nil { + return nil, fmt.Errorf("wait for ssh to be available on node %d: %v", i, err) + } + privateIP, err := discoverPrivateIP(nodes[i]) if err != nil { return nil, fmt.Errorf("discover node private IP: %v", err) @@ -205,6 +213,28 @@ func getSSHEndpoint(nodeID string) (string, error) { return strings.TrimSpace(string(output)), nil } +func waitForSSH(node Node, t *testing.T) error { + timeout := time.After(5 * time.Minute) + tick := time.Tick(5 * time.Second) + var lastErr error + + for { + select { + case <-timeout: + return fmt.Errorf("timed out after 5 minutes: last error: %w", lastErr) + case <-tick: + t.Logf("%s: checking SSH connectivity to node ID: %s", time.Now().Format(time.RFC3339), node.ID) + stdout, stderr, err := runCommandOnNode(node, []string{"uptime"}) + t.Logf("%s: SSH attempt - stdout: %s, stderr: %s, err: %v", time.Now().Format(time.RFC3339), stdout, stderr, err) + if err == nil { + t.Logf("%s: SSH connection successful to node ID: %s", time.Now().Format(time.RFC3339), node.ID) + return nil + } + lastErr = fmt.Errorf("%w: stdout: %s: stderr: %s", err, stdout, stderr) + } + } +} + func (c *Cluster) Airgap() error { // Update network policy to airgap output, err := exec.Command("replicated", "network", "update", "policy", "--id", c.network.ID, "--policy=airgap").CombinedOutput() @@ -243,6 +273,7 @@ func (c *Cluster) waitUntilAirgapped(node int) error { func (c *Cluster) WaitForReboot() { c.waitForRunning() + c.waitForSSH() c.waitForClockSync() } @@ -302,6 +333,14 @@ func (c *Cluster) refreshNodes() error { return nil } +func (c *Cluster) waitForSSH() { + for i, node := range c.Nodes { + if err := waitForSSH(node, c.t); err != nil { + c.t.Fatalf("failed to wait for SSH to be available on node %d: %v", i, err) + } + } +} + func (c *Cluster) waitForClockSync() { timeout := time.After(5 * time.Minute) tick := time.Tick(5 * time.Second) @@ -320,7 +359,7 @@ func (c *Cluster) waitForClockSync() { continue } - c.t.Logf("clock sync on node %d (status: %s)", i, status) + c.t.Logf("clock synced on node %d (status: %s)", i, status) return } }