Skip to content

Commit 3fc2315

Browse files
authored
[addon-operator] fix/add recovery from a possible deadlock (#687)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 6ae3a66 commit 3fc2315

File tree

8 files changed

+27
-22
lines changed

8 files changed

+27
-22
lines changed

.golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,7 @@ linters:
9999
- third_party$
100100
- builtin$
101101
- examples$
102+
- _test\.go$
102103
issues:
103104
max-issues-per-linter: 0
104105
max-same-issues: 0

go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/dominikbraun/graph v0.23.0
1111
github.com/ettle/strcase v0.2.0
1212
github.com/flant/kube-client v1.3.1
13-
github.com/flant/shell-operator v1.11.3
13+
github.com/flant/shell-operator v1.11.4
1414
github.com/go-chi/chi/v5 v5.2.2
1515
github.com/go-openapi/loads v0.19.5
1616
github.com/go-openapi/spec v0.19.8

go.sum

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -164,8 +164,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
164164
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
165165
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
166166
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
167-
github.com/flant/shell-operator v1.11.3 h1:Yp2N/cn/y1glCHKzfAKB8HPcIZ18TidGkgCaP38E6oc=
168-
github.com/flant/shell-operator v1.11.3/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
167+
github.com/flant/shell-operator v1.11.4 h1:PfLAq11RG4+gmtr5/yxBSEeuFH6btDT87oO+usph4ts=
168+
github.com/flant/shell-operator v1.11.4/go.mod h1:TFTCgXpp+yrvSUQSQKgotJbRK720fiqwaQdFVU6dAlU=
169169
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
170170
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
171171
github.com/fluxcd/flagger v1.36.1 h1:X2PumtNwZz9YSGaOtZLFm2zAKLgHhFkbNv8beg7ifyc=

pkg/addon-operator/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -149,7 +149,7 @@ func StartTasksQueueLengthUpdater(metricStorage metricsstorage.Storage, tqs *que
149149
go func() {
150150
for {
151151
// Gather task queues lengths.
152-
tqs.Iterate(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
152+
tqs.IterateSnapshot(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
153153
queueLen := float64(queue.Length())
154154
metricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name})
155155
})

pkg/addon-operator/operator_test.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -181,7 +181,8 @@ func convergeDone(op *AddonOperator) func(g Gomega) bool {
181181
}
182182
mainQueue := op.engine.TaskQueues.GetMain()
183183
g.Expect(func() bool {
184-
if mainQueue.IsEmpty() {
184+
// If main queue is empty, converge is done.
185+
if mainQueue.Length() == 0 {
185186
return true
186187
}
187188
return mainQueue.GetFirst().GetFailureCount() >= 2

pkg/addon-operator/queue.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -95,7 +95,8 @@ func RemoveCurrentConvergeTasks(convergeQueues []*queue.TaskQueue, logLabels map
9595
convergeDrained := false
9696

9797
for _, queue := range convergeQueues {
98-
if queue == nil || queue.IsEmpty() {
98+
// Skip empty queues.
99+
if queue == nil || queue.Length() == 0 {
99100
continue
100101
}
101102

pkg/addon-operator/queue_test.go

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ func Test_ModuleEnsureCRDsTasksInQueueAfterId(t *testing.T) {
2727
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
2828
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
2929

30-
q := queue.NewTasksQueue(metricStorage)
30+
q := queue.NewTasksQueue("test", metricStorage)
3131

3232
Task := &sh_task.BaseTask{Type: task.ModuleEnsureCRDs, Id: currentTaskID}
3333
q.AddLast(Task)
@@ -49,7 +49,7 @@ func Test_ModuleEnsureCRDsTasksInQueueAfterId(t *testing.T) {
4949
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
5050
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
5151

52-
q := queue.NewTasksQueue(metricStorage)
52+
q := queue.NewTasksQueue("test", metricStorage)
5353

5454
Task := &sh_task.BaseTask{Type: task.ModuleEnsureCRDs, Id: currentTaskID}
5555
q.AddLast(Task)
@@ -70,7 +70,7 @@ func Test_ModuleEnsureCRDsTasksInQueueAfterId(t *testing.T) {
7070
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
7171
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
7272

73-
q := queue.NewTasksQueue(metricStorage)
73+
q := queue.NewTasksQueue("test", metricStorage)
7474

7575
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: currentTaskID}
7676
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -107,7 +107,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
107107
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
108108
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
109109

110-
q := queue.NewTasksQueue(metricStorage)
110+
q := queue.NewTasksQueue("test", metricStorage)
111111

112112
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
113113
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -128,7 +128,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
128128
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
129129
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
130130

131-
q := queue.NewTasksQueue(metricStorage)
131+
q := queue.NewTasksQueue("test", metricStorage)
132132

133133
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
134134
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -152,7 +152,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
152152
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
153153
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
154154

155-
q := queue.NewTasksQueue(metricStorage)
155+
q := queue.NewTasksQueue("test", metricStorage)
156156

157157
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "test"}
158158
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "test"}))
@@ -173,7 +173,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
173173
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
174174
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
175175

176-
q := queue.NewTasksQueue(metricStorage)
176+
q := queue.NewTasksQueue("test", metricStorage)
177177

178178
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
179179
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -294,7 +294,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
294294
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
295295
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
296296

297-
q := queue.NewTasksQueue(metricStorage)
297+
q := queue.NewTasksQueue("test", metricStorage)
298298

299299
for i := range tt.in {
300300
tsk := &tt.in[i]
@@ -385,7 +385,7 @@ func Test_ModulesWithPendingModuleRun(t *testing.T) {
385385
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
386386
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
387387

388-
q := queue.NewTasksQueue(metricStorage)
388+
q := queue.NewTasksQueue("test", metricStorage)
389389

390390
for _, tsk := range tt.in {
391391
q.AddLast(tsk)
@@ -537,7 +537,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
537537
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
538538
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {})
539539

540-
q := queue.NewTasksQueue(metricStorage)
540+
q := queue.NewTasksQueue("test", metricStorage)
541541

542542
// Fill queue from the test case.
543543
queues = append(queues, q)
@@ -666,7 +666,7 @@ func Test_RemoveCurrentConvergeTasksFromId(t *testing.T) {
666666
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
667667
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
668668

669-
q := queue.NewTasksQueue(metricStorage)
669+
q := queue.NewTasksQueue("test", metricStorage)
670670

671671
// Fill queue from the test case.
672672
for i := range tt.initialTasks {
@@ -715,7 +715,7 @@ func Test_ConvergeModulesInQueue(t *testing.T) {
715715
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
716716
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
717717

718-
q := queue.NewTasksQueue(metricStorage)
718+
q := queue.NewTasksQueue("test", metricStorage)
719719

720720
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
721721
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -736,7 +736,7 @@ func Test_ConvergeModulesInQueue(t *testing.T) {
736736
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
737737
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
738738

739-
q := queue.NewTasksQueue(metricStorage)
739+
q := queue.NewTasksQueue("test", metricStorage)
740740

741741
Task := &sh_task.BaseTask{Type: task.GlobalHookRun, Id: "unknown"}
742742
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -764,7 +764,7 @@ func Test_ConvergeModulesInQueue(t *testing.T) {
764764
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
765765
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
766766

767-
q := queue.NewTasksQueue(metricStorage)
767+
q := queue.NewTasksQueue("test", metricStorage)
768768

769769
Task := &sh_task.BaseTask{Type: task.GlobalHookRun, Id: "unknown"}
770770
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))

pkg/helm_resources_manager/helm_resources_manager.go

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,22 +55,23 @@ type helmResourcesManager struct {
5555
var _ HelmResourcesManager = &helmResourcesManager{}
5656

5757
func NewHelmResourcesManager(ctx context.Context, kclient *klient.Client, logger *log.Logger) (HelmResourcesManager, error) {
58-
//nolint:govet
5958
cctx, cancel := context.WithCancel(ctx)
6059
if kclient == nil {
61-
//nolint:govet
60+
cancel()
6261
return nil, fmt.Errorf("kube client not set")
6362
}
6463

6564
cfg := kclient.RestConfig()
6665
defaultLabelSelector, err := labels.Parse(app.ExtraLabels)
6766
if err != nil {
67+
cancel()
6868
return nil, err
6969
}
7070
cache, err := cr_cache.New(cfg, cr_cache.Options{
7171
DefaultLabelSelector: defaultLabelSelector,
7272
})
7373
if err != nil {
74+
cancel()
7475
return nil, err
7576
}
7677

@@ -80,6 +81,7 @@ func NewHelmResourcesManager(ctx context.Context, kclient *klient.Client, logger
8081

8182
log.Debug("Helm resource manager: cache's been started")
8283
if synced := cache.WaitForCacheSync(cctx); !synced {
84+
cancel()
8385
return nil, fmt.Errorf("Couldn't sync helm resource informer cache")
8486
}
8587
log.Debug("Helm resourcer manager: cache has been synced")

0 commit comments

Comments
 (0)