Skip to content

Commit a5c781e

Browse files
authored
Fix/1.11 possibly deadlock queue (#688)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 0d69c02 commit a5c781e

File tree

7 files changed

+21
-20
lines changed

7 files changed

+21
-20
lines changed

.golangci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ linters:
9999
- third_party$
100100
- builtin$
101101
- examples$
102+
- _test\.go$
102103
issues:
103104
max-issues-per-linter: 0
104105
max-same-issues: 0

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/dominikbraun/graph v0.23.0
1010
github.com/ettle/strcase v0.2.0
1111
github.com/flant/kube-client v1.3.1
12-
github.com/flant/shell-operator v1.10.6
12+
github.com/flant/shell-operator v1.10.7
1313
github.com/go-chi/chi/v5 v5.2.2
1414
github.com/go-openapi/loads v0.19.5
1515
github.com/go-openapi/spec v0.19.8

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
144144
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
145145
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
146146
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
147-
github.com/flant/shell-operator v1.10.6 h1:LAIrxG6dLivJVOpn8ucwP0D8WzOBB7U+Fk7B9x8mcH0=
148-
github.com/flant/shell-operator v1.10.6/go.mod h1:El4fR63G/anIxQklPbK3CUiGTBcurUzf/4runR7FQhg=
147+
github.com/flant/shell-operator v1.10.7 h1:gyP8ncNnIdE4Ft8oKaG645pfA77W/Evj3zylxdpCl50=
148+
github.com/flant/shell-operator v1.10.7/go.mod h1:El4fR63G/anIxQklPbK3CUiGTBcurUzf/4runR7FQhg=
149149
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
150150
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
151151
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=

pkg/addon-operator/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func StartTasksQueueLengthUpdater(metricStorage metric.Storage, tqs *queue.TaskQ
148148
go func() {
149149
for {
150150
// Gather task queues lengths.
151-
tqs.Iterate(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
151+
tqs.IterateSnapshot(context.Background(), func(_ context.Context, queue *queue.TaskQueue) {
152152
queueLen := float64(queue.Length())
153153
metricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name})
154154
})

pkg/addon-operator/operator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func convergeDone(op *AddonOperator) func(g Gomega) bool {
168168
}
169169
mainQueue := op.engine.TaskQueues.GetMain()
170170
g.Expect(func() bool {
171-
if mainQueue.IsEmpty() {
171+
if mainQueue.Length() == 0 {
172172
return true
173173
}
174174
return mainQueue.GetFirst().GetFailureCount() >= 2

pkg/addon-operator/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func RemoveCurrentConvergeTasks(convergeQueues []*queue.TaskQueue, logLabels map
9595
convergeDrained := false
9696

9797
for _, queue := range convergeQueues {
98-
if queue == nil || queue.IsEmpty() {
98+
if queue == nil || queue.Length() == 0 {
9999
continue
100100
}
101101

pkg/addon-operator/queue_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func Test_ModuleEnsureCRDsTasksInQueueAfterId(t *testing.T) {
2727
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
2828
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
2929

30-
q := queue.NewTasksQueue(metricStorage)
30+
q := queue.NewTasksQueue("test-queue", metricStorage)
3131

3232
Task := &sh_task.BaseTask{Type: task.ModuleEnsureCRDs, Id: currentTaskID}
3333
q.AddLast(Task)
@@ -49,7 +49,7 @@ func Test_ModuleEnsureCRDsTasksInQueueAfterId(t *testing.T) {
4949
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
5050
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
5151

52-
q := queue.NewTasksQueue(metricStorage)
52+
q := queue.NewTasksQueue("test-queue", metricStorage)
5353

5454
Task := &sh_task.BaseTask{Type: task.ModuleEnsureCRDs, Id: currentTaskID}
5555
q.AddLast(Task)
@@ -70,7 +70,7 @@ func Test_ModuleEnsureCRDsTasksInQueueAfterId(t *testing.T) {
7070
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
7171
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
7272

73-
q := queue.NewTasksQueue(metricStorage)
73+
q := queue.NewTasksQueue("test-queue", metricStorage)
7474

7575
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: currentTaskID}
7676
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -107,7 +107,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
107107
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
108108
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
109109

110-
q := queue.NewTasksQueue(metricStorage)
110+
q := queue.NewTasksQueue("test-queue", metricStorage)
111111

112112
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
113113
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -128,7 +128,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
128128
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
129129
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
130130

131-
q := queue.NewTasksQueue(metricStorage)
131+
q := queue.NewTasksQueue("test-queue", metricStorage)
132132

133133
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
134134
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -152,7 +152,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
152152
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
153153
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
154154

155-
q := queue.NewTasksQueue(metricStorage)
155+
q := queue.NewTasksQueue("test-queue", metricStorage)
156156

157157
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "test"}
158158
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "test"}))
@@ -173,7 +173,7 @@ func Test_QueueHasPendingModuleRunTask(t *testing.T) {
173173
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
174174
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
175175

176-
q := queue.NewTasksQueue(metricStorage)
176+
q := queue.NewTasksQueue("test-queue", metricStorage)
177177

178178
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
179179
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -294,7 +294,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
294294
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
295295
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
296296

297-
q := queue.NewTasksQueue(metricStorage)
297+
q := queue.NewTasksQueue("test-queue", metricStorage)
298298

299299
for i := range tt.in {
300300
tsk := &tt.in[i]
@@ -385,7 +385,7 @@ func Test_ModulesWithPendingModuleRun(t *testing.T) {
385385
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
386386
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
387387

388-
q := queue.NewTasksQueue(metricStorage)
388+
q := queue.NewTasksQueue("test-queue", metricStorage)
389389

390390
for _, tsk := range tt.in {
391391
q.AddLast(tsk)
@@ -537,7 +537,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
537537
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
538538
metricStorage.GaugeSetMock.Optional().Set(func(_ string, _ float64, _ map[string]string) {})
539539

540-
q := queue.NewTasksQueue(metricStorage)
540+
q := queue.NewTasksQueue("test-queue", metricStorage)
541541

542542
// Fill queue from the test case.
543543
queues = append(queues, q)
@@ -666,7 +666,7 @@ func Test_RemoveCurrentConvergeTasksFromId(t *testing.T) {
666666
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
667667
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
668668

669-
q := queue.NewTasksQueue(metricStorage)
669+
q := queue.NewTasksQueue("test-queue", metricStorage)
670670

671671
// Fill queue from the test case.
672672
for i := range tt.initialTasks {
@@ -715,7 +715,7 @@ func Test_ConvergeModulesInQueue(t *testing.T) {
715715
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
716716
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
717717

718-
q := queue.NewTasksQueue(metricStorage)
718+
q := queue.NewTasksQueue("test-queue", metricStorage)
719719

720720
Task := &sh_task.BaseTask{Type: task.ModuleRun, Id: "unknown"}
721721
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -736,7 +736,7 @@ func Test_ConvergeModulesInQueue(t *testing.T) {
736736
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
737737
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
738738

739-
q := queue.NewTasksQueue(metricStorage)
739+
q := queue.NewTasksQueue("test-queue", metricStorage)
740740

741741
Task := &sh_task.BaseTask{Type: task.GlobalHookRun, Id: "unknown"}
742742
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))
@@ -764,7 +764,7 @@ func Test_ConvergeModulesInQueue(t *testing.T) {
764764
metricStorage.HistogramObserveMock.Set(func(_ string, _ float64, _ map[string]string, _ []float64) {})
765765
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {})
766766

767-
q := queue.NewTasksQueue(metricStorage)
767+
q := queue.NewTasksQueue("test-queue", metricStorage)
768768

769769
Task := &sh_task.BaseTask{Type: task.GlobalHookRun, Id: "unknown"}
770770
q.AddLast(Task.WithMetadata(task.HookMetadata{ModuleName: "unknown"}))

0 commit comments

Comments
 (0)