
Commit 46d566b

[#2273] Prevent a GracefulShutdown if view is != #replicas or any cache is not in a HEALTHY state

1 parent: e86b91a

3 files changed: +97 −21 lines

pkg/http/curl/curl.go

Lines changed: 10 additions & 1 deletion

@@ -120,6 +120,11 @@ func (c *Client) executeCurlWithAuth(httpURL, headers, args string) (*http.Respo
         return nil, err
     }
 
+    // Handle anonymous endpoints such as /health/status that will not return a 401 response
+    if rsp.StatusCode >= 200 && rsp.StatusCode < 300 {
+        return processResponse(rsp)
+    }
+
     if rsp.StatusCode != http.StatusUnauthorized {
         return rsp, fmt.Errorf("expected 401 DIGEST response before content. Received '%s'", rsp.Status)
     }
@@ -172,9 +177,13 @@ func handleContent(reader *bufio.Reader) (*http.Response, error) {
         }
     }
 
+    return processResponse(rsp)
+}
+
+func processResponse(rsp *http.Response) (*http.Response, error) {
     // Save response body
     b := new(bytes.Buffer)
-    if _, err = io.Copy(b, rsp.Body); err != nil {
+    if _, err := io.Copy(b, rsp.Body); err != nil {
         return nil, err
     }
     if err := rsp.Body.Close(); err != nil {

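The refactor extracts the response-body buffering from handleContent into a shared processResponse helper, which the new 2xx short-circuit also calls so that anonymous endpoints such as /health/status bypass the DIGEST challenge entirely. The hunk above ends mid-function, so the full helper is not shown; a plausible completion, in which the re-wrap of the drained body via io.NopCloser is an assumption rather than confirmed code, might look like:

package curl // package name inferred from the file path above

import (
    "bytes"
    "io"
    "net/http"
)

// processResponse buffers and closes the response body. Only the buffering
// and Close lines are confirmed by the diff; restoring a readable body with
// io.NopCloser is an assumption about the unshown remainder of the helper.
func processResponse(rsp *http.Response) (*http.Response, error) {
    // Save response body
    b := new(bytes.Buffer)
    if _, err := io.Copy(b, rsp.Body); err != nil {
        return nil, err
    }
    if err := rsp.Body.Close(); err != nil {
        return nil, err
    }
    rsp.Body = io.NopCloser(b) // assumption: let callers re-read the body
    return rsp, nil
}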
pkg/infinispan/client/api/infinispan.go

Lines changed: 6 additions & 4 deletions

@@ -106,7 +106,7 @@ type HealthStatus string
 
 const (
     HealthStatusDegraded          HealthStatus = "DEGRADED"
-    HealthStatusHealth            HealthStatus = "HEALTHY"
+    HealthStatusHealthy           HealthStatus = "HEALTHY"
     HealthStatusHealthRebalancing HealthStatus = "HEALTHY_REBALANCING"
     HealthStatusFailed            HealthStatus = "FAILED"
 )
@@ -152,9 +152,11 @@ type BackupRestoreResources struct {
 }
 
 type ContainerInfo struct {
-    Coordinator bool           `json:"coordinator"`
-    SitesView   *[]interface{} `json:"sites_view,omitempty"`
-    Version     string         `json:"version"`
+    ClusterMembers []string       `json:"cluster_members"`
+    ClusterSize    int32          `json:"cluster_size"`
+    Coordinator    bool           `json:"coordinator"`
+    SitesView      *[]interface{} `json:"sites_view,omitempty"`
+    Version        string         `json:"version"`
 }
 
 type NotSupportedError struct {

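The two new fields map to the cluster_members and cluster_size keys of the server's container info payload, and the renamed HealthStatusHealthy constant now matches the HEALTHY value it represents. A minimal sketch of decoding such a payload, where the sample values are illustrative rather than taken from a real server response:

package main

import (
    "encoding/json"
    "fmt"
)

// ContainerInfo as defined in the diff above.
type ContainerInfo struct {
    ClusterMembers []string       `json:"cluster_members"`
    ClusterSize    int32          `json:"cluster_size"`
    Coordinator    bool           `json:"coordinator"`
    SitesView      *[]interface{} `json:"sites_view,omitempty"`
    Version        string         `json:"version"`
}

func main() {
    // Illustrative payload; field values are invented for the example.
    payload := []byte(`{
        "cluster_members": ["infinispan-0", "infinispan-1", "infinispan-2"],
        "cluster_size": 3,
        "coordinator": true,
        "version": "15.0.0"
    }`)
    var info ContainerInfo
    if err := json.Unmarshal(payload, &info); err != nil {
        panic(err)
    }
    fmt.Printf("size=%d members=%v\n", info.ClusterSize, info.ClusterMembers)
}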
pkg/reconcile/pipeline/infinispan/handler/manage/upgrades.go

Lines changed: 81 additions & 16 deletions

@@ -7,8 +7,8 @@ import (
 
     ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
     consts "github.com/infinispan/infinispan-operator/controllers/constants"
+    ispnApi "github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
     "github.com/infinispan/infinispan-operator/pkg/infinispan/version"
-    kube "github.com/infinispan/infinispan-operator/pkg/kubernetes"
     pipeline "github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan"
     "github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan/handler/provision"
     routev1 "github.com/openshift/api/route/v1"
@@ -130,7 +130,8 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
     // Initiate the GracefulShutdown if it's not already in progress
     if i.Spec.Replicas == 0 {
         logger.Info(".Spec.Replicas==0")
-        if *statefulSet.Spec.Replicas != 0 {
+        replicas := *statefulSet.Spec.Replicas
+        if replicas != 0 {
             logger.Info("StatefulSet.Spec.Replicas!=0")
             // Only send a GracefulShutdown request to the server if it hasn't succeeded already
             if !i.IsConditionTrue(ispnv1.ConditionStopping) {
@@ -141,18 +142,80 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
                     return
                 }
 
-                var rebalanceDisabled bool
+                type PodMeta struct {
+                    client ispnApi.Infinispan
+                    skip   bool
+                    state  string
+                }
+
+                // Loop through all pods to initiate the clients, determine that all pods have the expected
+                // number of cluster members and ensure that the cluster is in a HEALTHY state
+                var podMetaMap = make(map[string]*PodMeta, len(podList.Items))
+                var shutdownAlreadyInitiated bool
                 for _, pod := range podList.Items {
-                    ispnClient, err := ctx.InfinispanClientUnknownVersion(pod.Name)
+                    podMeta := &PodMeta{}
+                    podMetaMap[pod.Name] = podMeta
+                    podMeta.client, err = ctx.InfinispanClientUnknownVersion(pod.Name)
                     if err != nil {
                         if shutdown, state := containerAlreadyShutdown(err); shutdown {
-                            logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod.Name, "state", state)
+                            podMeta.skip, podMeta.state = true, state
+                            logger.Info("At least one cache-container has already been shutdown by the Operator, resuming shutdown", "pod", pod.Name, "state", state)
+                            shutdownAlreadyInitiated = true
+                            // Continue processing other pods here as it's possible that one or more pods still haven't
+                            // been shutdown and we need to initiate the client
                             continue
                         }
-                        ctx.Requeue(fmt.Errorf("unable to create Infinispan client for cluster being upgraded: %w", err))
+                        ctx.Requeue(fmt.Errorf("unable to create Infinispan client to determine if split-brain is present: %w", err))
+                        return
+                    }
+
+                    // If one or more of the pods have already been shutdown then we must continue to shutdown the remaining
+                    // members of the cluster
+                    if shutdownAlreadyInitiated {
+                        continue
+                    }
+
+                    info, err := podMeta.client.Container().Info()
+                    if err != nil {
+                        ctx.Requeue(fmt.Errorf("unable to retrieve cache-container info for pod '%s': %w", pod.Name, err))
                         return
                     }
 
+                    if info.ClusterSize != replicas {
+                        err = fmt.Errorf(
+                            "unable to proceed with GracefulShutdown as pod '%s' has '%d' cluster members, expected '%d'. Members: '%s'",
+                            pod.Name,
+                            info.ClusterSize,
+                            i.Spec.Replicas,
+                            strings.Join(info.ClusterMembers, ","),
+                        )
+                        ctx.Requeue(err)
+                        return
+                    }
+
+                    health, err := podMeta.client.Container().HealthStatus()
+                    if err != nil {
+                        ctx.Requeue(fmt.Errorf("unable to retrieve cluster health status for pod '%s': %w", pod.Name, err))
+                        return
+                    }
+
+                    // If any of the caches are not marked as HEALTHY we must prevent a GracefulShutdown to prevent
+                    // the cluster from entering an unexpected state
+                    if health != ispnApi.HealthStatusHealthy {
+                        ctx.Requeue(fmt.Errorf("unable to proceed with GracefulShutdown as the cluster health is '%s'", health))
+                        return
+                    }
+                }
+
+                var rebalanceDisabled bool
+                for _, pod := range podList.Items {
+                    podMeta := podMetaMap[pod.Name]
+                    if podMeta.skip {
+                        logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod, "state", podMeta.state)
+                        continue
+                    }
+                    ispnClient := podMeta.client
+
                     // Disabling rebalancing is a cluster-wide operation so we only need to perform this on a single pod
                     // However, multiple calls to this endpoint should be safe, so it's ok if a subsequent reconciliation
                     // executes this again
@@ -164,13 +227,11 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
                        rebalanceDisabled = true
                     }
 
-                    if kube.IsPodReady(pod) {
-                        if err := ispnClient.Container().Shutdown(); err != nil {
-                            ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
-                            return
-                        } else {
-                            logger.Info("Executed Container Shutdown on pod: ", "Pod.Name", pod.Name)
-                        }
+                    if err := ispnClient.Container().Shutdown(); err != nil {
+                        ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
+                        return
+                    } else {
+                        logger.Info("Executed Container Shutdown on pod: ", "Pod.Name", pod.Name)
                     }
                 }
 
@@ -286,10 +347,14 @@ func EnableRebalanceAfterScaleUp(i *ispnv1.Infinispan, ctx pipeline.Context) {
         return
     }
 
-    if members, err := ispnClient.Container().Members(); err != nil {
-        ctx.Requeue(fmt.Errorf("unable to retrieve cluster members on scale up: %w", err))
+    // TODO why is this failing in TestOperandUpgrade for 14.0.x servers?
+    /*
+        2025-05-16T17:11:58.093+0100  ERROR  Reconciler error  {"controller": "infinispan", "controllerGroup": "infinispan.org", "controllerKind": "Infinispan", "infinispan": {"name":"test-operand-upgrade","namespace":"namespace-for-testing"}, "namespace": "namespace-for-testing", "name": "test-operand-upgrade", "reconcileID": "eda96c7e-fd83-43cc-b903-4a623e4e2785", "error": "unable to retrieve cluster information on scale up: unexpected error getting cache manager info: stderr: , err: the server does not allow this method on the requested resource"}
+    */
+    if info, err := ispnClient.Container().Info(); err != nil {
+        ctx.Requeue(fmt.Errorf("unable to retrieve cluster information on scale up: %w", err))
         return
-    } else if len(members) != int(i.Spec.Replicas) {
+    } else if info.ClusterSize != i.Spec.Replicas {
         ctx.Log().Info("waiting for cluster to form", "replicas", i.Spec.Replicas)
         ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, nil)
         return

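The reworked handler splits GracefulShutdown into two passes: a read-only pass that verifies every pod reports the expected cluster size and a HEALTHY status, followed by a mutating pass that disables rebalancing and shuts the containers down. A condensed sketch of that shape, where checkPod and shutdownPod are hypothetical stand-ins for the per-pod client calls in the real handler:

package main

import "fmt"

// gracefulShutdown validates every pod before any irreversible action.
func gracefulShutdown(
    pods []string,
    replicas int32,
    checkPod func(pod string) (size int32, health string, err error),
    shutdownPod func(pod string) error,
) error {
    // Pass 1: fail fast if any pod reports an unexpected view size or an
    // unhealthy cache; nothing has been mutated yet, so returning an error
    // (a requeue in the operator) is safe.
    for _, pod := range pods {
        size, health, err := checkPod(pod)
        if err != nil {
            return err
        }
        if size != replicas {
            return fmt.Errorf("pod '%s' sees %d cluster members, expected %d", pod, size, replicas)
        }
        if health != "HEALTHY" {
            return fmt.Errorf("unable to proceed, cluster health is '%s'", health)
        }
    }
    // Pass 2: every pod passed validation, so the shutdown can proceed.
    for _, pod := range pods {
        if err := shutdownPod(pod); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    pods := []string{"infinispan-0", "infinispan-1"}
    check := func(pod string) (int32, string, error) { return 2, "HEALTHY", nil }
    stop := func(pod string) error { fmt.Println("shutdown", pod); return nil }
    if err := gracefulShutdown(pods, 2, check, stop); err != nil {
        fmt.Println("requeue:", err)
    }
}

Because the first pass touches no state, a validation failure simply requeues the reconciliation, which is exactly the behaviour the commit message describes.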