@@ -2,6 +2,7 @@ package watch
2
2
3
3
import (
4
4
"context"
5
+ "encoding/json"
5
6
"errors"
6
7
"fmt"
7
8
"log/slog"
@@ -14,6 +15,7 @@ import (
14
15
15
16
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/auditlog"
16
17
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/datastore"
18
+ "github.com/e-flux-platform/fluxcd-suspend-notifier/internal/fluxcd"
17
19
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/k8s"
18
20
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/notification"
19
21
)
@@ -43,13 +45,13 @@ func NewWatcher(
43
45
}
44
46
45
47
type k8sClient interface {
46
- GetRawResource (ctx context.Context , resource k8s.Resource ) (map [ string ] any , error )
47
- GetRawResources (ctx context.Context , group k8s.ResourceType ) (map [ string ] any , error )
48
+ GetRawResource (ctx context.Context , resource k8s.ResourceReference ) ([] byte , error )
49
+ GetRawResources (ctx context.Context , group k8s.ResourceType ) ([] byte , error )
48
50
GetCustomResourceDefinitions (ctx context.Context , listOptions metav1.ListOptions ) (* v1.CustomResourceDefinitionList , error )
49
51
}
50
52
51
53
type store interface {
52
- GetEntry (k8s.Resource ) (datastore.Entry , error )
54
+ GetEntry (k8s.ResourceReference ) (datastore.Entry , error )
53
55
SaveEntry (datastore.Entry ) error
54
56
}
55
57
@@ -90,33 +92,21 @@ func (w *Watcher) Watch(ctx context.Context) error {
90
92
func (w * Watcher ) init (ctx context.Context , groups []k8s.ResourceType ) error {
91
93
slog .Info ("initializing" )
92
94
for _ , group := range groups {
93
- resources , err := w .k8sClient .GetRawResources (ctx , group )
95
+ res , err := w .k8sClient .GetRawResources (ctx , group )
94
96
if err != nil {
95
97
return err
96
98
}
97
- items , ok := resources [ "items" ].([] any )
98
- if ! ok {
99
- return errors . New ( "expected items to be set" )
99
+ var resourceList fluxcd. ResourceList
100
+ if err = json . Unmarshal ( res , & resourceList ); err != nil {
101
+ return fmt . Errorf ( "failed to unmarshal resource: %w" , err )
100
102
}
101
- for _ , i := range items {
102
- item , ok := i .(map [string ]any )
103
- if ! ok {
104
- return errors .New ("invalid item" )
105
- }
106
- spec , ok := item ["spec" ].(map [string ]any )
107
- if ! ok {
108
- return errors .New ("invalid spec" )
109
- }
110
- metadata , ok := item ["metadata" ].(map [string ]any )
111
- if ! ok {
112
- return errors .New ("invalid metadata" )
113
- }
114
- resource := k8s.Resource {
103
+ for _ , resource := range resourceList .Items {
104
+ resourceRef := k8s.ResourceReference {
115
105
Type : group ,
116
- Namespace : metadata [ "namespace" ].( string ) ,
117
- Name : metadata [ "name" ].( string ) ,
106
+ Namespace : resource . Metadata . Namespace ,
107
+ Name : resource . Metadata . Name ,
118
108
}
119
- if err = w .processResource (ctx , resource , spec , "<unknown>" ); err != nil {
109
+ if err = w .processResource (ctx , resourceRef , resource , "<unknown>" ); err != nil {
120
110
return fmt .Errorf ("failed to process resource: %w" , err )
121
111
}
122
112
}
@@ -140,22 +130,22 @@ func (w *Watcher) watch(ctx context.Context, groups []k8s.ResourceType) error {
140
130
resourceName := logEntry .GetResourceName ()
141
131
email := logEntry .GetAuthenticationInfo ().GetPrincipalEmail ()
142
132
143
- resource , err := k8s .ResourceFromPath (resourceName )
133
+ resourceRef , err := k8s .ResourceReferenceFromPath (resourceName )
144
134
if err != nil {
145
135
return err
146
136
}
147
137
148
- res , err := w .k8sClient .GetRawResource (ctx , resource )
138
+ res , err := w .k8sClient .GetRawResource (ctx , resourceRef )
149
139
if err != nil {
150
140
return fmt .Errorf ("failed to get raw resource: %w" , err )
151
141
}
152
142
153
- spec , ok := res [ "spec" ].( map [ string ] any )
154
- if ! ok {
155
- return errors . New ( "unexpected response payload" )
143
+ var resource fluxcd. Resource
144
+ if err = json . Unmarshal ( res , & resource ); err != nil {
145
+ return fmt . Errorf ( "failed to unmarshal resource: %w" , err )
156
146
}
157
147
158
- if err = w .processResource (ctx , resource , spec , email ); err != nil {
148
+ if err = w .processResource (ctx , resourceRef , resource , email ); err != nil {
159
149
return fmt .Errorf ("failed to re-check suspension status: %w" , err )
160
150
}
161
151
@@ -165,26 +155,24 @@ func (w *Watcher) watch(ctx context.Context, groups []k8s.ResourceType) error {
165
155
166
156
func (w * Watcher ) processResource (
167
157
ctx context.Context ,
168
- resource k8s.Resource ,
169
- spec map [ string ] any ,
158
+ resourceRef k8s.ResourceReference ,
159
+ resource fluxcd. Resource ,
170
160
updatedBy string ,
171
161
) error {
172
- suspended , _ := spec ["suspend" ].(bool )
173
-
174
- entry , err := w .store .GetEntry (resource )
162
+ entry , err := w .store .GetEntry (resourceRef )
175
163
if err != nil {
176
164
if errors .Is (err , datastore .ErrNotFound ) {
177
165
// First time seeing the resource, so we'll save the state, but not notify - as we don't know what has
178
166
// changed
179
167
slog .Info (
180
168
"new resource discovered" ,
181
- slog .String ("kind" , resource .Type .Kind ),
182
- slog .String ("resource" , resource .Name ),
183
- slog .Bool ("suspended" , suspended ),
169
+ slog .String ("kind" , resourceRef .Type .Kind ),
170
+ slog .String ("resource" , resourceRef .Name ),
171
+ slog .Bool ("suspended" , resource . Spec . Suspend ),
184
172
)
185
173
if err = w .store .SaveEntry (datastore.Entry {
186
- Resource : resource ,
187
- Suspended : suspended ,
174
+ Resource : resourceRef ,
175
+ Suspended : resource . Spec . Suspend ,
188
176
UpdatedBy : updatedBy ,
189
177
UpdatedAt : time .Now ().UTC (),
190
178
}); err != nil {
@@ -195,31 +183,31 @@ func (w *Watcher) processResource(
195
183
return fmt .Errorf ("failed to fetch entry: %w" , err )
196
184
}
197
185
198
- if suspended == entry .Suspended {
186
+ if resource . Spec . Suspend == entry .Suspended {
199
187
return nil // Probably something else about the resource modified
200
188
}
201
189
202
- entry .Resource = resource
203
- entry .Suspended = suspended
190
+ slog .Info (
191
+ "suspension status updated" ,
192
+ slog .String ("kind" , resourceRef .Type .Kind ),
193
+ slog .String ("resourceRef" , resourceRef .Name ),
194
+ slog .String ("user" , updatedBy ),
195
+ slog .Bool ("suspended" , resource .Spec .Suspend ),
196
+ )
197
+
198
+ entry .Resource = resourceRef
199
+ entry .Suspended = resource .Spec .Suspend
204
200
entry .UpdatedBy = updatedBy
205
201
entry .UpdatedAt = time .Now ().UTC ()
206
202
207
203
if err = w .store .SaveEntry (entry ); err != nil {
208
204
return err
209
205
}
210
206
211
- slog .Info (
212
- "suspension status updated" ,
213
- slog .String ("kind" , resource .Type .Kind ),
214
- slog .String ("resource" , resource .Name ),
215
- slog .String ("user" , updatedBy ),
216
- slog .Bool ("suspended" , suspended ),
217
- )
218
-
219
207
return w .notifier .Notify (ctx , notification.Notification {
220
- Resource : resource ,
221
- Suspended : suspended ,
222
- Email : updatedBy ,
208
+ Resource : entry . Resource ,
209
+ Suspended : entry . Suspended ,
210
+ Email : entry . UpdatedBy ,
223
211
GoogleCloudProjectID : w .googleCloudProjectID ,
224
212
})
225
213
}
0 commit comments