diff --git a/.gitignore b/.gitignore index 17966351..60d15f17 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ predicate.json *.tgz _bin +.vscode diff --git a/go.mod b/go.mod index e20deed3..eac955bf 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 + golang.org/x/sync v0.8.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.30.3 k8s.io/apimachinery v0.30.3 @@ -53,7 +54,6 @@ require ( golang.org/x/crypto v0.24.0 // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/net v0.26.0 // indirect - golang.org/x/sync v0.8.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 39619f08..91e57bdd 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -11,7 +11,6 @@ import ( _ "net/http/pprof" "os" "strings" - "sync" "time" "github.com/cenkalti/backoff" @@ -19,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/jetstack/preflight/api" @@ -101,7 +101,15 @@ func Run(cmd *cobra.Command, args []string) { } dataGatherers := map[string]datagatherer.DataGatherer{} - var wg sync.WaitGroup + group, gctx := errgroup.WithContext(ctx) + + defer func() { + // TODO: replace Fatalf log calls with Errorf and return the error + cancel() + if err := group.Wait(); err != nil { + logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err) + } + }() // load datagatherer config and boot each one for _, dgConfig := range config.DataGatherers { @@ -111,7 +119,7 @@ func Run(cmd *cobra.Command, args []string) { logs.Log.Fatalf("running data gatherer %s of type %s as Local, data-path override present: %s", dgConfig.Name, dgConfig.Kind, dgConfig.DataPath) } - newDg, err := dgConfig.Config.NewDataGatherer(ctx) + newDg, err := dgConfig.Config.NewDataGatherer(gctx) if err != nil { logs.Log.Fatalf("failed to instantiate %q data gatherer %q: %v", kind, dgConfig.Name, err) } @@ -119,24 +127,12 @@ func Run(cmd *cobra.Command, args []string) { logs.Log.Printf("starting %q datagatherer", dgConfig.Name) // start the data gatherers and wait for the cache sync - if err := newDg.Run(ctx.Done()); err != nil { - logs.Log.Printf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err) - } - - // bootCtx is a context with a timeout to allow the informer 5 - // seconds to perform an initial sync. It may fail, and that's fine - // too, it will backoff and retry of its own accord. Initial boot - // will only be delayed by a max of 5 seconds. - bootCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - // wait for the informer to complete an initial sync, we do this to - // attempt to have an initial set of data for the first upload of - // the run. - if err := newDg.WaitForCacheSync(bootCtx.Done()); err != nil { - // log sync failure, this might recover in future - logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", kind, dgConfig.Name, err) - } + group.Go(func() error { + if err := newDg.Run(gctx.Done()); err != nil { + return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err) + } + return nil + }) // regardless of success, this dataGatherers has been given a // chance to sync its cache and we will now continue as normal. We @@ -145,19 +141,24 @@ func Run(cmd *cobra.Command, args []string) { dataGatherers[dgConfig.Name] = newDg } - // wait for initial sync period to complete. if unsuccessful, then crash - // and restart. - c := make(chan struct{}) - go func() { - defer close(c) - logs.Log.Printf("waiting for datagatherers to complete inital syncs") - wg.Wait() - }() - select { - case <-c: - logs.Log.Printf("datagatherers inital sync completed") - case <-time.After(60 * time.Second): - logs.Log.Fatalf("datagatherers inital sync failed due to timeout of 60 seconds") + // Wait for 5 seconds for all informers to sync. If they fail to sync + // we continue (as we have no way to know if they will recover or not). + // + // bootCtx is a context with a timeout to allow the informer 5 + // seconds to perform an initial sync. It may fail, and that's fine + // too, it will backoff and retry of its own accord. Initial boot + // will only be delayed by a max of 5 seconds. + bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second) + defer bootCancel() + for _, dgConfig := range config.DataGatherers { + dg := dataGatherers[dgConfig.Name] + // wait for the informer to complete an initial sync, we do this to + // attempt to have an initial set of data for the first upload of + // the run. + if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil { + // log sync failure, this might recover in future + logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", dgConfig.Kind, dgConfig.Name, err) + } } // begin the datagathering loop, periodically sending data to the diff --git a/pkg/datagatherer/k8s/cache.go b/pkg/datagatherer/k8s/cache.go index 98d1df6e..fd89c580 100644 --- a/pkg/datagatherer/k8s/cache.go +++ b/pkg/datagatherer/k8s/cache.go @@ -75,8 +75,7 @@ func onDelete(obj interface{}, dgCache *cache.Cache) { // creates a new updated instance of a cache object, with the resource // argument. If the object is present in the cache it fetches the object's // properties. -func updateCacheGatheredResource(cacheKey string, resource interface{}, - dgCache *cache.Cache) *api.GatheredResource { +func updateCacheGatheredResource(cacheKey string, resource interface{}, dgCache *cache.Cache) *api.GatheredResource { // updated cache object cacheObject := &api.GatheredResource{ Resource: resource, diff --git a/pkg/datagatherer/k8s/client.go b/pkg/datagatherer/k8s/client.go index 8205c603..2dd01b4c 100644 --- a/pkg/datagatherer/k8s/client.go +++ b/pkg/datagatherer/k8s/client.go @@ -18,44 +18,45 @@ func NewDynamicClient(kubeconfigPath string) (dynamic.Interface, error) { if err != nil { return nil, errors.WithStack(err) } + cl, err := dynamic.NewForConfig(cfg) if err != nil { return nil, errors.WithStack(err) } + return cl, nil } // NewDiscoveryClient creates a new 'discovery' client using the provided // kubeconfig. If kubeconfigPath is not set/empty, it will attempt to load // configuration using the default loading rules. -func NewDiscoveryClient(kubeconfigPath string) (discovery.DiscoveryClient, error) { - var discoveryClient *discovery.DiscoveryClient - +func NewDiscoveryClient(kubeconfigPath string) (*discovery.DiscoveryClient, error) { cfg, err := kubeconfig.LoadRESTConfig(kubeconfigPath) if err != nil { - return discovery.DiscoveryClient{}, errors.WithStack(err) + return nil, errors.WithStack(err) } - discoveryClient, err = discovery.NewDiscoveryClientForConfig(cfg) + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) if err != nil { - return *discoveryClient, errors.WithStack(err) + return nil, errors.WithStack(err) } - return *discoveryClient, nil + return discoveryClient, nil } // NewClientSet creates a new kubernetes clientset using the provided kubeconfig. // If kubeconfigPath is not set/empty, it will attempt to load configuration using // the default loading rules. func NewClientSet(kubeconfigPath string) (kubernetes.Interface, error) { - var clientset *kubernetes.Clientset cfg, err := kubeconfig.LoadRESTConfig(kubeconfigPath) if err != nil { return nil, errors.WithStack(err) } - clientset, err = kubernetes.NewForConfig(cfg) + + clientset, err := kubernetes.NewForConfig(cfg) if err != nil { return nil, errors.WithStack(err) } + return clientset, nil } diff --git a/pkg/datagatherer/k8s/discovery.go b/pkg/datagatherer/k8s/discovery.go index 0a6dfe45..9eec310e 100644 --- a/pkg/datagatherer/k8s/discovery.go +++ b/pkg/datagatherer/k8s/discovery.go @@ -43,7 +43,7 @@ func (c *ConfigDiscovery) NewDataGatherer(ctx context.Context) (datagatherer.Dat // DataGathererDiscovery stores the config for a k8s-discovery datagatherer type DataGathererDiscovery struct { // The 'discovery' client used for fetching data. - cl discovery.DiscoveryClient + cl *discovery.DiscoveryClient } func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error { diff --git a/pkg/datagatherer/k8s/dynamic.go b/pkg/datagatherer/k8s/dynamic.go index 20ddda1a..ca7f8d3a 100644 --- a/pkg/datagatherer/k8s/dynamic.go +++ b/pkg/datagatherer/k8s/dynamic.go @@ -143,20 +143,21 @@ var kubernetesNativeResources = map[schema.GroupVersionResource]sharedInformerFu // NewDataGatherer constructs a new instance of the generic K8s data-gatherer for the provided func (c *ConfigDynamic) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer, error) { - cl, err := NewDynamicClient(c.KubeConfigPath) - if err != nil { - return nil, err - } - if isNativeResource(c.GroupVersionResource) { clientset, err := NewClientSet(c.KubeConfigPath) if err != nil { return nil, errors.WithStack(err) } + return c.newDataGathererWithClient(ctx, nil, clientset) - } + } else { + cl, err := NewDynamicClient(c.KubeConfigPath) + if err != nil { + return nil, err + } - return c.newDataGathererWithClient(ctx, cl, nil) + return c.newDataGathererWithClient(ctx, cl, nil) + } } func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) { @@ -178,8 +179,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami newDataGatherer := &DataGathererDynamic{ ctx: ctx, - cl: cl, - k8sClientSet: clientset, groupVersionResource: c.GroupVersionResource, fieldSelector: fieldSelector.String(), namespaces: c.IncludeNamespaces, @@ -200,34 +199,22 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami informers.WithNamespace(metav1.NamespaceAll), informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() - })) - newDataGatherer.nativeSharedInformer = factory - informer := informerFunc(factory) - informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - onAdd(obj, dgCache) - }, - UpdateFunc: func(old, new interface{}) { - onUpdate(old, new, dgCache) - }, - DeleteFunc: func(obj interface{}) { - onDelete(obj, dgCache) + }), + ) + newDataGatherer.informer = informerFunc(factory) + } else { + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( + cl, + 60*time.Second, + metav1.NamespaceAll, + func(options *metav1.ListOptions) { + options.FieldSelector = fieldSelector.String() }, - }) - newDataGatherer.informer = informer - return newDataGatherer, nil + ) + newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer() } - factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory( - cl, - 60*time.Second, - metav1.NamespaceAll, - func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() }, - ) - resourceInformer := factory.ForResource(c.GroupVersionResource) - informer := resourceInformer.Informer() - newDataGatherer.dynamicSharedInformer = factory - informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ + registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { onAdd(obj, dgCache) }, @@ -238,7 +225,10 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami onDelete(obj, dgCache) }, }) - newDataGatherer.informer = informer + if err != nil { + return nil, err + } + newDataGatherer.registration = registration return newDataGatherer, nil } @@ -251,10 +241,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami // does not have registered as part of its `runtime.Scheme`. type DataGathererDynamic struct { ctx context.Context - // The 'dynamic' client used for fetching data. - cl dynamic.Interface - // The k8s clientset used for fetching known resources. - k8sClientSet kubernetes.Interface // groupVersionResource is the name of the API group, version and resource // that should be fetched by this data gatherer. groupVersionResource schema.GroupVersionResource @@ -270,19 +256,15 @@ type DataGathererDynamic struct { // 30 seconds purge time https://pkg.go.dev/github.com/patrickmn/go-cache cache *cache.Cache // informer watches the events around the targeted resource and updates the cache - informer k8scache.SharedIndexInformer - dynamicSharedInformer dynamicinformer.DynamicSharedInformerFactory - nativeSharedInformer informers.SharedInformerFactory - - // isInitialized is set to true when data is first collected, prior to - // this the fetch method will return an error - isInitialized bool + informer k8scache.SharedIndexInformer + registration k8scache.ResourceEventHandlerRegistration } // Run starts the dynamic data gatherer's informers for resource collection. -// Returns error if the data gatherer informer wasn't initialized +// Returns error if the data gatherer informer wasn't initialized, Run blocks +// until the stopCh is closed. func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { - if g.dynamicSharedInformer == nil && g.nativeSharedInformer == nil { + if g.informer == nil { return fmt.Errorf("informer was not initialized, impossible to start") } @@ -299,13 +281,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { } // start shared informer - if g.dynamicSharedInformer != nil { - g.dynamicSharedInformer.Start(stopCh) - } - - if g.nativeSharedInformer != nil { - g.nativeSharedInformer.Start(stopCh) - } + g.informer.Run(stopCh) return nil } @@ -313,7 +289,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { // WaitForCacheSync waits for the data gatherer's informers cache to sync // before collecting the resources. func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error { - if !k8scache.WaitForCacheSync(stopCh, g.informer.HasSynced) { + if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) { return fmt.Errorf("timed out waiting for Kubernetes caches to sync") } @@ -432,16 +408,6 @@ func redactList(list []*api.GatheredResource) error { return nil } -// namespaceResourceInterface will 'namespace' a NamespaceableResourceInterface -// if the 'namespace' parameter is non-empty, otherwise it will return the -// given ResourceInterface as-is. -func namespaceResourceInterface(iface dynamic.NamespaceableResourceInterface, namespace string) dynamic.ResourceInterface { - if namespace == "" { - return iface - } - return iface.Namespace(namespace) -} - // generateExcludedNamespacesFieldSelector creates a field selector string from // a list of namespaces to exclude. func generateExcludedNamespacesFieldSelector(excludeNamespaces []string) fields.Selector { diff --git a/pkg/datagatherer/k8s/dynamic_test.go b/pkg/datagatherer/k8s/dynamic_test.go index 21bdb715..fe88ea0c 100644 --- a/pkg/datagatherer/k8s/dynamic_test.go +++ b/pkg/datagatherer/k8s/dynamic_test.go @@ -121,7 +121,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) { expected := &DataGathererDynamic{ ctx: ctx, - cl: cl, groupVersionResource: config.GroupVersionResource, // it's important that the namespaces are set as the IncludeNamespaces // during initialization @@ -134,9 +133,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) { if !reflect.DeepEqual(gatherer.ctx, expected.ctx) { t.Errorf("expected %v, got %v", expected, dg) } - if !reflect.DeepEqual(gatherer.cl, expected.cl) { - t.Errorf("expected %v, got %v", expected, dg) - } if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) { t.Errorf("expected %v, got %v", expected, dg) } @@ -149,11 +145,8 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) { if gatherer.informer == nil { t.Errorf("unexpected resource informer value: %v", nil) } - if gatherer.dynamicSharedInformer == nil { - t.Errorf("unexpected dynamicSharedInformer value: %v", nil) - } - if gatherer.nativeSharedInformer != nil { - t.Errorf("unexpected nativeSharedInformer value: %v. should be nil", gatherer.nativeSharedInformer) + if gatherer.registration == nil { + t.Errorf("unexpected resource event handler registration value: %v", nil) } if !reflect.DeepEqual(gatherer.fieldSelector, expected.fieldSelector) { t.Errorf("expected %v, got %v", expected.fieldSelector, gatherer.fieldSelector) @@ -174,7 +167,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) { expected := &DataGathererDynamic{ ctx: ctx, - k8sClientSet: clientset, groupVersionResource: config.GroupVersionResource, // it's important that the namespaces are set as the IncludeNamespaces // during initialization @@ -186,9 +178,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) { if !reflect.DeepEqual(gatherer.ctx, expected.ctx) { t.Errorf("expected %v, got %v", expected, dg) } - if !reflect.DeepEqual(gatherer.k8sClientSet, expected.k8sClientSet) { - t.Errorf("expected %v, got %v", expected, dg) - } if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) { t.Errorf("expected %v, got %v", expected, dg) } @@ -201,11 +190,8 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) { if gatherer.informer == nil { t.Errorf("unexpected resource informer value: %v", nil) } - if gatherer.nativeSharedInformer == nil { - t.Errorf("unexpected nativeSharedInformer value: %v", nil) - } - if gatherer.dynamicSharedInformer != nil { - t.Errorf("unexpected dynamicSharedInformer value: %v. should be nil", gatherer.dynamicSharedInformer) + if gatherer.registration == nil { + t.Errorf("unexpected event handler registration value: %v", nil) } } @@ -638,10 +624,11 @@ func TestDynamicGatherer_Fetch(t *testing.T) { // start data gatherer informer dynamiDg := dg - err = dynamiDg.Run(ctx.Done()) - if err != nil { - t.Fatalf("unexpected client error: %+v", err) - } + go func() { + if err = dynamiDg.Run(ctx.Done()); err != nil { + t.Errorf("unexpected client error: %+v", err) + } + }() err = dynamiDg.WaitForCacheSync(ctx.Done()) if err != nil { t.Fatalf("unexpected client error: %+v", err) @@ -932,10 +919,11 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) { // start data gatherer informer dynamiDg := dg - err = dynamiDg.Run(ctx.Done()) - if err != nil { - t.Fatalf("unexpected client error: %+v", err) - } + go func() { + if err = dynamiDg.Run(ctx.Done()); err != nil { + t.Errorf("unexpected client error: %+v", err) + } + }() err = dynamiDg.WaitForCacheSync(ctx.Done()) if err != nil { t.Fatalf("unexpected client error: %+v", err)