Skip to content

Commit 0dc6003

Browse files
committed
Shift resource type filtering into watcher, instead of log query filter
1 parent aea2235 commit 0dc6003

File tree

2 files changed

+41
-36
lines changed

2 files changed

+41
-36
lines changed

internal/auditlog/tail.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@ import (
1616
"google.golang.org/grpc/status"
1717
)
1818

19-
func Tail(
20-
ctx context.Context,
21-
projectID string,
22-
clusterName string,
23-
resourceKinds []string,
24-
cb func(*audit.AuditLog) error,
25-
) error {
19+
func Tail(ctx context.Context, projectID string, clusterName string, cb func(*audit.AuditLog) error) error {
2620
client, err := logging.NewClient(ctx)
2721
if err != nil {
2822
return fmt.Errorf("failed to create client: %w", err)
@@ -43,7 +37,7 @@ func Tail(
4337
return fmt.Errorf("limit wait failed: %w", err)
4438
}
4539

46-
if err = tailLogs(ctx, client, projectID, clusterName, resourceKinds, cb); err != nil {
40+
if err = tailLogs(ctx, client, projectID, clusterName, cb); err != nil {
4741
if _, ok := status.FromError(err); ok {
4842
slog.Warn("gRPC request terminated, restarting", slog.Any("error", err))
4943
continue
@@ -53,14 +47,7 @@ func Tail(
5347
}
5448
}
5549

56-
func tailLogs(
57-
ctx context.Context,
58-
client *logging.Client,
59-
projectID string,
60-
clusterName string,
61-
resourceKinds []string,
62-
cb func(*audit.AuditLog) error,
63-
) error {
50+
func tailLogs(ctx context.Context, client *logging.Client, projectID, clusterName string, cb func(*audit.AuditLog) error) error {
6451
stream, err := client.TailLogEntries(ctx)
6552
if err != nil {
6653
return fmt.Errorf("request to tail log entries failed: %w", err)
@@ -77,7 +64,7 @@ func tailLogs(
7764
fmt.Sprintf(`log_name="projects/%s/logs/cloudaudit.googleapis.com%%2Factivity"`, projectID),
7865
fmt.Sprintf(`resource.labels.cluster_name="%s"`, clusterName),
7966
`protoPayload."@type"="type.googleapis.com/google.cloud.audit.AuditLog"`,
80-
fmt.Sprintf(`protoPayload.methodName=~"io\.fluxcd\.toolkit\..*\.(%s)\.patch"`, strings.Join(resourceKinds, "|")),
67+
`protoPayload.methodName=~"io\.fluxcd\.toolkit\..*\.(patch|create)"`,
8168
`-protoPayload.authenticationInfo.principalEmail=~"system:.*"`,
8269
},
8370
" AND ",

internal/watch/watcher.go

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,39 +60,47 @@ type notifier interface {
6060
}
6161

6262
func (w *Watcher) Watch(ctx context.Context) error {
63+
resourceTypes, err := w.resolveFluxResourceTypes(ctx)
64+
if err != nil {
65+
return fmt.Errorf("could not resolve flux resource types: %w", err)
66+
}
67+
68+
if err = w.init(ctx, resourceTypes); err != nil {
69+
return fmt.Errorf("failed to initialize: %w", err)
70+
}
71+
72+
return w.watch(ctx, resourceTypes)
73+
}
74+
75+
func (w *Watcher) resolveFluxResourceTypes(ctx context.Context) ([]k8s.ResourceType, error) {
6376
crds, err := w.k8sClient.GetCustomResourceDefinitions(ctx, metav1.ListOptions{
6477
LabelSelector: "app.kubernetes.io/part-of=flux",
6578
})
6679
if err != nil {
67-
return err
80+
return nil, fmt.Errorf("failed to fetch crds: %w", err)
6881
}
6982

70-
uniqueResourceGroups := make(map[string]k8s.ResourceType)
83+
uniqueResourceTypes := make(map[string]k8s.ResourceType)
7184
for _, crd := range crds.Items {
7285
for _, version := range crd.Spec.Versions {
7386
if _, exists := version.Schema.OpenAPIV3Schema.Properties["spec"].Properties["suspend"]; !exists {
7487
continue
7588
}
7689
key := fmt.Sprintf("%s:%s", crd.Spec.Group, crd.Spec.Names.Plural)
77-
uniqueResourceGroups[key] = k8s.ResourceType{
90+
uniqueResourceTypes[key] = k8s.ResourceType{
7891
Group: crd.Spec.Group,
7992
Version: version.Name,
8093
Kind: crd.Spec.Names.Plural,
8194
}
8295
}
8396
}
84-
resourceGroups := maps.Values(uniqueResourceGroups)
85-
86-
if err = w.init(ctx, resourceGroups); err != nil {
87-
return fmt.Errorf("failed to initialize: %w", err)
88-
}
89-
return w.watch(ctx, resourceGroups)
97+
return maps.Values(uniqueResourceTypes), nil
9098
}
9199

92-
func (w *Watcher) init(ctx context.Context, groups []k8s.ResourceType) error {
100+
func (w *Watcher) init(ctx context.Context, types []k8s.ResourceType) error {
93101
slog.Info("initializing")
94-
for _, group := range groups {
95-
res, err := w.k8sClient.GetRawResources(ctx, group)
102+
for _, t := range types {
103+
res, err := w.k8sClient.GetRawResources(ctx, t)
96104
if err != nil {
97105
return err
98106
}
@@ -102,7 +110,7 @@ func (w *Watcher) init(ctx context.Context, groups []k8s.ResourceType) error {
102110
}
103111
for _, resource := range resourceList.Items {
104112
resourceRef := k8s.ResourceReference{
105-
Type: group,
113+
Type: t,
106114
Namespace: resource.Metadata.Namespace,
107115
Name: resource.Metadata.Name,
108116
}
@@ -114,14 +122,10 @@ func (w *Watcher) init(ctx context.Context, groups []k8s.ResourceType) error {
114122
return nil
115123
}
116124

117-
func (w *Watcher) watch(ctx context.Context, groups []k8s.ResourceType) error {
125+
func (w *Watcher) watch(ctx context.Context, types []k8s.ResourceType) error {
118126
slog.Info("watching for resource modifications")
119-
resourceKinds := make([]string, 0, len(groups))
120-
for _, group := range groups {
121-
resourceKinds = append(resourceKinds, group.Kind)
122-
}
123127

124-
return auditlog.Tail(ctx, w.googleCloudProjectID, w.gkeClusterName, resourceKinds, func(logEntry *audit.AuditLog) error {
128+
return auditlog.Tail(ctx, w.googleCloudProjectID, w.gkeClusterName, func(logEntry *audit.AuditLog) error {
125129
if code := logEntry.GetStatus().GetCode(); code != 0 {
126130
slog.Warn("operation appeared to fail", slog.Int("code", int(code)))
127131
return nil
@@ -135,6 +139,11 @@ func (w *Watcher) watch(ctx context.Context, groups []k8s.ResourceType) error {
135139
return err
136140
}
137141

142+
if !isWatchedResource(resourceRef, types) {
143+
slog.Info("ignoring non-watched resource", slog.String("kind", resourceRef.Type.Kind))
144+
return nil
145+
}
146+
138147
res, err := w.k8sClient.GetRawResource(ctx, resourceRef)
139148
if err != nil {
140149
return fmt.Errorf("failed to get raw resource: %w", err)
@@ -211,3 +220,12 @@ func (w *Watcher) processResource(
211220
GoogleCloudProjectID: w.googleCloudProjectID,
212221
})
213222
}
223+
224+
func isWatchedResource(ref k8s.ResourceReference, types []k8s.ResourceType) bool {
225+
for _, t := range types {
226+
if t.Group == ref.Type.Group && t.Kind == ref.Type.Kind {
227+
return true
228+
}
229+
}
230+
return false
231+
}

0 commit comments

Comments
 (0)