Skip to content

Commit 54ddca1

Browse files
committed
Adding
1 parent 469541a commit 54ddca1

File tree

3 files changed

+20
-48
lines changed

3 files changed

+20
-48
lines changed

events/controller.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,29 +37,31 @@ func NewController[T client.Object](ctx context.Context, client client.Client, c
3737
startTime: clock.Now(),
3838
kubeClient: client,
3939
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
40-
eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{})),
40+
eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{
41+
FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()),
42+
})),
4143
}
4244
}
4345

4446
func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error {
4547
return controllerruntime.NewControllerManagedBy(m).
4648
Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))).
47-
WatchesRawSource(singleton.ChannelSource[*v1.Event](ctx, c.eventWatch.ResultChan())).
48-
Complete(reconcile.AsReconciler(c.kubeClient, c))
49+
WatchesRawSource(singleton.Source()).
50+
Complete(singleton.AsReconciler(c))
4951
}
5052

51-
func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) {
52-
if event.InvolvedObject.Kind != c.gvk.Kind || event.InvolvedObject.APIVersion != c.gvk.GroupVersion().String() {
53-
return reconcile.Result{}, nil
54-
}
53+
func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) {
54+
for e := range c.eventWatch.ResultChan() {
55+
event := e.Object.(*v1.Event)
5556

56-
// We check if the event was created in the lifetime of this controller
57-
// since we don't duplicate metrics on controller restart or lease handover
58-
if c.startTime.Before(event.LastTimestamp.Time) {
59-
c.EventCount.Inc(map[string]string{
60-
pmetrics.LabelType: event.Type,
61-
pmetrics.LabelReason: event.Reason,
62-
})
57+
// We check if the event was created in the lifetime of this controller
58+
// since we don't duplicate metrics on controller restart or lease handover
59+
if c.startTime.Before(event.LastTimestamp.Time) {
60+
c.EventCount.Inc(map[string]string{
61+
pmetrics.LabelType: event.Type,
62+
pmetrics.LabelReason: event.Reason,
63+
})
64+
}
6365
}
6466

6567
return reconcile.Result{}, nil

events/suite_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/awslabs/operatorpkg/events"
1010
pmetrics "github.com/awslabs/operatorpkg/metrics"
1111
"github.com/awslabs/operatorpkg/object"
12+
"github.com/awslabs/operatorpkg/singleton"
1213
"github.com/awslabs/operatorpkg/test"
1314
. "github.com/awslabs/operatorpkg/test/expectations"
1415
"github.com/onsi/ginkgo/v2"
@@ -73,7 +74,7 @@ var _ = Describe("Controller", func() {
7374
Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil())
7475

7576
// reconcile on the event
76-
_, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])})
77+
_, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])})
7778
Expect(err).ToNot(HaveOccurred())
7879

7980
// expect an emitted metric to for the event
@@ -90,7 +91,7 @@ var _ = Describe("Controller", func() {
9091
Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil())
9192

9293
// reconcile on the event
93-
_, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
94+
_, err := singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
9495
Expect(err).ToNot(HaveOccurred())
9596

9697
// expect not have an emitted metric to for the event
@@ -101,7 +102,7 @@ var _ = Describe("Controller", func() {
101102
ExpectApplied(ctx, kubeClient, event)
102103

103104
// reconcile on the event
104-
_, err = reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
105+
_, err = singleton.AsReconciler(controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
105106
Expect(err).ToNot(HaveOccurred())
106107

107108
// expect an emitted metric to for the event

singleton/controller.go

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"context"
55
"time"
66

7-
"k8s.io/apimachinery/pkg/watch"
87
"k8s.io/client-go/util/workqueue"
9-
"sigs.k8s.io/controller-runtime/pkg/client"
108
"sigs.k8s.io/controller-runtime/pkg/event"
119
"sigs.k8s.io/controller-runtime/pkg/handler"
1210
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -38,32 +36,3 @@ func Source() source.Source {
3836
},
3937
})
4038
}
41-
42-
func ChannelSource[T client.Object](ctx context.Context, objectChan <-chan watch.Event) source.Source {
43-
eventSource := make(chan event.GenericEvent, 1000)
44-
45-
go func(ctx context.Context, objectChan <-chan watch.Event, eventSource chan event.GenericEvent) {
46-
for {
47-
select {
48-
case delta, ok := <-objectChan:
49-
if !ok {
50-
return
51-
}
52-
// Convert the delta to GenericEvent
53-
eventSource <- event.GenericEvent{
54-
Object: delta.Object.(T),
55-
}
56-
case <-ctx.Done():
57-
return
58-
}
59-
}
60-
}(ctx, objectChan, eventSource)
61-
62-
return source.Channel(eventSource, handler.Funcs{
63-
GenericFunc: func(_ context.Context, event event.GenericEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
64-
queue.Add(reconcile.Request{
65-
NamespacedName: client.ObjectKeyFromObject(event.Object),
66-
})
67-
},
68-
})
69-
}

0 commit comments

Comments
 (0)