Skip to content

Commit f5697a3

Browse files
committed
Removed the source queue and pulled object from the channel
1 parent 226bf39 commit f5697a3

File tree

3 files changed

+36
-32
lines changed

3 files changed

+36
-32
lines changed

events/controller.go

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,9 @@ import (
99
pmetrics "github.com/awslabs/operatorpkg/metrics"
1010
"github.com/awslabs/operatorpkg/object"
1111
"github.com/awslabs/operatorpkg/singleton"
12-
"github.com/samber/lo"
1312
v1 "k8s.io/api/core/v1"
14-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1513
"k8s.io/apimachinery/pkg/runtime/schema"
1614
"k8s.io/apimachinery/pkg/watch"
17-
"k8s.io/client-go/kubernetes"
1815
"k8s.io/utils/clock"
1916
controllerruntime "sigs.k8s.io/controller-runtime"
2017
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -23,32 +20,29 @@ import (
2320
)
2421

2522
type Controller[T client.Object] struct {
26-
gvk schema.GroupVersionKind
27-
startTime time.Time
28-
kubeClient client.Client
29-
EventCount pmetrics.CounterMetric
30-
EventWatch watch.Interface
23+
gvk schema.GroupVersionKind
24+
startTime time.Time
25+
kubeClient client.Client
26+
EventCount pmetrics.CounterMetric
27+
EventWatchChannel <-chan watch.Event
3128
}
3229

33-
func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] {
30+
func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, channel <-chan watch.Event) *Controller[T] {
3431
gvk := object.GVK(object.New[T]())
3532
return &Controller[T]{
36-
gvk: gvk,
37-
startTime: clock.Now(),
38-
kubeClient: client,
39-
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
40-
EventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{
41-
// Only reconcile on the object kind we care about
42-
FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()),
43-
})),
33+
gvk: gvk,
34+
startTime: clock.Now(),
35+
kubeClient: client,
36+
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
37+
EventWatchChannel: channel,
4438
}
4539
}
4640

4741
func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error {
4842
return controllerruntime.NewControllerManagedBy(m).
4943
Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))).
5044
WatchesRawSource(singleton.Source()).
51-
Complete(singleton.AsChannelObjectReconciler(c.EventWatch.ResultChan(), c))
45+
Complete(singleton.AsChannelObjectReconciler(c.EventWatchChannel, c))
5246
}
5347

5448
func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) {

events/suite_test.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import (
2020
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2121
"k8s.io/apimachinery/pkg/runtime"
2222
"k8s.io/apimachinery/pkg/runtime/schema"
23-
"k8s.io/client-go/kubernetes"
23+
"k8s.io/apimachinery/pkg/watch"
2424
"k8s.io/client-go/kubernetes/scheme"
2525
clock "k8s.io/utils/clock/testing"
2626
"sigs.k8s.io/controller-runtime/pkg/client"
27-
"sigs.k8s.io/controller-runtime/pkg/envtest"
27+
ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
2828
"sigs.k8s.io/controller-runtime/pkg/log"
2929
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3030
)
@@ -39,8 +39,8 @@ var (
3939
var ctx context.Context
4040
var fakeClock *clock.FakeClock
4141
var controller *events.Controller[*test.CustomObject]
42-
var environment envtest.Environment
4342
var kubeClient client.Client
43+
var eventChannel chan watch.Event
4444

4545
func Test(t *testing.T) {
4646
lo.Must0(SchemeBuilder.AddToScheme(scheme.Scheme))
@@ -52,15 +52,13 @@ var _ = BeforeSuite(func() {
5252
ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr)
5353

5454
fakeClock = clock.NewFakeClock(time.Now())
55-
environment = envtest.Environment{Scheme: scheme.Scheme}
56-
_ = lo.Must(environment.Start())
57-
kubeClient = lo.Must(client.New(environment.Config, client.Options{Scheme: scheme.Scheme}))
55+
kubeClient = ctrlfake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
56+
evt := o.(*corev1.Event)
57+
return []string{evt.InvolvedObject.Kind}
58+
}).Build()
5859

59-
controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, kubernetes.NewForConfigOrDie(environment.Config))
60-
})
61-
62-
var _ = AfterSuite(func() {
63-
environment.Stop()
60+
eventChannel = make(chan watch.Event, 1000)
61+
controller = events.NewController[*test.CustomObject](ctx, kubeClient, fakeClock, eventChannel)
6462
})
6563

6664
var _ = Describe("Controller", func() {
@@ -78,8 +76,11 @@ var _ = Describe("Controller", func() {
7876
// expect an metrics for custom object to be zero, waiting on controller reconcile
7977
Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil())
8078

79+
eventChannel <- watch.Event{
80+
Object: events[i],
81+
}
8182
// reconcile on the event
82-
_, err := singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{})
83+
_, err := singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{})
8384
Expect(err).ToNot(HaveOccurred())
8485

8586
// expect an emitted metric to for the event
@@ -95,8 +96,11 @@ var _ = Describe("Controller", func() {
9596
// expect an metrics for custom object to be zero, waiting on controller reconcile
9697
Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil())
9798

99+
eventChannel <- watch.Event{
100+
Object: event,
101+
}
98102
// reconcile on the event
99-
_, err := singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
103+
_, err := singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
100104
Expect(err).ToNot(HaveOccurred())
101105

102106
// expect not have an emitted metric to for the event
@@ -106,8 +110,11 @@ var _ = Describe("Controller", func() {
106110
event.LastTimestamp.Time = time.Now().Add(-30 * time.Minute)
107111
ExpectApplied(ctx, kubeClient, event)
108112

113+
eventChannel <- watch.Event{
114+
Object: event,
115+
}
109116
// reconcile on the event
110-
_, err = singleton.AsChannelObjectReconciler(controller.EventWatch.ResultChan(), controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
117+
_, err = singleton.AsChannelObjectReconciler(eventChannel, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
111118
Expect(err).ToNot(HaveOccurred())
112119

113120
// expect an emitted metric to for the event

singleton/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ func AsChannelObjectReconciler[T client.Object](watchEvents <-chan watch.Event,
4040
var errs error
4141
var results []reconcile.Result
4242
e := <-watchEvents
43+
if e.Object == nil {
44+
return reconcile.Result{RequeueAfter: RequeueImmediately}, nil
45+
}
4346
res, err := reconciler.Reconcile(ctx, e.Object.(T))
4447
errs = multierr.Append(errs, err)
4548
results = append(results, res)

0 commit comments

Comments
 (0)