Skip to content

Commit 062432a

Browse files
committed
Avoid Event cache hydration by using clinet go watcher
1 parent 812bd26 commit 062432a

File tree

3 files changed

+56
-14
lines changed

3 files changed

+56
-14
lines changed

events/controller.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ import (
88

99
pmetrics "github.com/awslabs/operatorpkg/metrics"
1010
"github.com/awslabs/operatorpkg/object"
11+
"github.com/awslabs/operatorpkg/singleton"
12+
"github.com/samber/lo"
1113
v1 "k8s.io/api/core/v1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1215
"k8s.io/apimachinery/pkg/runtime/schema"
16+
"k8s.io/apimachinery/pkg/watch"
17+
"k8s.io/client-go/kubernetes"
1318
"k8s.io/utils/clock"
1419
controllerruntime "sigs.k8s.io/controller-runtime"
15-
"sigs.k8s.io/controller-runtime/pkg/builder"
1620
"sigs.k8s.io/controller-runtime/pkg/client"
17-
"sigs.k8s.io/controller-runtime/pkg/controller"
1821
"sigs.k8s.io/controller-runtime/pkg/manager"
19-
"sigs.k8s.io/controller-runtime/pkg/predicate"
2022
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2123
)
2224

@@ -25,31 +27,32 @@ type Controller[T client.Object] struct {
2527
startTime time.Time
2628
kubeClient client.Client
2729
EventCount pmetrics.CounterMetric
30+
eventWatch watch.Interface
2831
}
2932

30-
func NewController[T client.Object](client client.Client, clock clock.Clock) *Controller[T] {
33+
func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] {
3134
gvk := object.GVK(object.New[T]())
3235
return &Controller[T]{
3336
gvk: gvk,
3437
startTime: clock.Now(),
3538
kubeClient: client,
3639
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
40+
eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{})),
3741
}
3842
}
3943

40-
func (c *Controller[T]) Register(_ context.Context, m manager.Manager) error {
44+
func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error {
4145
return controllerruntime.NewControllerManagedBy(m).
42-
For(&v1.Event{}, builder.WithPredicates(predicate.NewTypedPredicateFuncs(func(o client.Object) bool {
43-
// Only reconcile on the object kind we care about
44-
event := o.(*v1.Event)
45-
return event.InvolvedObject.Kind == c.gvk.Kind && event.InvolvedObject.APIVersion == c.gvk.GroupVersion().String()
46-
}))).
47-
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
4846
Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))).
49-
Complete(reconcile.AsReconciler(m.GetClient(), c))
47+
WatchesRawSource(singleton.ChannelSource[*v1.Event](ctx, c.eventWatch.ResultChan())).
48+
Complete(reconcile.AsReconciler(c.kubeClient, c))
5049
}
5150

5251
func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) {
52+
if event.InvolvedObject.Kind != c.gvk.Kind && event.InvolvedObject.APIVersion != c.gvk.GroupVersion().String() {
53+
return reconcile.Result{}, nil
54+
}
55+
5356
// We check if the event was created in the lifetime of this controller
5457
// since we don't duplicate metrics on controller restart or lease handover
5558
if c.startTime.Before(event.LastTimestamp.Time) {

events/suite_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ import (
1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2020
"k8s.io/apimachinery/pkg/runtime"
2121
"k8s.io/apimachinery/pkg/runtime/schema"
22+
"k8s.io/client-go/kubernetes"
2223
"k8s.io/client-go/kubernetes/scheme"
2324
clock "k8s.io/utils/clock/testing"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
2526
"sigs.k8s.io/controller-runtime/pkg/client/fake"
27+
"sigs.k8s.io/controller-runtime/pkg/envtest"
2628
"sigs.k8s.io/controller-runtime/pkg/log"
2729
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2830
)
@@ -46,13 +48,18 @@ func Test(t *testing.T) {
4648
}
4749

4850
var _ = BeforeSuite(func() {
51+
ctx = context.Background()
52+
ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr)
53+
4954
fakeClock = clock.NewFakeClock(time.Now())
5055
kubeClient = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
5156
evt := o.(*corev1.Event)
5257
return []string{evt.InvolvedObject.Kind}
5358
}).Build()
54-
controller = events.NewController[*test.CustomObject](kubeClient, fakeClock)
55-
ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr)
59+
environment := envtest.Environment{Scheme: scheme.Scheme}
60+
_ = lo.Must(environment.Start())
61+
62+
controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, kubernetes.NewForConfigOrDie(environment.Config))
5663
})
5764

5865
var _ = Describe("Controller", func() {

singleton/controller.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"time"
66

7+
"k8s.io/apimachinery/pkg/watch"
78
"k8s.io/client-go/util/workqueue"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
810
"sigs.k8s.io/controller-runtime/pkg/event"
911
"sigs.k8s.io/controller-runtime/pkg/handler"
1012
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -36,3 +38,33 @@ func Source() source.Source {
3638
},
3739
})
3840
}
41+
42+
func ChannelSource[T client.Object](ctx context.Context, objectChan <-chan watch.Event) source.Source {
43+
eventSource := make(chan event.GenericEvent, 1)
44+
45+
go func(ctx context.Context, objectChan <-chan watch.Event, eventSource chan event.GenericEvent) {
46+
47+
for {
48+
select {
49+
case delta, ok := <-objectChan:
50+
if !ok {
51+
return
52+
}
53+
// Convert the delta to GenericEvent
54+
eventSource <- event.GenericEvent{
55+
Object: delta.Object.(T),
56+
}
57+
case <-ctx.Done():
58+
return
59+
}
60+
}
61+
}(ctx, objectChan, eventSource)
62+
63+
return source.Channel(eventSource, handler.Funcs{
64+
GenericFunc: func(_ context.Context, event event.GenericEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
65+
queue.Add(reconcile.Request{
66+
NamespacedName: client.ObjectKeyFromObject(event.Object),
67+
})
68+
},
69+
})
70+
}

0 commit comments

Comments
 (0)