@@ -9,9 +9,13 @@ import (
9
9
pmetrics "github.com/awslabs/operatorpkg/metrics"
10
10
"github.com/awslabs/operatorpkg/object"
11
11
"github.com/awslabs/operatorpkg/singleton"
12
+ "github.com/samber/lo"
13
+ corev1 "k8s.io/api/core/v1"
12
14
v1 "k8s.io/api/core/v1"
15
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
16
"k8s.io/apimachinery/pkg/runtime/schema"
14
17
"k8s.io/apimachinery/pkg/watch"
18
+ "k8s.io/client-go/kubernetes"
15
19
"k8s.io/utils/clock"
16
20
controllerruntime "sigs.k8s.io/controller-runtime"
17
21
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -27,14 +31,17 @@ type Controller[T client.Object] struct {
27
31
EventWatchChannel <- chan watch.Event
28
32
}
29
33
30
- func NewController [T client.Object ](client client.Client , clock clock.Clock , channel <- chan watch. Event ) * Controller [T ] {
34
+ func NewController [T client.Object ](ctx context. Context , client client.Client , clock clock.Clock , kubernetesInterface kubernetes. Interface ) * Controller [T ] {
31
35
gvk := object .GVK (object .New [T ]())
32
36
return & Controller [T ]{
33
- gvk : gvk ,
34
- startTime : clock .Now (),
35
- kubeClient : client ,
36
- EventCount : eventTotalMetric (strings .ToLower (gvk .Kind )),
37
- EventWatchChannel : channel ,
37
+ gvk : gvk ,
38
+ startTime : clock .Now (),
39
+ kubeClient : client ,
40
+ EventCount : eventTotalMetric (strings .ToLower (gvk .Kind )),
41
+ EventWatchChannel : lo .Must (kubernetesInterface .CoreV1 ().Events ("" ).Watch (ctx , metav1.ListOptions {
42
+ // Only reconcile on the object kind we care about
43
+ FieldSelector : fmt .Sprintf ("involvedObject.kind=%s,involvedObject.apiVersion=%s" , object .GVK (& corev1.Node {}).Kind , object .GVK (& corev1.Node {}).GroupVersion ().String ()),
44
+ })).ResultChan (),
38
45
}
39
46
}
40
47
0 commit comments