|
6 | 6 | "errors"
|
7 | 7 | "fmt"
|
8 | 8 | "log/slog"
|
| 9 | + "slices" |
9 | 10 | "time"
|
10 | 11 |
|
11 |
| - "golang.org/x/exp/maps" |
12 | 12 | "google.golang.org/genproto/googleapis/cloud/audit"
|
13 | 13 | v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
14 | 14 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
@@ -80,26 +80,35 @@ func (w *Watcher) resolveFluxResourceTypes(ctx context.Context) ([]k8s.ResourceT
|
80 | 80 | return nil, fmt.Errorf("failed to fetch crds: %w", err)
|
81 | 81 | }
|
82 | 82 |
|
83 |
| - uniqueResourceTypes := make(map[string]k8s.ResourceType) |
| 83 | + types := make([]k8s.ResourceType, 0, len(crds.Items)) |
84 | 84 | for _, crd := range crds.Items {
|
85 | 85 | for _, version := range crd.Spec.Versions {
|
| 86 | + // We're only interested in resources that can be suspended |
86 | 87 | if _, exists := version.Schema.OpenAPIV3Schema.Properties["spec"].Properties["suspend"]; !exists {
|
87 | 88 | continue
|
88 | 89 | }
|
89 |
| - key := fmt.Sprintf("%s:%s", crd.Spec.Group, crd.Spec.Names.Plural) |
90 |
| - uniqueResourceTypes[key] = k8s.ResourceType{ |
| 90 | + types = append(types, k8s.ResourceType{ |
91 | 91 | Group: crd.Spec.Group,
|
92 | 92 | Version: version.Name,
|
93 | 93 | Kind: crd.Spec.Names.Plural,
|
94 |
| - } |
| 94 | + }) |
95 | 95 | }
|
96 | 96 | }
|
97 |
| - return maps.Values(uniqueResourceTypes), nil |
| 97 | + return types, nil |
98 | 98 | }
|
99 | 99 |
|
100 | 100 | func (w *Watcher) init(ctx context.Context, types []k8s.ResourceType) error {
|
101 | 101 | slog.Info("initializing")
|
| 102 | + seen := make(map[string]struct{}) |
102 | 103 | for _, t := range types {
|
| 104 | + // We only need to fetch against one version per group+kind |
| 105 | + key := fmt.Sprintf("%s:%s", t.Group, t.Kind) |
| 106 | + if _, ok := seen[key]; ok { |
| 107 | + continue |
| 108 | + } |
| 109 | + seen[key] = struct{}{} |
| 110 | + |
| 111 | + // Fetch raw fluxcd resource for this specific type |
103 | 112 | res, err := w.k8sClient.GetRawResources(ctx, t)
|
104 | 113 | if err != nil {
|
105 | 114 | return err
|
@@ -139,7 +148,7 @@ func (w *Watcher) watch(ctx context.Context, types []k8s.ResourceType) error {
|
139 | 148 | return err
|
140 | 149 | }
|
141 | 150 |
|
142 |
| - if !isWatchedResource(resourceRef, types) { |
| 151 | + if !slices.Contains(types, resourceRef.Type) { |
143 | 152 | slog.Info("ignoring non-watched resource", slog.String("kind", resourceRef.Type.Kind))
|
144 | 153 | return nil
|
145 | 154 | }
|
@@ -179,15 +188,12 @@ func (w *Watcher) processResource(
|
179 | 188 | slog.String("resource", resourceRef.Name),
|
180 | 189 | slog.Bool("suspended", resource.Spec.Suspend),
|
181 | 190 | )
|
182 |
| - if err = w.store.SaveEntry(datastore.Entry{ |
| 191 | + return w.store.SaveEntry(datastore.Entry{ |
183 | 192 | Resource: resourceRef,
|
184 | 193 | Suspended: resource.Spec.Suspend,
|
185 | 194 | UpdatedBy: updatedBy,
|
186 | 195 | UpdatedAt: time.Now().UTC(),
|
187 |
| - }); err != nil { |
188 |
| - return err |
189 |
| - } |
190 |
| - return nil |
| 196 | + }) |
191 | 197 | }
|
192 | 198 | return fmt.Errorf("failed to fetch entry: %w", err)
|
193 | 199 | }
|
@@ -220,12 +226,3 @@ func (w *Watcher) processResource(
|
220 | 226 | GoogleCloudProjectID: w.googleCloudProjectID,
|
221 | 227 | })
|
222 | 228 | }
|
223 |
| - |
224 |
| -func isWatchedResource(ref k8s.ResourceReference, types []k8s.ResourceType) bool { |
225 |
| - for _, t := range types { |
226 |
| - if t.Group == ref.Type.Group && t.Kind == ref.Type.Kind { |
227 |
| - return true |
228 |
| - } |
229 |
| - } |
230 |
| - return false |
231 |
| -} |
0 commit comments