Skip to content

Commit 531af49

Browse files
committed
Removed the source queue and pulled object from the channel
1 parent 54ddca1 commit 531af49

File tree

2 files changed

+47
-15
lines changed

2 files changed

+47
-15
lines changed

events/controller.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Controller[T client.Object] struct {
2727
startTime time.Time
2828
kubeClient client.Client
2929
EventCount pmetrics.CounterMetric
30-
eventWatch watch.Interface
30+
EventWatch watch.Interface
3131
}
3232

3333
func NewController[T client.Object](ctx context.Context, client client.Client, clock clock.Clock, kubernetesInterface kubernetes.Interface) *Controller[T] {
@@ -37,7 +37,7 @@ func NewController[T client.Object](ctx context.Context, client client.Client, c
3737
startTime: clock.Now(),
3838
kubeClient: client,
3939
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
40-
eventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{
40+
EventWatch: lo.Must(kubernetesInterface.CoreV1().Events("").Watch(ctx, metav1.ListOptions{
4141
FieldSelector: fmt.Sprintf("involvedObject.kind=%s,involvedObject.apiVersion=%s", gvk.Kind, gvk.GroupVersion().String()),
4242
})),
4343
}
@@ -47,21 +47,17 @@ func (c *Controller[T]) Register(ctx context.Context, m manager.Manager) error {
4747
return controllerruntime.NewControllerManagedBy(m).
4848
Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))).
4949
WatchesRawSource(singleton.Source()).
50-
Complete(singleton.AsReconciler(c))
50+
Complete(singleton.AsChannelObjectReconciler(c.EventWatch.ResultChan(), c))
5151
}
5252

53-
func (c *Controller[T]) Reconcile(ctx context.Context) (reconcile.Result, error) {
54-
for e := range c.eventWatch.ResultChan() {
55-
event := e.Object.(*v1.Event)
56-
57-
// We check if the event was created in the lifetime of this controller
58-
// since we don't duplicate metrics on controller restart or lease handover
59-
if c.startTime.Before(event.LastTimestamp.Time) {
60-
c.EventCount.Inc(map[string]string{
61-
pmetrics.LabelType: event.Type,
62-
pmetrics.LabelReason: event.Reason,
63-
})
64-
}
53+
func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) {
54+
// We check if the event was created in the lifetime of this controller
55+
// since we don't duplicate metrics on controller restart or lease handover
56+
if c.startTime.Before(event.LastTimestamp.Time) {
57+
c.EventCount.Inc(map[string]string{
58+
pmetrics.LabelType: event.Type,
59+
pmetrics.LabelReason: event.Reason,
60+
})
6561
}
6662

6763
return reconcile.Result{}, nil

singleton/controller.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package singleton
22

33
import (
44
"context"
5+
"math"
56
"time"
67

8+
"go.uber.org/multierr"
9+
"k8s.io/apimachinery/pkg/watch"
710
"k8s.io/client-go/util/workqueue"
11+
"sigs.k8s.io/controller-runtime/pkg/client"
812
"sigs.k8s.io/controller-runtime/pkg/event"
913
"sigs.k8s.io/controller-runtime/pkg/handler"
1014
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -27,6 +31,38 @@ func AsReconciler(reconciler Reconciler) reconcile.Reconciler {
2731
})
2832
}
2933

34+
type ChannalObjectReconciler[T client.Object] interface {
35+
Reconcile(ctx context.Context, object T) (reconcile.Result, error)
36+
}
37+
38+
func AsChannelObjectReconciler[T client.Object](watchEvents <-chan watch.Event, reconciler ChannalObjectReconciler[T]) reconcile.Reconciler {
39+
return reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
40+
var errs error
41+
var results []reconcile.Result
42+
for event := range watchEvents {
43+
res, err := reconciler.Reconcile(ctx, event.Object.(T))
44+
errs = multierr.Append(errs, err)
45+
results = append(results, res)
46+
47+
}
48+
49+
var result reconcile.Result
50+
min := time.Duration(math.MaxInt64)
51+
for _, r := range results {
52+
if r.IsZero() {
53+
continue
54+
}
55+
if r.RequeueAfter < min {
56+
min = r.RequeueAfter
57+
result.RequeueAfter = min
58+
result.Requeue = true
59+
}
60+
}
61+
62+
return result, errs
63+
})
64+
}
65+
3066
func Source() source.Source {
3167
eventSource := make(chan event.GenericEvent, 1)
3268
eventSource <- event.GenericEvent{}

0 commit comments

Comments
 (0)