Prevent graceful shutdown on split brain #2277

Merged
2 changes: 1 addition & 1 deletion .github/workflows/test_hr_rolling_upgrades.yml
@@ -28,7 +28,7 @@ jobs:
- name: Run Hot Rod Rolling Upgrade Tests
run: make hotrod-upgrade-test
env:
SUBSCRIPTION_STARTING_CSV: infinispan-operator.v2.3.2
SUBSCRIPTION_STARTING_CSV: infinispan-operator.v2.3.7
TESTING_OPERAND_IGNORE_LIST: ${{ inputs.skipList }}
INFINISPAN_CPU: 500m # prevent insufficient cpu error on test-rolling-upgrade pod start

4 changes: 3 additions & 1 deletion .github/workflows/test_upgrades.yml
@@ -125,7 +125,9 @@ jobs:
exit 1
fi
else
export SUBSCRIPTION_STARTING_CSV=infinispan-operator.v2.3.2
# We must start on a server version greater than 14.0.23.Final to prevent known clustering issues during upgrades
# See commit message for a detailed list
export SUBSCRIPTION_STARTING_CSV=infinispan-operator.v2.3.7
fi
make upgrade-test
env:
11 changes: 10 additions & 1 deletion pkg/http/curl/curl.go
@@ -120,6 +120,11 @@ func (c *Client) executeCurlWithAuth(httpURL, headers, args string) (*http.Respo
return nil, err
}

// Handle anonymous endpoints such as /health/status that will not return a 401 response
if rsp.StatusCode >= 200 && rsp.StatusCode < 300 {
return processResponse(rsp)
}

if rsp.StatusCode != http.StatusUnauthorized {
return rsp, fmt.Errorf("expected 401 DIGEST response before content. Received '%s'", rsp.Status)
}
@@ -172,9 +177,13 @@ func handleContent(reader *bufio.Reader) (*http.Response, error) {
}
}

return processResponse(rsp)
}

func processResponse(rsp *http.Response) (*http.Response, error) {
// Save response body
b := new(bytes.Buffer)
if _, err = io.Copy(b, rsp.Body); err != nil {
if _, err := io.Copy(b, rsp.Body); err != nil {
return nil, err
}
if err := rsp.Body.Close(); err != nil {
25 changes: 21 additions & 4 deletions pkg/infinispan/client/api/infinispan.go
@@ -24,6 +24,7 @@ type Infinispan interface {
type Container interface {
Info() (*ContainerInfo, error)
Backups() Backups
Health() (*Health, error)
HealthStatus() (HealthStatus, error)
Members() ([]string, error)
RebalanceDisable() error
@@ -101,12 +102,26 @@ type Xsite interface {
PushAllState() error
}

type Health struct {
ClusterHealth ClusterHealth `json:"cluster_health"`
CacheHealth []CacheHealth `json:"cache_health"`
}

type CacheHealth struct {
Name string `json:"cache_name"`
Status HealthStatus `json:"status"`
}

type ClusterHealth struct {
Status HealthStatus `json:"health_status"`
}

// HealthStatus indicates the possible statuses of the Infinispan server
type HealthStatus string

const (
HealthStatusDegraded HealthStatus = "DEGRADED"
HealthStatusHealth HealthStatus = "HEALTHY"
HealthStatusHealthy HealthStatus = "HEALTHY"
HealthStatusHealthRebalancing HealthStatus = "HEALTHY_REBALANCING"
HealthStatusFailed HealthStatus = "FAILED"
)
@@ -152,9 +167,11 @@ type BackupRestoreResources struct {
}

type ContainerInfo struct {
Coordinator bool `json:"coordinator"`
SitesView *[]interface{} `json:"sites_view,omitempty"`
Version string `json:"version"`
ClusterMembers []string `json:"cluster_members"`
ClusterSize int32 `json:"cluster_size"`
Coordinator bool `json:"coordinator"`
SitesView *[]interface{} `json:"sites_view,omitempty"`
Version string `json:"version"`
}

type NotSupportedError struct {
15 changes: 15 additions & 0 deletions pkg/infinispan/client/v14/container.go
@@ -31,6 +31,21 @@ func (c *Container) Info() (info *api.ContainerInfo, err error) {
return
}

func (c *Container) Health() (status *api.Health, err error) {
rsp, err := c.Get(c.CacheManager("/health"), nil)
defer func() {
err = httpClient.CloseBody(rsp, err)
}()

if err = httpClient.ValidateResponse(rsp, err, "getting cache manager health", http.StatusOK); err != nil {
return
}
if err = json.NewDecoder(rsp.Body).Decode(&status); err != nil {
return nil, fmt.Errorf("unable to decode: %w", err)
}
return
}

func (c *Container) HealthStatus() (status api.HealthStatus, err error) {
rsp, err := c.Get(c.CacheManager("/health/status"), nil)
defer func() {
3 changes: 3 additions & 0 deletions pkg/reconcile/pipeline/infinispan/context/context.go
@@ -104,6 +104,9 @@ func (c *contextImpl) InfinispanClient() (api.Infinispan, error) {
break
}
}
if pod == "" {
return nil, fmt.Errorf("unable to create Infinispan client, no ready pods exist")
}
c.ispnClient = c.InfinispanClientForPod(pod)
return c.ispnClient, nil
}
119 changes: 102 additions & 17 deletions pkg/reconcile/pipeline/infinispan/handler/manage/upgrades.go
@@ -7,8 +7,8 @@ import (

ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
consts "github.com/infinispan/infinispan-operator/controllers/constants"
ispnApi "github.com/infinispan/infinispan-operator/pkg/infinispan/client/api"
"github.com/infinispan/infinispan-operator/pkg/infinispan/version"
kube "github.com/infinispan/infinispan-operator/pkg/kubernetes"
pipeline "github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan"
"github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan/handler/provision"
routev1 "github.com/openshift/api/route/v1"
@@ -130,7 +130,8 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
// Initiate the GracefulShutdown if it's not already in progress
if i.Spec.Replicas == 0 {
logger.Info(".Spec.Replicas==0")
if *statefulSet.Spec.Replicas != 0 {
replicas := *statefulSet.Spec.Replicas
if replicas != 0 {
logger.Info("StatefulSet.Spec.Replicas!=0")
// Only send a GracefulShutdown request to the server if it hasn't succeeded already
if !i.IsConditionTrue(ispnv1.ConditionStopping) {
@@ -141,18 +142,94 @@
return
}

var rebalanceDisabled bool
type ServerMeta struct {
client ispnApi.Infinispan
skip bool
state string
}

// Loop through all pods to initiate the clients, determine that all pods have the expected
// number of cluster members and ensure that the cluster is in a HEALTHY state
var serverMetaMap = make(map[string]*ServerMeta, len(podList.Items))
var shutdownAlreadyInitiated bool
for _, pod := range podList.Items {
ispnClient, err := ctx.InfinispanClientUnknownVersion(pod.Name)
serverMeta := &ServerMeta{}
serverMetaMap[pod.Name] = serverMeta
serverMeta.client, err = ctx.InfinispanClientUnknownVersion(pod.Name)
if err != nil {
if shutdown, state := containerAlreadyShutdown(err); shutdown {
logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod.Name, "state", state)
serverMeta.skip, serverMeta.state = true, state
logger.Info("At least one cache-container has already been shutdown by the Operator, resuming shutdown", "pod", pod.Name, "state", state)
shutdownAlreadyInitiated = true
// Continue processing other pods here as it's possible that one or more pods still haven't
// been shutdown and we need to initiate the client
continue
}
ctx.Requeue(fmt.Errorf("unable to create Infinispan client for cluster being upgraded: %w", err))
ctx.Requeue(fmt.Errorf("unable to create Infinispan client to determine if cluster is healthy: %w", err))
return
}

// If one or more of the pods have already been shutdown then we must continue to shutdown the remaining
// members of the cluster
if shutdownAlreadyInitiated {
continue
}

info, err := serverMeta.client.Container().Info()
if err != nil {
ctx.Requeue(fmt.Errorf("unable to retrieve cache-container info for pod '%s': %w", pod.Name, err))
return
}

updateCondition := func(msg string) {
err := ctx.UpdateInfinispan(func() {
i.SetCondition(ispnv1.ConditionStopping, metav1.ConditionFalse, msg)
})
if err != nil {
ctx.Log().Error(err, "unable to set condition to False", "condition", "stopping")
}
}

if info.ClusterSize != replicas {
msg := fmt.Sprintf(
"unable to proceed with GracefulShutdown as pod '%s' has '%d' cluster members, expected '%d'. Members: '%s'",
pod.Name,
info.ClusterSize,
replicas,
strings.Join(info.ClusterMembers, ","),
)
updateCondition(msg)
ctx.EventRecorder().Event(i, corev1.EventTypeWarning, "GracefulShutdownBlocked", msg)
ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, errors.New(msg))
return
}

health, err := serverMeta.client.Container().Health()
if err != nil {
ctx.Requeue(fmt.Errorf("unable to retrieve cluster health status for pod '%s': %w", pod.Name, err))
Review comment (Member):
In this case, is health the new endpoint that only verifies whether the process is alive?

Reply (ryanemerson, Contributor, Author), May 19, 2025:
It's currently calling /health/status and using the cluster_health field, which is determined by https://github.com/infinispan/infinispan/blob/3caa2cdf0dc191603067ab32353f072a966125a7/core/src/main/java/org/infinispan/health/impl/ClusterHealthImpl.java#L33-L41.

I think this should remain on this endpoint, as it ensures we don't proceed with an upgrade if there are one or more DEGRADED caches.

So maybe we shouldn't deprecate container/health/status after all?
return
}

// If any of the caches are not HEALTHY we must block the GracefulShutdown to prevent the
// cluster from entering an unexpected state
if health.ClusterHealth.Status != ispnApi.HealthStatusHealthy {
msg := fmt.Sprintf("unable to proceed with GracefulShutdown as the cluster health is '%s': %s", health.ClusterHealth.Status, health.CacheHealth)
updateCondition(msg)
ctx.EventRecorder().Event(i, corev1.EventTypeWarning, "GracefulShutdownBlocked", msg)
ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, errors.New(msg))
return
}
}

var rebalanceDisabled bool
for _, pod := range podList.Items {
serverMeta := serverMetaMap[pod.Name]
if serverMeta.skip {
logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod, "state", serverMeta.state)
continue
}
ispnClient := serverMeta.client

// Disabling rebalancing is a cluster-wide operation so we only need to perform this on a single pod
// However, multiple calls to this endpoint should be safe, so it's ok if a subsequent reconciliation
// executes this again
Expand All @@ -164,13 +241,11 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
rebalanceDisabled = true
}

if kube.IsPodReady(pod) {
if err := ispnClient.Container().Shutdown(); err != nil {
ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
return
} else {
logger.Info("Executed Container Shutdown on pod: ", "Pod.Name", pod.Name)
}
if err := ispnClient.Container().Shutdown(); err != nil {
ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
return
} else {
logger.Info("Executed Container Shutdown on pod: ", "Pod.Name", pod.Name)
}
}

@@ -280,16 +355,26 @@ func EnableRebalanceAfterScaleUp(i *ispnv1.Infinispan, ctx pipeline.Context) {
return
}

if podList, err := ctx.InfinispanPods(); err != nil {
ctx.Requeue(err)
return
} else if len(podList.Items) != int(i.Spec.Replicas) {
ctx.Log().Info("Waiting on cluster pods to be provisioned", "pods", len(podList.Items), "spec.replicas", i.Spec.Replicas)
ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, nil)
return
}

ispnClient, err := ctx.InfinispanClient()
if err != nil {
ctx.Requeue(err)
ctx.Log().Info("Waiting on at least one ready pod", "err", err)
ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, nil)
return
}

if members, err := ispnClient.Container().Members(); err != nil {
ctx.Requeue(fmt.Errorf("unable to retrieve cluster members on scale up: %w", err))
if info, err := ispnClient.Container().Info(); err != nil {
ctx.Requeue(fmt.Errorf("unable to retrieve cluster information on scale up: %w", err))
return
} else if len(members) != int(i.Spec.Replicas) {
} else if info.ClusterSize != i.Spec.Replicas {
ctx.Log().Info("waiting for cluster to form", "replicas", i.Spec.Replicas)
ctx.RequeueAfter(consts.DefaultWaitClusterPodsNotReady, nil)
return
34 changes: 34 additions & 0 deletions test/e2e/infinispan/upgrade_operand_test.go
@@ -2,6 +2,7 @@ package infinispan

import (
"context"
"strings"
"testing"

ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
@@ -11,6 +12,7 @@ import (
"github.com/infinispan/infinispan-operator/pkg/mime"
"github.com/infinispan/infinispan-operator/pkg/reconcile/pipeline/infinispan/handler/provision"
tutils "github.com/infinispan/infinispan-operator/test/e2e/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/stretchr/testify/assert"
@@ -422,6 +424,38 @@ func TestPodAlreadyShutdownOnUpgrade(t *testing.T) {
assert.EqualValues(t, int32(0), ss.Status.UpdatedReplicas)
}

func TestScaleDownBlockedWithDegradedCache(t *testing.T) {
defer testKube.CleanNamespaceAndLogOnPanic(t, tutils.Namespace)

replicas := 1
ispn := tutils.DefaultSpec(t, testKube, func(i *ispnv1.Infinispan) {
i.Spec.Replicas = int32(replicas)
})
testKube.CreateInfinispan(ispn, tutils.Namespace)
testKube.WaitForInfinispanPods(replicas, tutils.SinglePodTimeout, ispn.Name, tutils.Namespace)
ispn = testKube.WaitForInfinispanCondition(ispn.Name, ispn.Namespace, ispnv1.ConditionWellFormed)

_client := tutils.HTTPClientForClusterWithVersionManager(ispn, testKube, tutils.VersionManager())
cacheName := "cache"
cacheConfig := "<distributed-cache><partition-handling when-split=\"DENY_READ_WRITES\" merge-policy=\"PREFERRED_ALWAYS\"/></distributed-cache>"
cacheHelper := tutils.NewCacheHelper(cacheName, _client)
cacheHelper.Create(cacheConfig, mime.ApplicationXml)
cacheHelper.Available(false)

tutils.ExpectNoError(
testKube.UpdateInfinispan(ispn, func() {
ispn.Spec.Replicas = 0
}),
)
testKube.WaitForInfinispanState(ispn.Name, ispn.Namespace, func(i *ispnv1.Infinispan) bool {
c := i.GetCondition(ispnv1.ConditionStopping)
return c.Status == metav1.ConditionFalse && strings.Contains(c.Message, "unable to proceed with GracefulShutdown as the cluster health is 'DEGRADED'")
})
cacheHelper.Available(true)
testKube.WaitForInfinispanPods(0, tutils.SinglePodTimeout, ispn.Name, tutils.Namespace)
testKube.WaitForInfinispanCondition(ispn.Name, ispn.Namespace, ispnv1.ConditionGracefulShutdown)
}

// specImageOperands() returns the two latest Operands with the matching major/minor version
func specImageOperands() (*version.Operand, *version.Operand) {
operands := tutils.VersionManager().Operands
15 changes: 13 additions & 2 deletions test/e2e/upgrade/common.go
@@ -259,7 +259,10 @@ func checkBatch(t *testing.T, infinispan *ispnv1.Infinispan) {
testKube.DeleteBatch(batch)
}

func assertNoDegradedCaches() {
func locksCacheDegraded(operand version.Operand) bool {
if operand.UpstreamVersion.Major > 14 {
return false
}
podList := &corev1.PodList{}
tutils.ExpectNoError(
testKube.Kubernetes.ResourcesList(tutils.Namespace, map[string]string{"app": "infinispan-pod"}, podList, ctx),
@@ -269,7 +272,15 @@
log, err := testKube.Kubernetes.Logs(provision.InfinispanContainer, pod.GetName(), pod.GetNamespace(), false, ctx)
tutils.ExpectNoError(err)
if strings.Contains(log, "DEGRADED_MODE") {
panic("Detected a cache in DEGRADED_MODE, unable to continue with test")
return true
}
}
return false
}

// healDegradedLocksCache forces the org.infinispan.LOCKS cache to AVAILABLE so that subsequent upgrades can proceed
// can be removed once Infinispan 14.0.x servers are no longer supported
func healDegradedLocksCache(operand version.Operand, client tutils.HTTPClient) {
tutils.Log().Infof("Setting org.infinispan.LOCKS cache to AVAILABLE for Operand %s", operand)
tutils.NewCacheHelperForOperand("org.infinispan.LOCKS", operand, client).Available(true)
}