@@ -19,6 +19,8 @@ package flinkcluster
19
19
import (
20
20
"context"
21
21
"fmt"
22
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23
+ "k8s.io/client-go/tools/record"
22
24
"strings"
23
25
"time"
24
26
@@ -49,6 +51,7 @@ type ClusterStateObserver struct {
49
51
context context.Context
50
52
log logr.Logger
51
53
history history.Interface
54
+ recorder record.EventRecorder
52
55
}
53
56
54
57
// ObservedClusterState holds observed state of a cluster.
@@ -141,6 +144,7 @@ func (observer *ClusterStateObserver) observe(
141
144
return err
142
145
}
143
146
log .Info ("Observed cluster" , "cluster" , "nil" )
147
+ observer .sendDeletedEvent ()
144
148
observedCluster = nil
145
149
} else {
146
150
log .Info ("Observed cluster" , "cluster" , * observedCluster )
@@ -312,6 +316,24 @@ func (observer *ClusterStateObserver) observe(
312
316
return nil
313
317
}
314
318
319
+ func (observer * ClusterStateObserver ) sendDeletedEvent () {
320
+ var eventCluster = & v1beta1.FlinkCluster {
321
+ TypeMeta : metav1.TypeMeta {
322
+ Kind : "FlinkCluster" ,
323
+ APIVersion : "flinkoperator.k8s.io/v1beta1" ,
324
+ },
325
+ ObjectMeta : metav1.ObjectMeta {
326
+ Name : observer .request .Name ,
327
+ Namespace : observer .request .Namespace ,
328
+ },
329
+ }
330
+ observer .recorder .Event (
331
+ eventCluster ,
332
+ "Normal" ,
333
+ "StatusUpdate" ,
334
+ fmt .Sprintf ("Cluster status: Deleted" ))
335
+ }
336
+
315
337
func (observer * ClusterStateObserver ) observeJob (
316
338
observed * ObservedClusterState ) error {
317
339
// Either the cluster has been deleted or it is a session cluster.
0 commit comments