Skip to content

Commit 3e4f781

Browse files
authored
Merge pull request #5 from kaloom/retry-2
Add longer retry decoupled from the event thread.
2 parents e0f0d54 + 43ec04f commit 3e4f781

File tree

3 files changed

+170
-29
lines changed

3 files changed

+170
-29
lines changed

controller/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/*
2+
/*
23
Copyright 2017-2019 Kaloom Inc.
34
45
Licensed under the Apache License, Version 2.0 (the "License");
@@ -59,6 +60,7 @@ type Controller struct {
5960
kubeClient *kubernetes.Clientset
6061
runtime Runtime
6162
cniPlugin *cni.NetworkPlugin
63+
eventQueue *EventQueue
6264
}
6365

6466
// Run starts a Pod resource controller
@@ -111,6 +113,7 @@ func NewController(kubeClient *kubernetes.Clientset, endpoint, cniBinPath, cniCo
111113
kubeClient: kubeClient,
112114
runtime: runTime,
113115
cniPlugin: cniPlugin,
116+
eventQueue: newQueue(),
114117
}
115118
return c, nil
116119
}

controller/eventqueue.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2017-2021 Kaloom Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"container/list"
21+
"fmt"
22+
"github.com/golang/glog"
23+
"sync"
24+
)
25+
26+
// OpType selects which operation an Event asks for.
type OpType int

// Operation types carried by an Event, numbered from zero with iota.
const (
	// Add (0) marks an event that Process handles by adding a network.
	Add OpType = iota
	// Delete (1) marks an event that Process handles by deleting a network.
	Delete
)
32+
33+
// Event struct
34+
type Event struct {
35+
opType OpType
36+
data interface{}
37+
}
38+
39+
// EventQueue is a FIFO queue of Event values with a by-key index used to
// find an already-queued event for the same data.
type EventQueue struct {
	// q holds the queued events in arrival order (front = oldest).
	q *list.List
	// m maps an event key (see Event.getKey) to its element inside q.
	m map[string]*list.Element
	// lock is the mutex cond is built on (see newQueue); Enqueue and
	// eventQueueWorker hold it, through cond.L, while touching q and m.
	lock sync.Mutex
	// cond is signalled by Enqueue each time an event is pushed.
	cond *sync.Cond
}
46+
47+
// newQueue will create a new FIFO queue
48+
func newQueue() *EventQueue {
49+
eq := &EventQueue{m: make(map[string]*list.Element), q: list.New()}
50+
eq.q.Init()
51+
eq.cond = sync.NewCond(&eq.lock)
52+
return eq
53+
}
54+
55+
// getKey will create a unique key identifying the event
56+
func (ev *Event) getKey() string {
57+
return fmt.Sprintf("%+v", ev.data)
58+
}
59+
60+
// Enqueue will push the new event in the FIFO queue
61+
// If a similar event already exists with a opposite operation type, both event will be discarded.
62+
func (eq *EventQueue) Enqueue(event *Event) {
63+
eq.cond.L.Lock()
64+
defer eq.cond.L.Unlock()
65+
66+
key := event.getKey()
67+
glog.V(5).Infof("Enqueuing using key:%s", key)
68+
if e, ok := eq.m[key]; ok {
69+
ev := e.Value.(Event)
70+
if ev.opType != event.opType {
71+
glog.Infof("Cancelling events from queue - events cancels each others:", ev)
72+
eq.q.Remove(e)
73+
delete(eq.m, key)
74+
return
75+
}
76+
}
77+
78+
e := eq.q.PushBack(*event)
79+
glog.V(5).Infof("Enqueue new event:", event)
80+
eq.m[key] = e
81+
eq.cond.Signal()
82+
}
83+
84+
// Dequeue will remove the first element from the queue and return it for processing.
85+
// The caller MUST use the mutex provided by the EventQueue struct
86+
func (eq *EventQueue) Dequeue() *Event {
87+
e := eq.q.Front() // First element
88+
if e == nil {
89+
return nil
90+
}
91+
ev := e.Value.(Event)
92+
eq.q.Remove(e)
93+
delete(eq.m, ev.getKey())
94+
return &ev
95+
}

controller/pods.go

Lines changed: 72 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,51 @@ type cniPodNetworkProperty struct {
4747
type cniPodNetworks []cniPodNetwork
4848

4949
// Retry policy applied by Process to each network add/delete operation.
const (
	// maxRetries is the maximum number of attempts made for one event.
	maxRetries = 60
	// retryDelay is how long to wait after a failed attempt.
	retryDelay = 10 * time.Second
)
5353

54+
// Process will take a element from the FIFO queue and attempt to process it (either add or remove network)
55+
func (c *Controller) Process(e *Event) {
56+
var err error
57+
switch e.opType {
58+
case Add:
59+
for i := 0; i < maxRetries; i++ {
60+
err = c.cniPlugin.AddNetwork(e.data.(*cni.Parameters))
61+
if err == nil {
62+
if i > 0 {
63+
glog.Infof("Succeeded adding network %+v after %d attempt", e.data, i)
64+
} else {
65+
glog.V(5).Infof("Succeeded adding network %+v on first attempt", e.data)
66+
}
67+
return
68+
}
69+
glog.Warningf("Failed adding network %+v... retrying %d/%d. err:%v", e.data, i+1, maxRetries, err)
70+
time.Sleep(retryDelay)
71+
}
72+
glog.Errorf("Failed adding network %+v after %d attempt. err:%v", e.data, maxRetries, err)
73+
74+
case Delete:
75+
for i := 0; i < maxRetries; i++ {
76+
err = c.cniPlugin.DeleteNetwork(e.data.(*cni.Parameters))
77+
if err == nil {
78+
if i > 0 {
79+
glog.Infof("Succeeded deleting network %+v after %d attempt", e.data, i)
80+
} else {
81+
glog.V(5).Infof("Succeeded deleting network %+v on first attempt", e.data)
82+
}
83+
return
84+
}
85+
glog.Warningf("Failed deleting network %+v... retrying %d/%d. err:%v", e.data, i+1, maxRetries, err)
86+
time.Sleep(retryDelay)
87+
}
88+
glog.Errorf("Failed deleting network %+v after %d attempt. err:%v", e.data, maxRetries, err)
89+
90+
default:
91+
glog.Errorf("processing invalid operation type value in event %+v", e)
92+
}
93+
}
94+
5495
func getNetworkSet(networks string) (gset.GSet, error) {
5596
nets := cniPodNetworks{}
5697
netSetBuilder := gset.NewBuilder()
@@ -122,19 +163,8 @@ func (c *Controller) addNetwork(podObj *apiv1.Pod, networkName string, np cniPod
122163
return err
123164
}
124165

125-
for i := 0; i < maxRetries; i++ {
126-
err = c.cniPlugin.AddNetwork(cniParams)
127-
if err == nil {
128-
if i > 0 {
129-
glog.V(4).Infof("Succeeded adding network %s on Pod %s after %d attempt", networkName, podObj.ObjectMeta.Name, i)
130-
}
131-
break
132-
}
133-
glog.Warningf("Failed adding network %s on Pod %s... retrying %d/%d. err:%v", networkName, podObj.ObjectMeta.Name, i+1, maxRetries, err)
134-
time.Sleep(retryDelay)
135-
}
136-
137-
return err
166+
c.eventQueue.Enqueue(&Event{opType: Add, data: cniParams})
167+
return nil
138168
}
139169

140170
func (c *Controller) delNetwork(podObj *apiv1.Pod, networkName string, np cniPodNetworkProperty) error {
@@ -147,19 +177,8 @@ func (c *Controller) delNetwork(podObj *apiv1.Pod, networkName string, np cniPod
147177
return err
148178
}
149179

150-
for i := 0; i < maxRetries; i++ {
151-
err = c.cniPlugin.DeleteNetwork(cniParams)
152-
if err == nil {
153-
if i > 0 {
154-
glog.V(4).Infof("Succeeded deleting network %s on Pod %s after %d attempt", networkName, podObj.ObjectMeta.Name, i)
155-
}
156-
break
157-
}
158-
glog.Warningf("Failed deleting network %s on Pod %s... retrying %d/%d. err:%v", networkName, podObj.ObjectMeta.Name, i+1, maxRetries, err)
159-
time.Sleep(retryDelay)
160-
}
161-
162-
return err
180+
c.eventQueue.Enqueue(&Event{opType: Delete, data: cniParams})
181+
return nil
163182
}
164183

165184
func (c *Controller) podUpdated(oldObj, newObj interface{}) {
@@ -207,6 +226,7 @@ func (c *Controller) podUpdated(oldObj, newObj interface{}) {
207226
np := cniPodNetworkProperty{}
208227
for _, n := range nets {
209228
np.IfMAC = n.IfMAC
229+
np.IsPrimary = n.IsPrimary
210230
err := c.delNetwork(newPod, n.NetworkName, np)
211231
if err != nil {
212232
glog.Errorf("Failed to delete network %s on pod %s", n.NetworkName, podName)
@@ -223,15 +243,38 @@ func (c *Controller) podUpdated(oldObj, newObj interface{}) {
223243
np := cniPodNetworkProperty{}
224244
for _, n := range nets {
225245
np.IfMAC = n.IfMAC
246+
np.IsPrimary = n.IsPrimary
226247
err := c.addNetwork(newPod, n.NetworkName, np)
227248
if err != nil {
228-
glog.V(4).Infof("Failed to add network %s on pod %s", n.NetworkName, podName)
249+
glog.Errorf("Failed to add network %s on pod %s", n.NetworkName, podName)
229250
}
230251
}
231252
}
232253
}
233254

255+
func (c *Controller) eventQueueWorker() {
256+
for {
257+
c.eventQueue.cond.L.Lock()
258+
259+
for c.eventQueue.q.Len() == 0 {
260+
c.eventQueue.cond.Wait()
261+
}
262+
263+
ev := c.eventQueue.Dequeue()
264+
c.eventQueue.cond.L.Unlock()
265+
266+
if ev != nil {
267+
glog.V(5).Infof("Processing event:", ev)
268+
c.Process(ev)
269+
}
270+
}
271+
}
272+
234273
func (c *Controller) watchPods(ctx context.Context, nodeName string) (cache.Controller, error) {
274+
275+
// Initialize the worker queue
276+
go c.eventQueueWorker()
277+
235278
// Currently there is no field selector for a Pod annotation
236279
// https://github.com/kubernetes/kubernetes/blob/master/pkg/registry/core/pod/strategy.go
237280
fs := fields.Set{

0 commit comments

Comments
 (0)