Skip to content

Commit 1592d5b

Browse files
committed
Re-check suspend status on start
1 parent 02cb83c commit 1592d5b

File tree

4 files changed

+134
-70
lines changed

4 files changed

+134
-70
lines changed

internal/auditlog/tail.go

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ func Tail(ctx context.Context, projectID, clusterName string, cb func(*audit.Aud
5050
}
5151

5252
func read(ctx context.Context, stream loggingpb.LoggingServiceV2_TailLogEntriesClient, cb func(*audit.AuditLog) error) error {
53-
slog.Info("reading logs")
54-
5553
for {
5654
select {
5755
case <-ctx.Done():
@@ -64,28 +62,29 @@ func read(ctx context.Context, stream loggingpb.LoggingServiceV2_TailLogEntriesC
6462
case err != nil:
6563
return fmt.Errorf("stream receive failed: %w", err)
6664
default:
67-
for _, entry := range resp.GetEntries() {
68-
payload := entry.GetProtoPayload()
69-
if payload == nil {
70-
slog.Warn("unexpected payload type")
71-
continue
72-
}
65+
}
66+
67+
for _, entry := range resp.GetEntries() {
68+
payload := entry.GetProtoPayload()
69+
if payload == nil {
70+
slog.Warn("unexpected payload type")
71+
continue
72+
}
7373

74-
msg, err := payload.UnmarshalNew()
75-
if err != nil {
76-
slog.Warn("failed to unmarshal payload", slog.Any("error", err))
77-
continue
78-
}
74+
msg, err := payload.UnmarshalNew()
75+
if err != nil {
76+
slog.Warn("failed to unmarshal payload", slog.Any("error", err))
77+
continue
78+
}
7979

80-
auditLog, ok := msg.(*audit.AuditLog)
81-
if !ok {
82-
slog.Warn("unexpected payload type", slog.Any("type", fmt.Sprintf("%t", msg)))
83-
continue
84-
}
80+
auditLog, ok := msg.(*audit.AuditLog)
81+
if !ok {
82+
slog.Warn("unexpected payload type", slog.Any("type", fmt.Sprintf("%t", msg)))
83+
continue
84+
}
8585

86-
if err = cb(auditLog); err != nil {
87-
return fmt.Errorf("callback failed: %w", err)
88-
}
86+
if err = cb(auditLog); err != nil {
87+
return fmt.Errorf("callback failed: %w", err)
8988
}
9089
}
9190
}

internal/datastore/badgerdb.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package datastore
22

33
import (
4+
"encoding/json"
45
"errors"
56
"fmt"
7+
"time"
68

79
"github.com/dgraph-io/badger/v4"
810

@@ -15,6 +17,13 @@ type Store struct {
1517
db *badger.DB
1618
}
1719

20+
type Entry struct {
21+
Resource k8s.Resource `json:"resource"`
22+
Suspended bool `json:"suspended"`
23+
UpdatedBy string `json:"updatedBy"`
24+
UpdatedAt time.Time `json:"updatedAt"`
25+
}
26+
1827
func NewBadgerStore(path string) (*Store, error) {
1928
if path == "" {
2029
return nil, errors.New("badger store path cannot be empty")
@@ -28,8 +37,8 @@ func NewBadgerStore(path string) (*Store, error) {
2837
}, nil
2938
}
3039

31-
func (s *Store) IsSuspended(resource k8s.Resource) (bool, error) {
32-
var suspended bool
40+
func (s *Store) GetEntry(resource k8s.Resource) (Entry, error) {
41+
var entry Entry
3342
err := s.db.View(func(txn *badger.Txn) error {
3443
item, err := txn.Get(buildKey(resource))
3544
if err != nil {
@@ -42,29 +51,52 @@ func (s *Store) IsSuspended(resource k8s.Resource) (bool, error) {
4251
if err != nil {
4352
return fmt.Errorf("failed to get value: %w", err)
4453
}
45-
if len(val) != 1 {
46-
return fmt.Errorf("expected value length of 1, got %d", len(val))
54+
if err = json.Unmarshal(val, &entry); err != nil {
55+
return fmt.Errorf("failed to unmarshal entry: %w", err)
4756
}
48-
suspended = val[0] == 1
4957
return nil
5058
})
51-
return suspended, err
59+
return entry, err
5260
}
5361

54-
func (s *Store) SetSuspended(resource k8s.Resource, suspended bool) error {
62+
func (s *Store) SaveEntry(entry Entry) error {
5563
return s.db.Update(func(txn *badger.Txn) error {
56-
var b byte
57-
if suspended {
58-
b = 1
64+
data, err := json.Marshal(entry)
65+
if err != nil {
66+
return fmt.Errorf("failed ot marshal entry: %w", err)
67+
}
68+
return txn.Set(buildKey(entry.Resource), data)
69+
})
70+
}
71+
72+
func (s *Store) AllEntries() ([]Entry, error) {
73+
entries := make([]Entry, 0)
74+
err := s.db.View(func(txn *badger.Txn) error {
75+
it := txn.NewIterator(badger.DefaultIteratorOptions)
76+
defer it.Close()
77+
78+
prefix := []byte("resource:")
79+
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
80+
item := it.Item()
81+
val, err := item.ValueCopy(nil)
82+
if err != nil {
83+
return fmt.Errorf("failed to get value: %w", err)
84+
}
85+
var entry Entry
86+
if err = json.Unmarshal(val, &entry); err != nil {
87+
return fmt.Errorf("failed to unmarshal entry: %w", err)
88+
}
89+
entries = append(entries, entry)
5990
}
60-
return txn.Set(buildKey(resource), []byte{b})
91+
return nil
6192
})
93+
return entries, err
6294
}
6395

6496
func (s *Store) Close() error {
6597
return s.db.Close()
6698
}
6799

68100
func buildKey(resource k8s.Resource) []byte {
69-
return []byte(fmt.Sprintf("%s:%s:%s", resource.Kind, resource.Namespace, resource.Name))
101+
return []byte(fmt.Sprintf("resource:%s:%s:%s", resource.Kind, resource.Namespace, resource.Name))
70102
}

internal/k8s/resource.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ type Resource struct {
99
Namespace string
1010
Kind string
1111
Name string
12+
Path string
1213
}
1314

1415
func ResourceFromPath(path string) (Resource, error) {
@@ -20,5 +21,6 @@ func ResourceFromPath(path string) (Resource, error) {
2021
Namespace: parts[3],
2122
Kind: strings.TrimSuffix(parts[4], "s"),
2223
Name: parts[5],
24+
Path: path,
2325
}, nil
2426
}

internal/watch/watcher.go

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"time"
89

910
"google.golang.org/genproto/googleapis/cloud/audit"
1011

@@ -43,15 +44,31 @@ type k8sClient interface {
4344
}
4445

4546
type store interface {
46-
IsSuspended(resource k8s.Resource) (bool, error)
47-
SetSuspended(resource k8s.Resource, suspended bool) error
47+
GetEntry(k8s.Resource) (datastore.Entry, error)
48+
SaveEntry(datastore.Entry) error
49+
AllEntries() ([]datastore.Entry, error)
4850
}
4951

5052
type notifier interface {
5153
Notify(context.Context, notification.Notification) error
5254
}
5355

5456
func (w *Watcher) Watch(ctx context.Context) error {
57+
// Re-check resource suspension states, in case any have been modified since the process was last running
58+
slog.Info("re-checking resource suspension states")
59+
entries, err := w.store.AllEntries()
60+
if err != nil {
61+
return fmt.Errorf("failed to fetch all entries: %w", err)
62+
}
63+
for _, entry := range entries {
64+
if err = w.checkSuspensionStatus(ctx, entry.Resource, "<unknown>"); err != nil {
65+
// We don't return an error here, as CRD versions are liable to change as fluxcd is upgraded
66+
slog.Warn("failed to re-check suspension status", slog.Any("error", err))
67+
}
68+
}
69+
70+
// Watch for new modifications
71+
slog.Info("watching for resource modifications")
5572
return auditlog.Tail(ctx, w.googleCloudProjectID, w.gkeClusterName, func(logEntry *audit.AuditLog) error {
5673
if code := logEntry.GetStatus().GetCode(); code != 0 {
5774
slog.Warn("operation appeared to fail", slog.Int("code", int(code)))
@@ -66,48 +83,62 @@ func (w *Watcher) Watch(ctx context.Context) error {
6683
return err
6784
}
6885

69-
res, err := w.k8sClient.GetRawResource(ctx, resourceName)
70-
if err != nil {
71-
return fmt.Errorf("failed to get raw resource: %w", err)
86+
if err = w.checkSuspensionStatus(ctx, resource, email); err != nil {
87+
return fmt.Errorf("failed to re-check suspension status: %w", err)
7288
}
7389

74-
spec, ok := res["spec"].(map[string]any)
75-
if !ok {
76-
return errors.New("unexpected response payload")
77-
}
78-
isSuspended, _ := spec["suspend"].(bool)
90+
return nil
91+
})
92+
}
7993

80-
var modified bool
81-
wasSuspended, err := w.store.IsSuspended(resource)
82-
if err != nil {
83-
if !errors.Is(err, datastore.ErrNotFound) {
84-
return err
85-
}
86-
modified = true
87-
} else {
88-
modified = isSuspended != wasSuspended
89-
}
94+
func (w *Watcher) checkSuspensionStatus(ctx context.Context, resource k8s.Resource, updatedBy string) error {
95+
res, err := w.k8sClient.GetRawResource(ctx, resource.Path)
96+
if err != nil {
97+
return fmt.Errorf("failed to get raw resource: %w", err)
98+
}
9099

91-
if !modified {
92-
return nil // Probably something else about the resource modified
93-
}
100+
spec, ok := res["spec"].(map[string]any)
101+
if !ok {
102+
return errors.New("unexpected response payload")
103+
}
104+
suspended, _ := spec["suspend"].(bool)
94105

95-
if err = w.store.SetSuspended(resource, isSuspended); err != nil {
106+
var updated bool
107+
entry, err := w.store.GetEntry(resource)
108+
if err != nil {
109+
if !errors.Is(err, datastore.ErrNotFound) {
96110
return err
97111
}
112+
updated = true
113+
entry = datastore.Entry{}
114+
} else {
115+
updated = suspended != entry.Suspended
116+
}
117+
118+
if !updated {
119+
return nil // Probably something else about the resource modified
120+
}
121+
122+
entry.Resource = resource
123+
entry.Suspended = suspended
124+
entry.UpdatedBy = updatedBy
125+
entry.UpdatedAt = time.Now().UTC()
126+
127+
if err = w.store.SaveEntry(entry); err != nil {
128+
return err
129+
}
98130

99-
slog.Info(
100-
"suspension status modified",
101-
slog.String("resource", resourceName),
102-
slog.String("user", email),
103-
slog.Bool("suspended", isSuspended),
104-
)
105-
106-
return w.notifier.Notify(ctx, notification.Notification{
107-
Resource: resource,
108-
Suspended: isSuspended,
109-
Email: email,
110-
GoogleCloudProjectID: w.googleCloudProjectID,
111-
})
131+
slog.Info(
132+
"suspension status updated",
133+
slog.String("resource", resource.Path),
134+
slog.String("user", updatedBy),
135+
slog.Bool("suspended", suspended),
136+
)
137+
138+
return w.notifier.Notify(ctx, notification.Notification{
139+
Resource: resource,
140+
Suspended: suspended,
141+
Email: updatedBy,
142+
GoogleCloudProjectID: w.googleCloudProjectID,
112143
})
113144
}

0 commit comments

Comments
 (0)