Skip to content

Commit a893020

Browse files
Remove Trigger finalizer when Broker is removed (#1597)
This bug is reproducible in this situation: - Create Kafka Broker `b1` - Create a trigger `t1` associated with `b1` - Remove Kafka Broker `b1` - Remove Trigger `t1` `t1` will be in `Terminating` state forever since our reconciler will not reconcile it since we can't know if Trigger is our trigger given that there is not `b1`. Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com> Co-authored-by: Pierangelo Di Pilato <pdipilat@redhat.com>
1 parent 67b2cdd commit a893020

File tree

2 files changed

+133
-5
lines changed

2 files changed

+133
-5
lines changed

control-plane/pkg/reconciler/trigger/controller.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool
119119
return false
120120
}
121121

122+
if hasKafkaBrokerTriggerFinalizer(trigger.Finalizers, FinalizerName) {
123+
return true
124+
}
125+
122126
broker, err := lister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker)
123127
if err != nil {
124128
return false
@@ -129,6 +133,15 @@ func filterTriggers(lister eventinglisters.BrokerLister) func(interface{}) bool
129133
}
130134
}
131135

136+
func hasKafkaBrokerTriggerFinalizer(finalizers []string, finalizerName string) bool {
137+
for _, f := range finalizers {
138+
if f == finalizerName {
139+
return true
140+
}
141+
}
142+
return false
143+
}
144+
132145
func enqueueTriggers(
133146
logger *zap.Logger,
134147
lister eventinglisters.TriggerLister,

control-plane/pkg/reconciler/trigger/controller_test.go

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"testing"
2121

2222
"github.com/stretchr/testify/assert"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2324
eventing "knative.dev/eventing/pkg/apis/eventing/v1"
2425
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
2526
"knative.dev/pkg/configmap"
@@ -34,6 +35,7 @@ import (
3435
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake"
3536

3637
"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
38+
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka"
3739
)
3840

3941
func TestNewController(t *testing.T) {
@@ -48,11 +50,124 @@ func TestNewController(t *testing.T) {
4850
func TestFilterTriggers(t *testing.T) {
4951
ctx, _ := reconcilertesting.SetupFakeContext(t)
5052

51-
pass := filterTriggers(brokerinformer.Get(ctx).Lister())(&eventing.Trigger{
52-
Spec: eventing.TriggerSpec{
53-
Broker: "not-exists",
53+
tt := []struct {
54+
name string
55+
trigger interface{}
56+
pass bool
57+
brokers []*eventing.Broker
58+
}{
59+
{
60+
name: "unknown type",
61+
trigger: &eventing.Broker{},
62+
pass: false,
5463
},
55-
})
64+
{
65+
name: "non existing broker",
66+
trigger: &eventing.Trigger{
67+
Spec: eventing.TriggerSpec{
68+
Broker: "not-exists",
69+
},
70+
},
71+
pass: false,
72+
},
73+
{
74+
name: "non existing broker and trigger with kafka broker finalizer",
75+
trigger: &eventing.Trigger{
76+
ObjectMeta: metav1.ObjectMeta{
77+
Finalizers: []string{FinalizerName},
78+
},
79+
Spec: eventing.TriggerSpec{
80+
Broker: "not-exists",
81+
},
82+
},
83+
pass: true,
84+
},
85+
{
86+
name: "exiting kafka broker",
87+
trigger: &eventing.Trigger{
88+
ObjectMeta: metav1.ObjectMeta{
89+
Namespace: "ns",
90+
Name: "tr",
91+
Finalizers: []string{FinalizerName},
92+
},
93+
Spec: eventing.TriggerSpec{
94+
Broker: "br",
95+
},
96+
},
97+
brokers: []*eventing.Broker{
98+
{
99+
ObjectMeta: metav1.ObjectMeta{
100+
Namespace: "ns",
101+
Name: "br",
102+
Annotations: map[string]string{
103+
eventing.BrokerClassAnnotationKey: kafka.BrokerClass,
104+
},
105+
},
106+
},
107+
},
108+
pass: true,
109+
},
110+
{
111+
name: "exiting non kafka broker",
112+
trigger: &eventing.Trigger{
113+
ObjectMeta: metav1.ObjectMeta{
114+
Namespace: "ns",
115+
Name: "tr",
116+
},
117+
Spec: eventing.TriggerSpec{
118+
Broker: "br",
119+
},
120+
},
121+
brokers: []*eventing.Broker{
122+
{
123+
ObjectMeta: metav1.ObjectMeta{
124+
Namespace: "ns",
125+
Name: "br",
126+
Annotations: map[string]string{
127+
eventing.BrokerClassAnnotationKey: kafka.BrokerClass + "-boh",
128+
},
129+
},
130+
},
131+
},
132+
pass: false,
133+
},
134+
{
135+
name: "exiting non kafka broker - trigger with kafka broker finalizer",
136+
trigger: &eventing.Trigger{
137+
ObjectMeta: metav1.ObjectMeta{
138+
Namespace: "ns",
139+
Name: "tr",
140+
Finalizers: []string{FinalizerName},
141+
},
142+
Spec: eventing.TriggerSpec{
143+
Broker: "br",
144+
},
145+
},
146+
brokers: []*eventing.Broker{
147+
{
148+
ObjectMeta: metav1.ObjectMeta{
149+
Namespace: "ns",
150+
Name: "br",
151+
Annotations: map[string]string{
152+
eventing.BrokerClassAnnotationKey: kafka.BrokerClass + "-boh",
153+
},
154+
},
155+
},
156+
},
157+
pass: true,
158+
},
159+
}
56160

57-
assert.False(t, pass)
161+
for _, tc := range tt {
162+
tc := tc
163+
t.Run(tc.name, func(t *testing.T) {
164+
brokerInformer := brokerinformer.Get(ctx)
165+
for _, obj := range tc.brokers {
166+
_ = brokerInformer.Informer().GetStore().Add(obj)
167+
}
168+
filter := filterTriggers(brokerInformer.Lister())
169+
pass := filter(tc.trigger)
170+
assert.Equal(t, tc.pass, pass)
171+
})
172+
}
58173
}

0 commit comments

Comments
 (0)