Skip to content

Commit 0d69c02

Browse files
authored
[addon-operator] Backport 1.11: fix: deadlock shell-operator fix (#681)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 2444c82 commit 0d69c02

File tree

9 files changed

+26
-24
lines changed

9 files changed

+26
-24
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/dominikbraun/graph v0.23.0
1010
github.com/ettle/strcase v0.2.0
1111
github.com/flant/kube-client v1.3.1
12-
github.com/flant/shell-operator v1.10.4
12+
github.com/flant/shell-operator v1.10.6
1313
github.com/go-chi/chi/v5 v5.2.2
1414
github.com/go-openapi/loads v0.19.5
1515
github.com/go-openapi/spec v0.19.8

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ github.com/flant/go-openapi-validate v0.19.12-flant.1 h1:GuB9XEfiLHq3M7fafRLq1AW
144144
github.com/flant/go-openapi-validate v0.19.12-flant.1/go.mod h1:Rzou8hA/CBw8donlS6WNEUQupNvUZ0waH08tGe6kAQ4=
145145
github.com/flant/kube-client v1.3.1 h1:1SdD799sujXNg2F6Z27le/+qkcKQaKf9Z492YGEhVhc=
146146
github.com/flant/kube-client v1.3.1/go.mod h1:mql6hsZMgBLAhdj3Emb8TrP5MVdXduFQ2NLjzn6IF0Y=
147-
github.com/flant/shell-operator v1.10.4 h1:OvhnpiRIQUMAvyp7w15t36fOAclclcOFjhFerA67pIE=
148-
github.com/flant/shell-operator v1.10.4/go.mod h1:El4fR63G/anIxQklPbK3CUiGTBcurUzf/4runR7FQhg=
147+
github.com/flant/shell-operator v1.10.6 h1:LAIrxG6dLivJVOpn8ucwP0D8WzOBB7U+Fk7B9x8mcH0=
148+
github.com/flant/shell-operator v1.10.6/go.mod h1:El4fR63G/anIxQklPbK3CUiGTBcurUzf/4runR7FQhg=
149149
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
150150
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
151151
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=

pkg/addon-operator/metrics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package addon_operator
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/flant/addon-operator/pkg"
@@ -147,7 +148,7 @@ func StartTasksQueueLengthUpdater(metricStorage metric.Storage, tqs *queue.TaskQ
147148
go func() {
148149
for {
149150
// Gather task queues lengths.
150-
tqs.Iterate(func(queue *queue.TaskQueue) {
151+
tqs.Iterate(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) {
151152
queueLen := float64(queue.Length())
152153
metricStorage.GaugeSet("{PREFIX}tasks_queue_length", queueLen, map[string]string{"queue": queue.Name})
153154
})

pkg/addon-operator/operator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func Test_Operator_startup_tasks(t *testing.T) {
209209
}
210210

211211
i := 0
212-
op.engine.TaskQueues.GetMain().Iterate(func(tsk sh_task.Task) {
212+
op.engine.TaskQueues.GetMain().IterateSnapshot(func(tsk sh_task.Task) {
213213
// Stop checking if no expects left.
214214
if i >= len(expectTasks) {
215215
return

pkg/addon-operator/queue.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func ModulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {
3434

3535
skipFirstTask := true
3636

37-
q.Iterate(func(t sh_task.Task) {
37+
q.IterateSnapshot(func(t sh_task.Task) {
3838
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
3939
if skipFirstTask {
4040
skipFirstTask = false
@@ -63,7 +63,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
6363
}
6464

6565
convergeTasks := 0
66-
q.Iterate(func(t sh_task.Task) {
66+
q.IterateSnapshot(func(t sh_task.Task) {
6767
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
6868
convergeTasks++
6969
}
@@ -78,7 +78,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
7878
}
7979

8080
tasks := 0
81-
q.Iterate(func(t sh_task.Task) {
81+
q.IterateSnapshot(func(t sh_task.Task) {
8282
taskType := t.GetType()
8383
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
8484
tasks++
@@ -101,7 +101,7 @@ func RemoveCurrentConvergeTasks(convergeQueues []*queue.TaskQueue, logLabels map
101101

102102
stop := false
103103

104-
queue.Filter(func(t sh_task.Task) bool {
104+
queue.DeleteFunc(func(t sh_task.Task) bool {
105105
if stop {
106106
return true
107107
}
@@ -152,7 +152,7 @@ func RemoveCurrentConvergeTasksFromId(q *queue.TaskQueue, afterId string, logLab
152152
IDFound := false
153153
convergeDrained := false
154154
stop := false
155-
q.Filter(func(t sh_task.Task) bool {
155+
q.DeleteFunc(func(t sh_task.Task) bool {
156156
if stop {
157157
return true
158158
}
@@ -197,7 +197,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels
197197

198198
IDFound := false
199199
stop := false
200-
q.Filter(func(t sh_task.Task) bool {
200+
q.DeleteFunc(func(t sh_task.Task) bool {
201201
if stop {
202202
return true
203203
}
@@ -218,6 +218,7 @@ func RemoveAdjacentConvergeModules(q *queue.TaskQueue, afterId string, logLabels
218218
}
219219

220220
stop = true
221+
221222
return true
222223
})
223224
}
@@ -229,7 +230,7 @@ func ModuleEnsureCRDsTasksInQueueAfterId(q *queue.TaskQueue, afterId string) boo
229230
IDFound := false
230231
taskFound := false
231232
stop := false
232-
q.Filter(func(t sh_task.Task) bool {
233+
q.DeleteFunc(func(t sh_task.Task) bool {
233234
if stop {
234235
return true
235236
}
@@ -257,7 +258,7 @@ func DrainNonMainQueue(q *queue.TaskQueue) {
257258
}
258259

259260
// Remove all tasks.
260-
q.Filter(func(_ sh_task.Task) bool {
261+
q.DeleteFunc(func(_ sh_task.Task) bool {
261262
return false
262263
})
263264
}

pkg/addon-operator/queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func Test_RemoveAdjacentConvergeModules(t *testing.T) {
307307
// Check tasks after remove.
308308
require.Equal(t, len(tt.expect), q.Length(), "queue length should match length of expected tasks")
309309
i := 0
310-
q.Iterate(func(tsk sh_task.Task) {
310+
q.IterateSnapshot(func(tsk sh_task.Task) {
311311
require.Equal(t, tt.expect[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
312312
require.Equal(t, tt.expect[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
313313
i++
@@ -566,7 +566,7 @@ func Test_RemoveCurrentConvergeTasks(t *testing.T) {
566566
// Check tasks in queue after remove.
567567
require.Equal(t, len(tasks), queues[i].Length(), "length of queue %d should match length of expected tasks", i)
568568
j := 0
569-
queues[i].Iterate(func(tsk sh_task.Task) {
569+
queues[i].IterateSnapshot(func(tsk sh_task.Task) {
570570
require.Equal(t, tt.expectTasks[i][j].Id, tsk.GetId(), "ID should match for task %d %+v", j, tsk)
571571
require.Equal(t, tt.expectTasks[i][j].Type, tsk.GetType(), "Type should match for task %d %+v", j, tsk)
572572
j++
@@ -692,7 +692,7 @@ func Test_RemoveCurrentConvergeTasksFromId(t *testing.T) {
692692
// Check tasks in queue after remove.
693693
require.Equal(t, len(tt.expectTasks), q.Length(), "queue length should match length of expected tasks")
694694
i := 0
695-
q.Iterate(func(tsk sh_task.Task) {
695+
q.IterateSnapshot(func(tsk sh_task.Task) {
696696
require.Equal(t, tt.expectTasks[i].Id, tsk.GetId(), "ID should match for task %d %+v", i, tsk)
697697
require.Equal(t, tt.expectTasks[i].Type, tsk.GetType(), "Type should match for task %d %+v", i, tsk)
698698
i++

pkg/module_manager/module_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,7 @@ func modulesWithPendingTasks(q *queue.TaskQueue, taskType sh_task.TaskType) map[
15361536

15371537
skipFirstTask := true
15381538

1539-
q.Iterate(func(t sh_task.Task) {
1539+
q.IterateSnapshot(func(t sh_task.Task) {
15401540
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
15411541
if skipFirstTask {
15421542
skipFirstTask = false

pkg/task/queue/queue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func convergeTasksInQueue(q *queue.TaskQueue) int {
119119
}
120120

121121
convergeTasks := 0
122-
q.Iterate(func(t sh_task.Task) {
122+
q.IterateSnapshot(func(t sh_task.Task) {
123123
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
124124
convergeTasks++
125125
}
@@ -135,7 +135,7 @@ func (s *Service) DrainNonMainQueue(queueName string) {
135135
}
136136

137137
// Remove all tasks.
138-
q.Filter(func(_ sh_task.Task) bool {
138+
q.DeleteFunc(func(_ sh_task.Task) bool {
139139
return false
140140
})
141141
}
@@ -157,7 +157,7 @@ func (s *Service) RemoveAdjacentConvergeModules(queueName string, afterId string
157157
IDFound := false
158158
stop := false
159159

160-
q.Filter(func(t sh_task.Task) bool {
160+
q.DeleteFunc(func(t sh_task.Task) bool {
161161
if stop {
162162
return true
163163
}
@@ -212,7 +212,7 @@ func modulesWithPendingModuleRun(q *queue.TaskQueue) map[string]struct{} {
212212

213213
skipFirstTask := true
214214

215-
q.Iterate(func(t sh_task.Task) {
215+
q.IterateSnapshot(func(t sh_task.Task) {
216216
// Skip the first task in the queue as it can be executed already, i.e. "not pending".
217217
if skipFirstTask {
218218
skipFirstTask = false
@@ -244,7 +244,7 @@ func (s *Service) ModuleEnsureCRDsTasksInMainQueueAfterId(afterId string) bool {
244244
IDFound := false
245245
taskFound := false
246246
stop := false
247-
q.Filter(func(t sh_task.Task) bool {
247+
q.DeleteFunc(func(t sh_task.Task) bool {
248248
if stop {
249249
return true
250250
}

pkg/task/service/converge.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func ConvergeTasksInQueue(q *queue.TaskQueue) int {
118118
}
119119

120120
convergeTasks := 0
121-
q.Iterate(func(t sh_task.Task) {
121+
q.IterateSnapshot(func(t sh_task.Task) {
122122
if converge.IsConvergeTask(t) || converge.IsFirstConvergeTask(t) {
123123
convergeTasks++
124124
}
@@ -133,7 +133,7 @@ func ConvergeModulesInQueue(q *queue.TaskQueue) int {
133133
}
134134

135135
tasks := 0
136-
q.Iterate(func(t sh_task.Task) {
136+
q.IterateSnapshot(func(t sh_task.Task) {
137137
taskType := t.GetType()
138138
if converge.IsConvergeTask(t) && (taskType == task.ModuleRun || taskType == task.ModuleDelete) {
139139
tasks++

0 commit comments

Comments
 (0)