Skip to content

Commit a84e1d1

Browse files
authored
Add a controller that will watch events and Fire Metrics (#91)
1 parent be645b3 commit a84e1d1

File tree

11 files changed

+591
-349
lines changed

11 files changed

+591
-349
lines changed

events/controller.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"time"
8+
9+
pmetrics "github.com/awslabs/operatorpkg/metrics"
10+
"github.com/awslabs/operatorpkg/object"
11+
v1 "k8s.io/api/core/v1"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
13+
"k8s.io/utils/clock"
14+
controllerruntime "sigs.k8s.io/controller-runtime"
15+
"sigs.k8s.io/controller-runtime/pkg/builder"
16+
"sigs.k8s.io/controller-runtime/pkg/client"
17+
"sigs.k8s.io/controller-runtime/pkg/controller"
18+
"sigs.k8s.io/controller-runtime/pkg/manager"
19+
"sigs.k8s.io/controller-runtime/pkg/predicate"
20+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
21+
)
22+
23+
type Controller[T client.Object] struct {
24+
gvk schema.GroupVersionKind
25+
startTime time.Time
26+
kubeClient client.Client
27+
EventCount pmetrics.CounterMetric
28+
}
29+
30+
func NewController[T client.Object](client client.Client, clock clock.Clock) *Controller[T] {
31+
gvk := object.GVK(object.New[T]())
32+
return &Controller[T]{
33+
gvk: gvk,
34+
startTime: clock.Now(),
35+
kubeClient: client,
36+
EventCount: eventTotalMetric(strings.ToLower(gvk.Kind)),
37+
}
38+
}
39+
40+
func (c *Controller[T]) Register(_ context.Context, m manager.Manager) error {
41+
return controllerruntime.NewControllerManagedBy(m).
42+
For(&v1.Event{}, builder.WithPredicates(predicate.NewTypedPredicateFuncs(func(o client.Object) bool {
43+
// Only reconcile on the object kind we care about
44+
event := o.(*v1.Event)
45+
return event.InvolvedObject.Kind == c.gvk.Kind && event.InvolvedObject.APIVersion == c.gvk.GroupVersion().String()
46+
}))).
47+
WithOptions(controller.Options{MaxConcurrentReconciles: 10}).
48+
Named(fmt.Sprintf("operatorpkg.%s.events", strings.ToLower(c.gvk.Kind))).
49+
Complete(reconcile.AsReconciler(m.GetClient(), c))
50+
}
51+
52+
func (c *Controller[T]) Reconcile(ctx context.Context, event *v1.Event) (reconcile.Result, error) {
53+
// We check if the event was created in the lifetime of this controller
54+
// since we don't duplicate metrics on controller restart or lease handover
55+
if c.startTime.Before(event.LastTimestamp.Time) {
56+
c.EventCount.Inc(map[string]string{
57+
pmetrics.LabelGroup: c.gvk.Group,
58+
pmetrics.LabelKind: event.InvolvedObject.Kind,
59+
pmetrics.LabelType: event.Type,
60+
pmetrics.LabelReason: event.Reason,
61+
})
62+
}
63+
64+
return reconcile.Result{}, nil
65+
}

events/metrics.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package events
2+
3+
import (
4+
pmetrics "github.com/awslabs/operatorpkg/metrics"
5+
"github.com/prometheus/client_golang/prometheus"
6+
"sigs.k8s.io/controller-runtime/pkg/metrics"
7+
)
8+
9+
func eventTotalMetric(objectName string) pmetrics.CounterMetric {
10+
return pmetrics.NewPrometheusCounter(
11+
metrics.Registry,
12+
prometheus.CounterOpts{
13+
Namespace: pmetrics.Namespace,
14+
Subsystem: objectName,
15+
Name: "event_total",
16+
Help: "The total of events of a given type for an object.",
17+
},
18+
[]string{
19+
pmetrics.LabelGroup,
20+
pmetrics.LabelKind,
21+
pmetrics.LabelType,
22+
pmetrics.LabelReason,
23+
},
24+
)
25+
}

events/suite_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package events_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/awslabs/operatorpkg/events"
10+
pmetrics "github.com/awslabs/operatorpkg/metrics"
11+
"github.com/awslabs/operatorpkg/object"
12+
"github.com/awslabs/operatorpkg/test"
13+
. "github.com/awslabs/operatorpkg/test/expectations"
14+
"github.com/onsi/ginkgo/v2"
15+
. "github.com/onsi/ginkgo/v2"
16+
. "github.com/onsi/gomega"
17+
"github.com/samber/lo"
18+
corev1 "k8s.io/api/core/v1"
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/runtime"
21+
"k8s.io/apimachinery/pkg/runtime/schema"
22+
"k8s.io/client-go/kubernetes/scheme"
23+
clock "k8s.io/utils/clock/testing"
24+
"sigs.k8s.io/controller-runtime/pkg/client"
25+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
26+
"sigs.k8s.io/controller-runtime/pkg/log"
27+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
28+
)
29+
30+
var (
31+
SchemeBuilder = runtime.NewSchemeBuilder(func(scheme *runtime.Scheme) error {
32+
scheme.AddKnownTypes(schema.GroupVersion{Group: test.APIGroup, Version: "v1alpha1"}, &test.CustomObject{})
33+
return nil
34+
})
35+
)
36+
37+
var ctx context.Context
38+
var fakeClock *clock.FakeClock
39+
var controller *events.Controller[*test.CustomObject]
40+
var kubeClient client.Client
41+
42+
func Test(t *testing.T) {
43+
lo.Must0(SchemeBuilder.AddToScheme(scheme.Scheme))
44+
RegisterFailHandler(Fail)
45+
RunSpecs(t, "Events")
46+
}
47+
48+
var _ = BeforeSuite(func() {
49+
fakeClock = clock.NewFakeClock(time.Now())
50+
kubeClient = fake.NewClientBuilder().WithScheme(scheme.Scheme).WithIndex(&corev1.Event{}, "involvedObject.kind", func(o client.Object) []string {
51+
evt := o.(*corev1.Event)
52+
return []string{evt.InvolvedObject.Kind}
53+
}).Build()
54+
controller = events.NewController[*test.CustomObject](kubeClient, fakeClock)
55+
ctx = log.IntoContext(context.Background(), ginkgo.GinkgoLogr)
56+
})
57+
58+
var _ = Describe("Controller", func() {
59+
BeforeEach(func() {
60+
controller.EventCount.Reset()
61+
})
62+
It("should emit metrics on an event", func() {
63+
events := []*corev1.Event{}
64+
65+
for i := range 5 {
66+
// create an event for custom object
67+
events = append(events, createEvent("test-object", fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))
68+
ExpectApplied(ctx, kubeClient, events[i])
69+
70+
// expect an metrics for custom object to be zero, waiting on controller reconcile
71+
Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i)))).To(BeNil())
72+
73+
// reconcile on the event
74+
_, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(events[i])})
75+
Expect(err).ToNot(HaveOccurred())
76+
77+
// expect an emitted metric to for the event
78+
Expect(GetMetric("operator_customobject_event_total", conditionLabels(fmt.Sprintf("Test-type-%d", i), fmt.Sprintf("Test-reason-%d", i))).GetCounter().GetValue()).To(BeEquivalentTo(1))
79+
}
80+
})
81+
It("should not fire metrics if the last transition was before controller start-up", func() {
82+
// create an event for custom object that was produced before the controller start-up time
83+
event := createEvent("test-name", corev1.EventTypeNormal, "reason")
84+
event.LastTimestamp.Time = time.Now().Add(30 * time.Minute)
85+
ExpectApplied(ctx, kubeClient, event)
86+
87+
// expect an metrics for custom object to be zero, waiting on controller reconcile
88+
Expect(GetMetric("operator_ustomobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason"))).To(BeNil())
89+
90+
// reconcile on the event
91+
_, err := reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
92+
Expect(err).ToNot(HaveOccurred())
93+
94+
// expect not have an emitted metric to for the event
95+
Expect(GetMetric("operator_customobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason")).GetCounter().GetValue()).To(BeEquivalentTo(1))
96+
97+
// create an event for custom object that was produced after the controller start-up time
98+
event.LastTimestamp.Time = time.Now().Add(-30 * time.Minute)
99+
ExpectApplied(ctx, kubeClient, event)
100+
101+
// reconcile on the event
102+
_, err = reconcile.AsReconciler(kubeClient, controller).Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(event)})
103+
Expect(err).ToNot(HaveOccurred())
104+
105+
// expect an emitted metric to for the event
106+
Expect(GetMetric("operator_customobject_event_total", conditionLabels(corev1.EventTypeNormal, "reason")).GetCounter().GetValue()).To(BeEquivalentTo(1))
107+
})
108+
})
109+
110+
func createEvent(name string, eventType string, reason string) *corev1.Event {
111+
return &corev1.Event{
112+
ObjectMeta: metav1.ObjectMeta{
113+
Name: test.RandomName(),
114+
},
115+
InvolvedObject: corev1.ObjectReference{
116+
Namespace: "default",
117+
Name: name,
118+
Kind: object.GVK(&test.CustomObject{}).Kind,
119+
},
120+
LastTimestamp: metav1.Time{Time: time.Now().Add(30 * time.Minute)},
121+
Type: eventType,
122+
Reason: reason,
123+
Count: 5,
124+
}
125+
}
126+
127+
func conditionLabels(eventType string, reason string) map[string]string {
128+
return map[string]string{
129+
pmetrics.LabelType: eventType,
130+
pmetrics.LabelReason: reason,
131+
}
132+
}

metrics/types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package metrics
22

3+
const (
4+
Namespace = "operator"
5+
LabelGroup = "group"
6+
LabelKind = "kind"
7+
LabelType = "type"
8+
LabelReason = "reason"
9+
)
10+
311
type ObservationMetric interface {
412
Observe(v float64, labels map[string]string)
513
Delete(labels map[string]string)

0 commit comments

Comments
 (0)