Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/kubernetes/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"context"
"reflect"
"sort"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -176,3 +177,13 @@ func VolumeMountExists(name string, container *corev1.Container) bool {
}
return false
}

func SortPodsByName(podList *corev1.PodList) {
if podList == nil {
return
}
sort.Slice(podList.Items, func(i, j int) bool {
// Compare the Name field of the ObjectMeta for two pods
return podList.Items[i].ObjectMeta.Name < podList.Items[j].ObjectMeta.Name
})
}
88 changes: 88 additions & 0 deletions pkg/kubernetes/pods_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package kubernetes

import (
"reflect"
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestSortPodsByName tests the SortPodsByName function.
func TestSortPodsByName(t *testing.T) {
testCases := []struct {
name string
input *v1.PodList
expected []string
}{
{
name: "basic unsorted list",
input: &v1.PodList{
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod-2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod-0"}},
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1"}},
},
},
expected: []string{"pod-0", "pod-1", "pod-2"},
},
{
name: "already sorted list",
input: &v1.PodList{
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "alpha"}},
{ObjectMeta: metav1.ObjectMeta{Name: "beta"}},
{ObjectMeta: metav1.ObjectMeta{Name: "gamma"}},
},
},
expected: []string{"alpha", "beta", "gamma"},
},
{
name: "reverse sorted list",
input: &v1.PodList{
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "zebra"}},
{ObjectMeta: metav1.ObjectMeta{Name: "yak"}},
{ObjectMeta: metav1.ObjectMeta{Name: "xylophone"}},
},
},
expected: []string{"xylophone", "yak", "zebra"},
},
{
name: "empty list",
input: &v1.PodList{
Items: []v1.Pod{},
},
expected: []string{},
},
{
name: "list with one item",
input: &v1.PodList{
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "single-pod"}},
},
},
expected: []string{"single-pod"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
SortPodsByName(tc.input)

// Extract the names from the sorted list for comparison
actualNames := []string{}
if tc.input != nil { // Handle nil input case
for _, pod := range tc.input.Items {
actualNames = append(actualNames, pod.Name)
}
}

// Compare the actual sorted names with the expected names
if !reflect.DeepEqual(actualNames, tc.expected) {
t.Errorf("SortPodsByName() for test case '%s':\nExpected order: %v\nActual order: %v",
tc.name, tc.expected, actualNames)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/reconcile/pipeline/infinispan/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (c *contextImpl) InfinispanPods() (*corev1.PodList, error) {
return nil, err
}
kube.FilterPodsByOwnerUID(podList, statefulSet.GetUID())
// The api-server does not guarantee the ordering of pods in the list, so sort by name to make it more deterministic
kube.SortPodsByName(podList)
c.ispnPods = podList
}
return c.ispnPods.DeepCopy(), nil
Expand Down
25 changes: 23 additions & 2 deletions pkg/reconcile/pipeline/infinispan/handler/manage/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package manage
import (
"errors"
"fmt"
"strings"

ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
consts "github.com/infinispan/infinispan-operator/controllers/constants"
Expand Down Expand Up @@ -140,18 +141,29 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
return
}

for idx, pod := range podList.Items {
var rebalanceDisabled bool
for _, pod := range podList.Items {
ispnClient, err := ctx.InfinispanClientUnknownVersion(pod.Name)
if err != nil {
if shutdown, state := containerAlreadyShutdown(err); shutdown {
logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod.Name, "state", state)
continue
}
ctx.Requeue(fmt.Errorf("unable to create Infinispan client for cluster being upgraded: %w", err))
return
}
if idx == 0 {

// Disabling rebalancing is a cluster-wide operation so we only need to perform this on a single pod
// However, multiple calls to this endpoint should be safe, so it's ok if a subsequent reconciliation
// executes this again
if !rebalanceDisabled {
if err := ispnClient.Container().RebalanceDisable(); err != nil {
ctx.Requeue(fmt.Errorf("unable to disable rebalancing: %w", err))
return
}
rebalanceDisabled = true
}

if kube.IsPodReady(pod) {
if err := ispnClient.Container().Shutdown(); err != nil {
ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
Expand Down Expand Up @@ -223,6 +235,15 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
}
}

func containerAlreadyShutdown(err error) (bool, string) {
if strings.Contains(err.Error(), "Server is STOPPING") {
return true, "STOPPING"
} else if strings.Contains(err.Error(), "Server is TERMINATED") {
return true, "TERMINATED"
}
return false, ""
}

// GracefulShutdownUpgrade performs the steps required by GracefulShutdown upgrades once the cluster has been scaled down
// to 0 replicas
func GracefulShutdownUpgrade(i *ispnv1.Infinispan, ctx pipeline.Context) {
Expand Down
46 changes: 46 additions & 0 deletions test/e2e/infinispan/upgrade_operand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
"github.com/infinispan/infinispan-operator/pkg/infinispan/client"
"github.com/infinispan/infinispan-operator/pkg/infinispan/version"
"github.com/infinispan/infinispan-operator/pkg/kubernetes"
"github.com/infinispan/infinispan-operator/pkg/mime"
Expand Down Expand Up @@ -407,6 +408,51 @@ func TestSpecImageUpdate(t *testing.T) {
assert.EqualValues(t, 1, ss.Status.ObservedGeneration)
}

// TestPodAlreadyShutdownOnUpgrade simulates a scenario where a GracefulShutdown fails when only a subset of pods have had
// their container shutdown
func TestPodAlreadyShutdownOnUpgrade(t *testing.T) {
defer testKube.CleanNamespaceAndLogOnPanic(t, tutils.Namespace)

i := tutils.DefaultSpec(t, testKube, func(infinispan *ispnv1.Infinispan) {
infinispan.Spec.Replicas = 1
infinispan.Spec.Security.EndpointAuthentication = pointer.Bool(false)
})
testKube.CreateInfinispan(i, tutils.Namespace)
testKube.WaitForInfinispanPods(1, tutils.SinglePodTimeout, i.Name, tutils.Namespace)
i = testKube.WaitForInfinispanCondition(i.Name, i.Namespace, ispnv1.ConditionWellFormed)

schema := i.GetEndpointScheme()
client_ := testKube.WaitForExternalService(i, tutils.RouteTimeout, tutils.NewHTTPClientNoAuth(schema), nil)
ispnClient := client.New(tutils.CurrentOperand, client_)
tutils.ExpectNoError(
ispnClient.Container().Shutdown(),
)

tutils.ExpectNoError(
testKube.UpdateInfinispan(i, func() {
i.Spec.Replicas = 0
}),
)
testKube.WaitForInfinispanPods(0, tutils.SinglePodTimeout, i.Name, tutils.Namespace)

ss := appsv1.StatefulSet{}
tutils.ExpectNoError(
testKube.Kubernetes.Client.Get(
context.TODO(),
types.NamespacedName{
Namespace: i.Namespace,
Name: i.GetStatefulSetName(),
},
&ss,
),
)
assert.EqualValues(t, int64(2), ss.Status.ObservedGeneration)
assert.EqualValues(t, int32(0), ss.Status.ReadyReplicas)
assert.EqualValues(t, int32(0), ss.Status.CurrentReplicas)
assert.EqualValues(t, int32(0), ss.Status.UpdatedReplicas)
}

// specImageOperands() returns two latest Operands with the matching major/minor version
func specImageOperands() (*version.Operand, *version.Operand) {
operands := tutils.VersionManager().Operands
length := len(operands)
Expand Down
Loading