Skip to content

Commit b2c66f7

Browse files
committed
Quick tidy up
1 parent 2209f55 commit b2c66f7

File tree

12 files changed

+60
-3
lines changed

12 files changed

+60
-3
lines changed

internal/auditlog/tail.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"google.golang.org/grpc/status"
1717
)
1818

19+
// Tail streams audit log entries relating to fluxcd resources. Only audit log entries relating to non-system users
20+
// patching or creating resources are returned.
1921
func Tail(ctx context.Context, projectID string, clusterName string, cb func(*audit.AuditLog) error) error {
2022
client, err := logging.NewClient(ctx)
2123
if err != nil {

internal/config/config.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package config
22

33
import (
44
"fmt"
5+
"io"
56
"os"
67

78
"gopkg.in/yaml.v3"
89
)
910

11+
// Config is the application configuration
1012
type Config struct {
1113
GoogleCloudProjectID string `yaml:"googleCloudProjectId"`
1214
GKEClusterName string `yaml:"gkeClusterName"`
@@ -20,15 +22,21 @@ type Config struct {
2022
} `yaml:"notification"`
2123
}
2224

23-
func Parse(path string) (Config, error) {
25+
// ParseFile parses configuration from a given file path
26+
func ParseFile(path string) (Config, error) {
2427
f, err := os.Open(path)
2528
if err != nil {
2629
return Config{}, fmt.Errorf("failed to open config file: %w", err)
2730
}
2831
defer f.Close()
2932

33+
return Parse(f)
34+
}
35+
36+
// Parse parses configuration by reading the contents of the supplied reader
37+
func Parse(r io.Reader) (Config, error) {
3038
var config Config
31-
if err = yaml.NewDecoder(f).Decode(&config); err != nil {
39+
if err := yaml.NewDecoder(r).Decode(&config); err != nil {
3240
return Config{}, fmt.Errorf("failed to parse config file: %w", err)
3341
}
3442
return config, nil

internal/datastore/badgerdb.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,24 @@ import (
1111
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/k8s"
1212
)
1313

14+
// ErrNotFound is returned when an entry cannot be found in the underlying store
1415
var ErrNotFound = errors.New("not found")
1516

17+
// Store is a basic badgerdb backed persistence mechanism
1618
type Store struct {
1719
db *badger.DB
1820
}
1921

22+
// Entry represents a single item held by the store. It relates to a single resource reference, and holds information
23+
// about its suspension status.
2024
type Entry struct {
2125
Resource k8s.ResourceReference `json:"resource"`
2226
Suspended bool `json:"suspended"`
2327
UpdatedBy string `json:"updatedBy"`
2428
UpdatedAt time.Time `json:"updatedAt"`
2529
}
2630

31+
// NewBadgerStore instantiates a Store instance. Data will be persisted the directory pointed at by the supplied path.
2732
func NewBadgerStore(path string) (*Store, error) {
2833
if path == "" {
2934
return nil, errors.New("badger store path cannot be empty")
@@ -37,6 +42,7 @@ func NewBadgerStore(path string) (*Store, error) {
3742
}, nil
3843
}
3944

45+
// GetEntry retrieves an entry
4046
func (s *Store) GetEntry(resource k8s.ResourceReference) (Entry, error) {
4147
var entry Entry
4248
err := s.db.View(func(txn *badger.Txn) error {
@@ -59,6 +65,7 @@ func (s *Store) GetEntry(resource k8s.ResourceReference) (Entry, error) {
5965
return entry, err
6066
}
6167

68+
// SaveEntry creates or replaces an entry
6269
func (s *Store) SaveEntry(entry Entry) error {
6370
return s.db.Update(func(txn *badger.Txn) error {
6471
data, err := json.Marshal(entry)
@@ -69,6 +76,7 @@ func (s *Store) SaveEntry(entry Entry) error {
6976
})
7077
}
7178

79+
// Close cleans up any underlying resources
7280
func (s *Store) Close() error {
7381
return s.db.Close()
7482
}

internal/fluxcd/resource.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package fluxcd
22

3+
// Resource represents an abstract suspendable fluxcd resource. Only the fields relevant to this application are
4+
// covered here
35
type Resource struct {
46
Metadata struct {
57
Name string `json:"name"`
@@ -10,6 +12,7 @@ type Resource struct {
1012
} `json:"spec"`
1113
}
1214

15+
// ResourceList represents a list of resources, aligned to how this would be presented by the kubernetes API
1316
type ResourceList struct {
1417
Items []Resource `json:"items"`
1518
}

internal/k8s/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@ import (
1313
"k8s.io/client-go/tools/clientcmd"
1414
)
1515

16+
// Client is a thing wrapper around the kubernetes client. It exposes functions relevant for checking what fluxcd
17+
// resources exist, and what their underlying state look like.
1618
type Client struct {
1719
client *kubernetes.Clientset
1820
apiExtClient *clientset.Clientset
1921
}
2022

23+
// NewClient instantiates and returns a Client
2124
func NewClient(configPath string) (*Client, error) {
2225
var (
2326
config *rest.Config
@@ -48,6 +51,7 @@ func NewClient(configPath string) (*Client, error) {
4851
}, nil
4952
}
5053

54+
// GetRawResource retrieves a raw resource from the kubernetes API. This is used to fetch fluxcd custom resources.
5155
func (c *Client) GetRawResource(ctx context.Context, resource ResourceReference) ([]byte, error) {
5256
absPath := path.Join(
5357
"apis",
@@ -66,6 +70,7 @@ func (c *Client) GetRawResource(ctx context.Context, resource ResourceReference)
6670
return body, nil
6771
}
6872

73+
// GetRawResources retrieves a list of raw resources
6974
func (c *Client) GetRawResources(ctx context.Context, group ResourceType) ([]byte, error) {
7075
absPath := path.Join(
7176
"apis",
@@ -81,6 +86,7 @@ func (c *Client) GetRawResources(ctx context.Context, group ResourceType) ([]byt
8186
return body, nil
8287
}
8388

89+
// GetCustomResourceDefinitions fetches all custom resource definitions registered with the cluster.
8490
func (c *Client) GetCustomResourceDefinitions(ctx context.Context, listOptions metav1.ListOptions) (*v1.CustomResourceDefinitionList, error) {
8591
return c.apiExtClient.
8692
ApiextensionsV1().

internal/k8s/resource.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,21 @@ import (
55
"strings"
66
)
77

8+
// ResourceType represents a kubernetes resource type
89
type ResourceType struct {
910
Group string `json:"group"`
1011
Version string `json:"version"`
1112
Kind string `json:"kind"`
1213
}
1314

15+
// ResourceReference represents a reference to a kubernetes resource instance
1416
type ResourceReference struct {
1517
Type ResourceType `json:"type"`
1618
Namespace string `json:"namespace"`
1719
Name string `json:"name"`
1820
}
1921

22+
// ResourceReferenceFromPath parses an API path to a resource reference
2023
func ResourceReferenceFromPath(path string) (ResourceReference, error) {
2124
parts := strings.Split(path, "/")
2225
if len(parts) != 6 {

internal/notification/filtering.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/expr-lang/expr/vm"
99
)
1010

11+
// NewFilteringNotifier instantiates and returns FilteringNotifier
1112
func NewFilteringNotifier(rawExpr string, delegate Notifier) (*FilteringNotifier, error) {
1213
filter, err := expr.Compile(rawExpr)
1314
if err != nil {
@@ -19,11 +20,13 @@ func NewFilteringNotifier(rawExpr string, delegate Notifier) (*FilteringNotifier
1920
}, nil
2021
}
2122

23+
// FilteringNotifier is a notifier implementation that filters notifications via an expression.
2224
type FilteringNotifier struct {
2325
filter *vm.Program
2426
delegate Notifier
2527
}
2628

29+
// Notify passes the notification to the underlying delegate if the expression is satisfied.
2730
func (fn *FilteringNotifier) Notify(ctx context.Context, notif Notification) error {
2831
env := map[string]interface{}{
2932
"resource": notif.Resource,

internal/notification/multi.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@ package notification
22

33
import "context"
44

5+
// MultiNotifier is used to broadcast notifications
56
type MultiNotifier struct {
67
notifiers []Notifier
78
}
89

10+
// NewMultiNotifier instantiates and returns MultiNotifier
911
func NewMultiNotifier(notifiers []Notifier) *MultiNotifier {
1012
return &MultiNotifier{
1113
notifiers: notifiers,
1214
}
1315
}
1416

17+
// Notify passes the notification to all underlying notifiers
1518
func (an *MultiNotifier) Notify(ctx context.Context, notif Notification) error {
1619
for _, n := range an.notifiers {
1720
if err := n.Notify(ctx, notif); err != nil {

internal/notification/notifier.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/k8s"
77
)
88

9+
// Notification carries information relevant for dispatching external notifications
910
type Notification struct {
1011
Resource k8s.ResourceReference
1112
Suspended bool
1213
Email string
1314
GoogleCloudProjectID string
1415
}
1516

17+
// Notifier is the interface that is expected to be implemented for notification mechanisms
1618
type Notifier interface {
1719
Notify(context.Context, Notification) error
1820
}

internal/notification/slack.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import (
1111
"time"
1212
)
1313

14+
// SlackNotifier sends notifications to Slack via a webhook
1415
type SlackNotifier struct {
1516
client *http.Client
1617
webhookURL string
1718
}
1819

20+
// NewSlackNotifier instantiates and returns SlackNotifier
1921
func NewSlackNotifier(webhookURL string) (*SlackNotifier, error) {
2022
if webhookURL == "" {
2123
return nil, errors.New("empty webhook url supplied")
@@ -28,10 +30,12 @@ func NewSlackNotifier(webhookURL string) (*SlackNotifier, error) {
2830
}, nil
2931
}
3032

33+
// SlackWebhook is a Slack webhook payload
3134
type SlackWebhook struct {
3235
Attachments []SlackAttachment `json:"attachments,omitempty"`
3336
}
3437

38+
// SlackAttachment forms part of a Slack webhook payload
3539
type SlackAttachment struct {
3640
Color string `json:"color"`
3741
AuthorName string `json:"author_name"`
@@ -40,12 +44,14 @@ type SlackAttachment struct {
4044
Fields []SlackAttachmentField `json:"fields,omitempty"`
4145
}
4246

47+
// SlackAttachmentField forms part of a Slack webhook attachment value
4348
type SlackAttachmentField struct {
4449
Title string `json:"title"`
4550
Value string `json:"value"`
4651
Short bool `json:"short"`
4752
}
4853

54+
// Notify sends a notification via the underlying Slack webhook URL.
4955
func (sn *SlackNotifier) Notify(ctx context.Context, notif Notification) error {
5056
var (
5157
action string

internal/watch/watcher.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"github.com/e-flux-platform/fluxcd-suspend-notifier/internal/notification"
2121
)
2222

23+
// Watcher is used to orchestrate notifications. It discovers fluxcd resources, watches for changes, and notifies when
24+
// the suspension status changes.
2325
type Watcher struct {
2426
googleCloudProjectID string
2527
gkeClusterName string
@@ -28,6 +30,7 @@ type Watcher struct {
2830
notifier notifier
2931
}
3032

33+
// NewWatcher instantiates and returns Watcher
3134
func NewWatcher(
3235
googleCloudProjectID string,
3336
gkeClusterName string,
@@ -59,6 +62,8 @@ type notifier interface {
5962
Notify(context.Context, notification.Notification) error
6063
}
6164

65+
// Watch blocks waiting for fluxcd resource suspension statuses to change. When a change is observed, the notifier
66+
// is invoked.
6267
func (w *Watcher) Watch(ctx context.Context) error {
6368
resourceTypes, err := w.resolveFluxResourceTypes(ctx)
6469
if err != nil {
@@ -72,6 +77,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
7277
return w.watch(ctx, resourceTypes)
7378
}
7479

80+
// resolveFluxResourceTypes returns fluxcd resource types; specifically only those that can be suspended.
7581
func (w *Watcher) resolveFluxResourceTypes(ctx context.Context) ([]k8s.ResourceType, error) {
7682
crds, err := w.k8sClient.GetCustomResourceDefinitions(ctx, metav1.ListOptions{
7783
LabelSelector: "app.kubernetes.io/part-of=flux",
@@ -97,6 +103,9 @@ func (w *Watcher) resolveFluxResourceTypes(ctx context.Context) ([]k8s.ResourceT
97103
return types, nil
98104
}
99105

106+
// init retries the suspension status of all fluxcd resource instances that are a suspendable resource type. This is
107+
// useful when starting from scratch, to build an initial picture. Equally, if the application has been down for a
108+
// period of time, it allows for the state to be synchronised.
100109
func (w *Watcher) init(ctx context.Context, types []k8s.ResourceType) error {
101110
slog.Info("initializing")
102111
seen := make(map[string]struct{})
@@ -131,6 +140,8 @@ func (w *Watcher) init(ctx context.Context, types []k8s.ResourceType) error {
131140
return nil
132141
}
133142

143+
// watch tails audit logs, waiting for modifications to fluxcd resource types that are suspendable. When a modification
144+
// is observed, the resource state is evaluated via processResource
134145
func (w *Watcher) watch(ctx context.Context, types []k8s.ResourceType) error {
135146
slog.Info("watching for resource modifications")
136147

@@ -171,6 +182,8 @@ func (w *Watcher) watch(ctx context.Context, types []k8s.ResourceType) error {
171182
})
172183
}
173184

185+
// processResource checks to see if the suspend status has been modified. If it has, a notification is dispatched. If
186+
// the resource has never been seen before, we simply save the state.
174187
func (w *Watcher) processResource(
175188
ctx context.Context,
176189
resourceRef k8s.ResourceReference,

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func run(ctx context.Context) error {
3131
return errors.New("config path environment variable not set")
3232
}
3333

34-
conf, err := config.Parse(configPath)
34+
conf, err := config.ParseFile(configPath)
3535
if err != nil {
3636
return fmt.Errorf("failed to parse config: %w", err)
3737
}

0 commit comments

Comments
 (0)