Skip to content

Commit 981d627

Browse files
committed
[#2269] GracefulShutdown upgrades should tolerate pods that have already been stopped
1 parent d8fbfcc commit 981d627

File tree

5 files changed

+169
-2
lines changed

5 files changed

+169
-2
lines changed

pkg/kubernetes/pods.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kubernetes
33
import (
44
"context"
55
"reflect"
6+
"sort"
67

78
corev1 "k8s.io/api/core/v1"
89
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -176,3 +177,13 @@ func VolumeMountExists(name string, container *corev1.Container) bool {
176177
}
177178
return false
178179
}
180+
181+
func SortPodsByName(podList *corev1.PodList) {
182+
if podList == nil {
183+
return
184+
}
185+
sort.Slice(podList.Items, func(i, j int) bool {
186+
// Compare the Name field of the ObjectMeta for two pods
187+
return podList.Items[i].ObjectMeta.Name < podList.Items[j].ObjectMeta.Name
188+
})
189+
}

pkg/kubernetes/pods_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package kubernetes
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
v1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
)
10+
11+
// TestSortPodsByName tests the SortPodsByName function.
12+
func TestSortPodsByName(t *testing.T) {
13+
testCases := []struct {
14+
name string
15+
input *v1.PodList
16+
expected []string
17+
}{
18+
{
19+
name: "basic unsorted list",
20+
input: &v1.PodList{
21+
Items: []v1.Pod{
22+
{ObjectMeta: metav1.ObjectMeta{Name: "pod-2"}},
23+
{ObjectMeta: metav1.ObjectMeta{Name: "pod-0"}},
24+
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1"}},
25+
},
26+
},
27+
expected: []string{"pod-0", "pod-1", "pod-2"},
28+
},
29+
{
30+
name: "already sorted list",
31+
input: &v1.PodList{
32+
Items: []v1.Pod{
33+
{ObjectMeta: metav1.ObjectMeta{Name: "alpha"}},
34+
{ObjectMeta: metav1.ObjectMeta{Name: "beta"}},
35+
{ObjectMeta: metav1.ObjectMeta{Name: "gamma"}},
36+
},
37+
},
38+
expected: []string{"alpha", "beta", "gamma"},
39+
},
40+
{
41+
name: "reverse sorted list",
42+
input: &v1.PodList{
43+
Items: []v1.Pod{
44+
{ObjectMeta: metav1.ObjectMeta{Name: "zebra"}},
45+
{ObjectMeta: metav1.ObjectMeta{Name: "yak"}},
46+
{ObjectMeta: metav1.ObjectMeta{Name: "xylophone"}},
47+
},
48+
},
49+
expected: []string{"xylophone", "yak", "zebra"},
50+
},
51+
{
52+
name: "empty list",
53+
input: &v1.PodList{
54+
Items: []v1.Pod{},
55+
},
56+
expected: []string{},
57+
},
58+
{
59+
name: "list with one item",
60+
input: &v1.PodList{
61+
Items: []v1.Pod{
62+
{ObjectMeta: metav1.ObjectMeta{Name: "single-pod"}},
63+
},
64+
},
65+
expected: []string{"single-pod"},
66+
},
67+
}
68+
69+
for _, tc := range testCases {
70+
t.Run(tc.name, func(t *testing.T) {
71+
SortPodsByName(tc.input)
72+
73+
// Extract the names from the sorted list for comparison
74+
actualNames := []string{}
75+
if tc.input != nil { // Handle nil input case
76+
for _, pod := range tc.input.Items {
77+
actualNames = append(actualNames, pod.Name)
78+
}
79+
}
80+
81+
// Compare the actual sorted names with the expected names
82+
if !reflect.DeepEqual(actualNames, tc.expected) {
83+
t.Errorf("SortPodsByName() for test case '%s':\nExpected order: %v\nActual order: %v",
84+
tc.name, tc.expected, actualNames)
85+
}
86+
})
87+
}
88+
}

pkg/reconcile/pipeline/infinispan/context/context.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ func (c *contextImpl) InfinispanPods() (*corev1.PodList, error) {
135135
return nil, err
136136
}
137137
kube.FilterPodsByOwnerUID(podList, statefulSet.GetUID())
138+
// The api-server does not guarantee the ordering of pods in the list, so sort by name to make it more deterministic
139+
kube.SortPodsByName(podList)
138140
c.ispnPods = podList
139141
}
140142
return c.ispnPods.DeepCopy(), nil

pkg/reconcile/pipeline/infinispan/handler/manage/upgrades.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package manage
33
import (
44
"errors"
55
"fmt"
6+
"strings"
67

78
ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
89
consts "github.com/infinispan/infinispan-operator/controllers/constants"
@@ -140,18 +141,29 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
140141
return
141142
}
142143

143-
for idx, pod := range podList.Items {
144+
var rebalanceDisabled bool
145+
for _, pod := range podList.Items {
144146
ispnClient, err := ctx.InfinispanClientUnknownVersion(pod.Name)
145147
if err != nil {
148+
if shutdown, state := containerAlreadyShutdown(err); shutdown {
149+
logger.Info("Skipping pod whose cache-container has already been shutdown by the Operator", "pod", pod.Name, "state", state)
150+
continue
151+
}
146152
ctx.Requeue(fmt.Errorf("unable to create Infinispan client for cluster being upgraded: %w", err))
147153
return
148154
}
149-
if idx == 0 {
155+
156+
// Disabling rebalancing is a cluster-wide operation so we only need to perform this on a single pod
157+
// However, multiple calls to this endpoint should be safe, so it's ok if a subsequent reconciliation
158+
// executes this again
159+
if !rebalanceDisabled {
150160
if err := ispnClient.Container().RebalanceDisable(); err != nil {
151161
ctx.Requeue(fmt.Errorf("unable to disable rebalancing: %w", err))
152162
return
153163
}
164+
rebalanceDisabled = true
154165
}
166+
155167
if kube.IsPodReady(pod) {
156168
if err := ispnClient.Container().Shutdown(); err != nil {
157169
ctx.Requeue(fmt.Errorf("error encountered on container shutdown: %w", err))
@@ -223,6 +235,15 @@ func GracefulShutdown(i *ispnv1.Infinispan, ctx pipeline.Context) {
223235
}
224236
}
225237

238+
func containerAlreadyShutdown(err error) (bool, string) {
239+
if strings.Contains(err.Error(), "Server is STOPPING") {
240+
return true, "STOPPING"
241+
} else if strings.Contains(err.Error(), "Server is TERMINATED") {
242+
return true, "TERMINATED"
243+
}
244+
return false, ""
245+
}
246+
226247
// GracefulShutdownUpgrade performs the steps required by GracefulShutdown upgrades once the cluster has been scaled down
227248
// to 0 replicas
228249
func GracefulShutdownUpgrade(i *ispnv1.Infinispan, ctx pipeline.Context) {

test/e2e/infinispan/upgrade_operand_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
ispnv1 "github.com/infinispan/infinispan-operator/api/v1"
8+
"github.com/infinispan/infinispan-operator/pkg/infinispan/client"
89
"github.com/infinispan/infinispan-operator/pkg/infinispan/version"
910
"github.com/infinispan/infinispan-operator/pkg/kubernetes"
1011
"github.com/infinispan/infinispan-operator/pkg/mime"
@@ -377,6 +378,50 @@ func TestSpecImageUpdate(t *testing.T) {
377378
assert.EqualValues(t, 1, ss.Status.ObservedGeneration)
378379
}
379380

381+
// TestPodAlreadyShutdownOnUpgrade simulates a scenario where a GracefulShutdown fails when only a subset of pods have had
382+
// their container shutdown
383+
func TestPodAlreadyShutdownOnUpgrade(t *testing.T) {
384+
defer testKube.CleanNamespaceAndLogOnPanic(t, tutils.Namespace)
385+
386+
i := tutils.DefaultSpec(t, testKube, func(infinispan *ispnv1.Infinispan) {
387+
infinispan.Spec.Replicas = 1
388+
infinispan.Spec.Security.EndpointAuthentication = pointer.Bool(false)
389+
})
390+
testKube.CreateInfinispan(i, tutils.Namespace)
391+
testKube.WaitForInfinispanPods(1, tutils.SinglePodTimeout, i.Name, tutils.Namespace)
392+
i = testKube.WaitForInfinispanCondition(i.Name, i.Namespace, ispnv1.ConditionWellFormed)
393+
394+
schema := i.GetEndpointScheme()
395+
client_ := testKube.WaitForExternalService(i, tutils.RouteTimeout, tutils.NewHTTPClientNoAuth(schema), nil)
396+
ispnClient := client.New(tutils.CurrentOperand, client_)
397+
tutils.ExpectNoError(
398+
ispnClient.Container().Shutdown(),
399+
)
400+
401+
tutils.ExpectNoError(
402+
testKube.UpdateInfinispan(i, func() {
403+
i.Spec.Replicas = 0
404+
}),
405+
)
406+
testKube.WaitForInfinispanPods(0, tutils.SinglePodTimeout, i.Name, tutils.Namespace)
407+
408+
ss := appsv1.StatefulSet{}
409+
tutils.ExpectNoError(
410+
testKube.Kubernetes.Client.Get(
411+
context.TODO(),
412+
types.NamespacedName{
413+
Namespace: i.Namespace,
414+
Name: i.GetStatefulSetName(),
415+
},
416+
&ss,
417+
),
418+
)
419+
assert.EqualValues(t, int64(2), ss.Status.ObservedGeneration)
420+
assert.EqualValues(t, int32(0), ss.Status.ReadyReplicas)
421+
assert.EqualValues(t, int32(0), ss.Status.CurrentReplicas)
422+
assert.EqualValues(t, int32(0), ss.Status.UpdatedReplicas)
423+
}
424+
380425
// specImageOperands() returns two latest Operands with the matching major/minor version
381426
func specImageOperands() (*version.Operand, *version.Operand) {
382427
operands := tutils.VersionManager().Operands

0 commit comments

Comments
 (0)