
Commit 79e5d2f

[#2273] Prevent a GracefulShutdown if the cluster view != #replicas or any cache is not in a HEALTHY state
- Make the org.infinispan.LOCKS cache AVAILABLE automatically in the upgrade testsuite to work around known issues such as https://issues.redhat.com/browse/ISPN-15191
1 parent e86b91a commit 79e5d2f

File tree: 9 files changed (+225 / -36 lines)

pkg/http/curl/curl.go

Lines changed: 10 additions & 1 deletion
@@ -120,6 +120,11 @@ func (c *Client) executeCurlWithAuth(httpURL, headers, args string) (*http.Respo
 		return nil, err
 	}
 
+	// Handle anonymous endpoints such as /health/status that will not return a 401 response
+	if rsp.StatusCode >= 200 && rsp.StatusCode < 300 {
+		return processResponse(rsp)
+	}
+
 	if rsp.StatusCode != http.StatusUnauthorized {
 		return rsp, fmt.Errorf("expected 401 DIGEST response before content. Received '%s'", rsp.Status)
 	}
@@ -172,9 +177,13 @@ func handleContent(reader *bufio.Reader) (*http.Response, error) {
 		}
 	}
 
+	return processResponse(rsp)
+}
+
+func processResponse(rsp *http.Response) (*http.Response, error) {
 	// Save response body
 	b := new(bytes.Buffer)
-	if _, err = io.Copy(b, rsp.Body); err != nil {
+	if _, err := io.Copy(b, rsp.Body); err != nil {
 		return nil, err
 	}
 	if err := rsp.Body.Close(); err != nil {

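The new branch in executeCurlWithAuth short-circuits the DIGEST handshake for anonymous endpoints. The sketch below is not part of the diff; it simply restates the decision taken on the first, unauthenticated response as a standalone program (classifyFirstResponse is a hypothetical name, not the operator's API).

package main

import (
	"fmt"
	"net/http"
)

// classifyFirstResponse mirrors the gate added to executeCurlWithAuth:
// a 2xx on the first request means the endpoint is anonymous and the body can
// be processed directly; only a 401 should trigger the DIGEST retry.
func classifyFirstResponse(status int) (string, error) {
	switch {
	case status >= 200 && status < 300:
		return "anonymous endpoint, process response directly", nil
	case status == http.StatusUnauthorized:
		return "401 challenge received, retry with DIGEST credentials", nil
	default:
		return "", fmt.Errorf("expected 401 DIGEST response before content. Received '%d'", status)
	}
}

func main() {
	for _, status := range []int{200, 401, 403} {
		action, err := classifyFirstResponse(status)
		fmt.Println(status, action, err)
	}
}
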
pkg/infinispan/client/api/infinispan.go

Lines changed: 21 additions & 4 deletions
@@ -24,6 +24,7 @@ type Infinispan interface {
 type Container interface {
 	Info() (*ContainerInfo, error)
 	Backups() Backups
+	Health() (*Health, error)
 	HealthStatus() (HealthStatus, error)
 	Members() ([]string, error)
 	RebalanceDisable() error
@@ -101,12 +102,26 @@ type Xsite interface {
 	PushAllState() error
 }
 
+type Health struct {
+	ClusterHealth ClusterHealth `json:"cluster_health"`
+	CacheHealth   []CacheHealth `json:"cache_health"`
+}
+
+type CacheHealth struct {
+	Name   string       `json:"cache_name"`
+	Status HealthStatus `json:"status"`
+}
+
+type ClusterHealth struct {
+	Status HealthStatus `json:"health_status"`
+}
+
 // HealthStatus indicated the possible statuses of the Infinispan server
 type HealthStatus string
 
 const (
 	HealthStatusDegraded          HealthStatus = "DEGRADED"
-	HealthStatusHealth            HealthStatus = "HEALTHY"
+	HealthStatusHealthy           HealthStatus = "HEALTHY"
 	HealthStatusHealthRebalancing HealthStatus = "HEALTHY_REBALANCING"
 	HealthStatusFailed            HealthStatus = "FAILED"
 )
@@ -152,9 +167,11 @@ type BackupRestoreResources struct {
 }
 
 type ContainerInfo struct {
-	Coordinator bool           `json:"coordinator"`
-	SitesView   *[]interface{} `json:"sites_view,omitempty"`
-	Version     string         `json:"version"`
+	ClusterMembers []string       `json:"cluster_members"`
+	ClusterSize    int32          `json:"cluster_size"`
+	Coordinator    bool           `json:"coordinator"`
+	SitesView      *[]interface{} `json:"sites_view,omitempty"`
+	Version        string         `json:"version"`
 }
 
 type NotSupportedError struct {

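The new api.Health, ClusterHealth and CacheHealth types are plain JSON mappings of the cache-manager health report. The standalone sketch below mirrors those struct tags to show how such a payload decodes; the sample JSON is illustrative only, not captured from a real server.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the new api types, declared here so the example runs on its own.
type HealthStatus string

const HealthStatusHealthy HealthStatus = "HEALTHY"

type ClusterHealth struct {
	Status HealthStatus `json:"health_status"`
}

type CacheHealth struct {
	Name   string       `json:"cache_name"`
	Status HealthStatus `json:"status"`
}

type Health struct {
	ClusterHealth ClusterHealth `json:"cluster_health"`
	CacheHealth   []CacheHealth `json:"cache_health"`
}

func main() {
	// Hypothetical body returned by the cache-manager /health endpoint.
	payload := []byte(`{
		"cluster_health": {"health_status": "DEGRADED"},
		"cache_health": [
			{"cache_name": "sessions", "status": "HEALTHY"},
			{"cache_name": "org.infinispan.LOCKS", "status": "DEGRADED"}
		]
	}`)

	var health Health
	if err := json.Unmarshal(payload, &health); err != nil {
		panic(err)
	}

	// GracefulShutdown is now blocked whenever the cluster is not HEALTHY.
	if health.ClusterHealth.Status != HealthStatusHealthy {
		fmt.Printf("shutdown blocked, cluster health is %s: %v\n", health.ClusterHealth.Status, health.CacheHealth)
	}
}
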
pkg/infinispan/client/v14/container.go

Lines changed: 15 additions & 0 deletions
@@ -31,6 +31,21 @@ func (c *Container) Info() (info *api.ContainerInfo, err error) {
 	return
 }
 
+func (c *Container) Health() (status *api.Health, err error) {
+	rsp, err := c.Get(c.CacheManager("/health"), nil)
+	defer func() {
+		err = httpClient.CloseBody(rsp, err)
+	}()
+
+	if err = httpClient.ValidateResponse(rsp, err, "getting cache manager health", http.StatusOK); err != nil {
+		return
+	}
+	if err = json.NewDecoder(rsp.Body).Decode(&status); err != nil {
+		return nil, fmt.Errorf("unable to decode: %w", err)
+	}
+	return
+}
+
 func (c *Container) HealthStatus() (status api.HealthStatus, err error) {
 	rsp, err := c.Get(c.CacheManager("/health/status"), nil)
 	defer func() {

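Callers reach the new endpoint through the version-agnostic api.Infinispan client, exactly as GracefulShutdown does below. The following is a minimal compile-time sketch rather than operator code: it assumes a module that already depends on the operator's client packages, and requireHealthyCluster is a hypothetical helper name.

package example

import (
	"fmt"

	ispnApi "github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
)

// requireHealthyCluster mirrors the health gate added to GracefulShutdown:
// it returns an error unless the cache-manager reports a HEALTHY cluster.
func requireHealthyCluster(client ispnApi.Infinispan) error {
	health, err := client.Container().Health()
	if err != nil {
		return fmt.Errorf("unable to retrieve cluster health: %w", err)
	}
	if health.ClusterHealth.Status != ispnApi.HealthStatusHealthy {
		return fmt.Errorf("cluster health is '%s': %v", health.ClusterHealth.Status, health.CacheHealth)
	}
	return nil
}
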
pkg/reconcile/pipeline/infinispan/handler/manage/upgrades.go

Lines changed: 95 additions & 16 deletions
@@ -7,8 +7,8 @@ import (
 
 	ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
 	consts "github.com/infinispan/infinispan-operator/controllers/constants"
+	ispnApi "github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
 	"github.com/infinispan/infinispan-operator/pkg/infinispan/version"
-	kube "github.com/infinispan/infinispan-operator/pkg/kubernetes"
 	pipeline "github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan"
 	"github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan/handler/provision"
 	routev1 "github.com/openshift/api/route/v1"
@@ -130,7 +130,8 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
 	// Initiate the GracefulShutdown if it's not already in progress
 	if i.Spec.Replicas == 0 {
 		logger.Info(".Spec.Replicas==0")
-		if *statefulSet.Spec.Replicas != 0 {
+		replicas := *statefulSet.Spec.Replicas
+		if replicas != 0 {
 			logger.Info("StatefulSet.Spec.Replicas!=0")
 			// Only send a GracefulShutdown request to the server if it hasn't succeeded already
 			if !i.IsConditionTrue(ispnv1.ConditionStopping) {
@@ -141,18 +142,94 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
 					return
 				}
 
-				var rebalanceDisabled bool
+				type ServerMeta struct {
+					client ispnApi.Infinispan
+					skip   bool
+					state  string
+				}
+
+				// Loop through all pods to initiate the clients, determine that all pods have the expected
+				// number of cluster members and ensure that the cluster is in a HEALTHY state
+				var podMetaMap = make(map[string]*ServerMeta, len(podList.Items))
+				var shutdownAlreadyInitiated bool
 				for _, pod := range podList.Items {
-					ispnClient, err := ctx.InfinispanClientUnknownVersion(pod.Name)
+					podMeta := &ServerMeta{}
+					podMetaMap[pod.Name] = podMeta
+					podMeta.client, err = ctx.InfinispanClientUnknownVersion(pod.Name)
 					if err != nil {
 						if shutdown, state := containerAlreadyShutdown(err); shutdown {
-							logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod.Name, "state", state)
+							podMeta.skip, podMeta.state = true, state
+							logger.Info("At least one cache-container has already been shutdown by the Operator, resuming shutdown", "pod", pod.Name, "state", state)
+							shutdownAlreadyInitiated = true
+							// Continue processing other pods here as it's possible that one or more pods still haven't
+							// been shutdown and we need to initiate the client
							continue
						}
-						ctx.Requeue(fmt.Errorf("unable to create Infinispan client for cluster being upgraded: %w", err))
+						ctx.Requeue(fmt.Errorf("unable to create Infinispan client to determine if split-brain is present: %w", err))
+						return
+					}
+
+					// If one or more of the pods have already been shutdown then we must continue to shutdown the remaining
+					// members of the cluster
+					if shutdownAlreadyInitiated {
+						continue
+					}
+
+					info, err := podMeta.client.Container().Info()
+					if err != nil {
+						ctx.Requeue(fmt.Errorf("unable to retrieve cache-container info for pod '%s': %w", pod.Name, err))
+						return
+					}
+
+					updateCondition := func(msg string) {
+						err := ctx.UpdateInfinispan(func() {
+							i.SetCondition(ispnv1.ConditionStopping, metav1.ConditionFalse, msg)
+						})
+						if err != nil {
+							ctx.Log().Error(err, "unable to set condition to False", "condition", "stopping")
+						}
+					}
+
+					if info.ClusterSize != replicas {
+						msg := fmt.Sprintf(
+							"unable to proceed with GracefulShutdown as pod '%s' has '%d' cluster members, expected '%d'. Members: '%s'",
+							pod.Name,
+							info.ClusterSize,
+							i.Spec.Replicas,
+							strings.Join(info.ClusterMembers, ","),
+						)
+						updateCondition(msg)
+						ctx.EventRecorder().Event(i, corev1.EventTypeWarning, "GracefulShutdownBlocked", msg)
+						ctx.Requeue(errors.New(msg))
 						return
 					}
 
+					health, err := podMeta.client.Container().Health()
+					if err != nil {
+						ctx.Requeue(fmt.Errorf("unable to retrieve cluster health status for pod '%s': %w", pod.Name, err))
+						return
+					}
+
+					// If any of the caches are not marked as HEALTHY we must prevent a GracefulShutdown to prevent
+					// the cluster from entering an unexpected state
+					if health.ClusterHealth.Status != ispnApi.HealthStatusHealthy {
+						msg := fmt.Sprintf("unable to proceed with GracefulShutdown as the cluster health is '%s': %s", health.ClusterHealth.Status, health.CacheHealth)
+						updateCondition(msg)
+						ctx.EventRecorder().Event(i, corev1.EventTypeWarning, "GracefulShutdownBlocked", msg)
+						ctx.Requeue(errors.New(msg))
+						return
+					}
+				}
+
+				var rebalanceDisabled bool
+				for _, pod := range podList.Items {
+					podMeta := podMetaMap[pod.Name]
+					if podMeta.skip {
+						logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod, "state", podMeta.state)
+						continue
+					}
+					ispnClient := podMeta.client
+
 					// Disabling rebalancing is a cluster-wide operation so we only need to perform this on a single pod
 					// However, multiple calls to this endpoint should be safe, so it's ok if a subsequent reconciliation
 					// executes this again
@@ -164,13 +241,11 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
 						rebalanceDisabled = true
 					}
 
-					if kube.IsPodReady(pod) {
-						if err := ispnClient.Container().Shutdown(); err != nil {
-							ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
-							return
-						} else {
-							logger.Info("Executed Container Shutdown on pod: ", "Pod.Name", pod.Name)
-						}
+					if err := ispnClient.Container().Shutdown(); err != nil {
+						ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
+						return
+					} else {
+						logger.Info("Executed Container Shutdown on pod: ", "Pod.Name", pod.Name)
 					}
 				}
 
@@ -286,10 +361,14 @@ func EnableRebalanceAfterScaleUp(i *ispnv1.Infinispan, ctx pipeline.Context) {
 		return
 	}
 
-	if members, err := ispnClient.Container().Members(); err != nil {
-		ctx.Requeue(fmt.Errorf("unable to retrieve cluster members on scale up: %w", err))
+	// TODO why is this failing in TestOperandUpgrade for 14.0.x servers?
+	/*
+		2025-05-16T17:11:58.093+0100 ERROR Reconciler error {"controller": "infinispan", "controllerGroup": "infinispan.org", "controllerKind": "Infinispan", "infinispan": {"name":"test-operand-upgrade","namespace":"namespace-for-testing"}, "namespace": "namespace-for-testing", "name": "test-operand-upgrade", "reconcileID": "eda96c7e-fd83-43cc-b903-4a623e4e2785", "error": "unable to retrieve cluster information on scale up: unexpected error getting cache manager info: stderr: , err: the server does not allow this method on the requested resource"}
+	*/
+	if info, err := ispnClient.Container().Info(); err != nil {
+		ctx.Requeue(fmt.Errorf("unable to retrieve cluster information on scale up: %w", err))
 		return
-	} else if len(members) != int(i.Spec.Replicas) {
+	} else if info.ClusterSize != i.Spec.Replicas {
 		ctx.Log().Info("waiting for cluster to form", "replicas", i.Spec.Replicas)
 		ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, nil)
 		return

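Before issuing any shutdown request, the handler now also demands a complete cluster view on every pod. The sketch below is not operator code: it condenses that view-size gate into one hypothetical helper (requireCompleteView), assuming a module that already depends on the operator's client packages; the expected count would normally come from the StatefulSet's replicas, as in the diff above.

package example

import (
	"fmt"
	"strings"

	ispnApi "github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
)

// requireCompleteView returns an error unless the pod's cache-container reports
// the expected number of cluster members, mirroring the check that blocks
// GracefulShutdown when a member is missing or a split-brain is suspected.
func requireCompleteView(client ispnApi.Infinispan, expectedReplicas int32) error {
	info, err := client.Container().Info()
	if err != nil {
		return fmt.Errorf("unable to retrieve cache-container info: %w", err)
	}
	if info.ClusterSize != expectedReplicas {
		return fmt.Errorf(
			"cluster view has '%d' members, expected '%d'. Members: '%s'",
			info.ClusterSize, expectedReplicas, strings.Join(info.ClusterMembers, ","),
		)
	}
	return nil
}
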
test/e2e/infinispan/upgrade_operand_test.go

Lines changed: 31 additions & 0 deletions
@@ -2,6 +2,7 @@ package infinispan
 
 import (
 	"context"
+	"strings"
 	"testing"
 
 	ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
@@ -11,6 +12,7 @@ import (
 	"github.com/infinispan/infinispan-operator/pkg/mime"
 	"github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan/handler/provision"
 	tutils "github.com/infinispan/infinispan-operator/test/e2e/utils"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/utils/pointer"
 
 	"github.com/stretchr/testify/assert"
@@ -422,6 +424,35 @@ func TestPodAlreadyShutdownOnUpgrade(t *testing.T) {
 	assert.EqualValues(t, int32(0), ss.Status.UpdatedReplicas)
 }
 
+func TestScaleDownBlockedWithDegradedCache(t *testing.T) {
+	defer testKube.CleanNamespaceAndLogOnPanic(t, tutils.Namespace)
+
+	replicas := 1
+	ispn := tutils.DefaultSpec(t, testKube, func(i *ispnv1.Infinispan) {
+		i.Spec.Replicas = int32(replicas)
+	})
+	testKube.CreateInfinispan(ispn, tutils.Namespace)
+	testKube.WaitForInfinispanPods(replicas, tutils.SinglePodTimeout, ispn.Name, tutils.Namespace)
+	ispn = testKube.WaitForInfinispanCondition(ispn.Name, ispn.Namespace, ispnv1.ConditionWellFormed)
+
+	_client := tutils.HTTPClientForClusterWithVersionManager(ispn, testKube, tutils.VersionManager())
+	cacheName := "cache"
+	cacheConfig := "<distributed-cache><partition-handling when-split=\"DENY_READ_WRITES\" merge-policy=\"PREFERRED_ALWAYS\"/></distributed-cache>"
+	cacheHelper := tutils.NewCacheHelper(cacheName, _client)
+	cacheHelper.Create(cacheConfig, mime.ApplicationXml)
+	cacheHelper.Available(false)
+
+	tutils.ExpectNoError(
+		testKube.UpdateInfinispan(ispn, func() {
+			ispn.Spec.Replicas = 0
+		}),
+	)
+	testKube.WaitForInfinispanState(ispn.Name, ispn.Namespace, func(i *ispnv1.Infinispan) bool {
+		c := i.GetCondition(ispnv1.ConditionStopping)
+		return c.Status == metav1.ConditionFalse && strings.Contains(c.Message, "unable to proceed with GracefulShutdown as the cluster health is 'DEGRADED'")
+	})
+}
+
 // specImageOperands() returns two latest Operands with the matching major/minor version
 func specImageOperands() (*version.Operand, *version.Operand) {
 	operands := tutils.VersionManager().Operands

test/e2e/upgrade/common.go

Lines changed: 13 additions & 2 deletions
@@ -259,7 +259,10 @@ func checkBatch(t *testing.T, infinispan *ispnv1.Infinispan) {
 	testKube.DeleteBatch(batch)
 }
 
-func assertNoDegradedCaches() {
+func locksCacheDegraded(operand version.Operand) bool {
+	if operand.UpstreamVersion.Major > 14 {
+		return false
+	}
 	podList := &corev1.PodList{}
 	tutils.ExpectNoError(
 		testKube.Kubernetes.ResourcesList(tutils.Namespace, map[string]string{"app": "infinispan-pod"}, podList, ctx),
@@ -269,7 +272,15 @@ func assertNoDegradedCaches() {
 		log, err := testKube.Kubernetes.Logs(provision.InfinispanContainer, pod.GetName(), pod.GetNamespace(), false, ctx)
 		tutils.ExpectNoError(err)
 		if strings.Contains(log, "DEGRADED_MODE") {
-			panic("Detected a cache in DEGRADED_MODE, unable to continue with test")
+			return true
 		}
 	}
+	return false
+}
+
+// healDegradedLocksCache forces the org.infinispan.LOCKS cache to AVAILABLE so that subsequent upgrades can proceed
+// can be removed once Infinispan 14.0.x servers are no longer supported
+func healDegradedLocksCache(operand version.Operand, client tutils.HTTPClient) {
+	tutils.Log().Infof("Setting org.infinispan.LOCKS cache to AVAILABLE for Operand %s", operand)
+	tutils.NewCacheHelperForOperand("org.infinispan.LOCKS", operand, client).Available(true)
 }

test/e2e/upgrade/upgrade_operands_test.go

Lines changed: 10 additions & 5 deletions
@@ -41,11 +41,12 @@ func TestOperandUpgrades(t *testing.T) {
 
 	// Create the Infinispan CR
 	replicas := 2
+	startingOperand := versionManager.Operands[startingOperandIdx]
 	ispn := tutils.DefaultSpec(t, testKube, func(i *ispnv1.Infinispan) {
 		i.Spec.Replicas = int32(replicas)
 		i.Spec.Service.Container.EphemeralStorage = false
 		i.Spec.Logging.Categories["org.infinispan.topology"] = ispnv1.LoggingLevelTrace
-		i.Spec.Version = versionManager.Operands[startingOperandIdx].Ref()
+		i.Spec.Version = startingOperand.Ref()
 	})
 
 	log.Infof("Testing upgrades from Operand '%s' to '%s'", ispn.Spec.Version, versionManager.Latest().Ref())
@@ -56,15 +57,17 @@ func TestOperandUpgrades(t *testing.T) {
 	numEntries := 100
 	client := tutils.HTTPClientForClusterWithVersionManager(ispn, testKube, versionManager)
 
+	if op := *startingOperand; locksCacheDegraded(op) {
+		healDegradedLocksCache(op, client)
+	}
+
 	// Add a persistent cache with data to ensure contents can be read after upgrade(s)
 	createAndPopulatePersistentCache(persistentCacheName, numEntries, client)
 
 	// Add a volatile cache with data to ensure contents can be backed up and then restored after upgrade(s)
 	createAndPopulateVolatileCache(volatileCacheName, numEntries, client)
 
-	assertNoDegradedCaches()
 	backup := createBackupAndWaitToSucceed(ispn.Name, t)
-
 	skippedOperands := tutils.OperandSkipSet()
 	for _, operand := range versionManager.Operands[startingOperandIdx:] {
 		// Skip an Operand in the upgrade graph if there's a known issue
@@ -75,6 +78,10 @@ func TestOperandUpgrades(t *testing.T) {
 		log.Debugf("Next Operand %s", operand.Ref())
 
 		ispn = testKube.WaitForInfinispanConditionWithTimeout(ispn.Name, tutils.Namespace, ispnv1.ConditionWellFormed, conditionTimeout)
+		if op := *operand; locksCacheDegraded(op) {
+			healDegradedLocksCache(op, client)
+		}
+
 		tutils.ExpectNoError(
 			testKube.UpdateInfinispan(ispn, func() {
 				ispn.Spec.Version = operand.Ref()
@@ -89,8 +96,6 @@ func TestOperandUpgrades(t *testing.T) {
 				i.Status.Operand.Phase == ispnv1.OperandPhaseRunning
 		})
 		log.Info("Upgrade complete")
-		assertOperandImage(operand.Image, ispn)
-		assertNoDegradedCaches()
 
 		// Ensure that persistent cache entries have survived the upgrade(s)
 		// Refresh the hostAddr and client as the url will change if NodePort is used.