Cleanup Kubernetes datagather #571

Merged · merged 1 commit on Sep 18, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -14,3 +14,4 @@ predicate.json
*.tgz

_bin
.vscode
2 changes: 1 addition & 1 deletion go.mod
@@ -20,6 +20,7 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.8.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.30.3
k8s.io/apimachinery v0.30.3
@@ -53,7 +54,6 @@ require (
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.8.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect
69 changes: 35 additions & 34 deletions pkg/agent/run.go
@@ -11,14 +11,14 @@ import (
_ "net/http/pprof"
"os"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/jetstack/preflight/api"
@@ -101,7 +101,15 @@ func Run(cmd *cobra.Command, args []string) {
}

dataGatherers := map[string]datagatherer.DataGatherer{}
var wg sync.WaitGroup
Contributor Author:
This waitgroup was not used.

group, gctx := errgroup.WithContext(ctx)

defer func() {
// TODO: replace Fatalf log calls with Errorf and return the error
cancel()
if err := group.Wait(); err != nil {
logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err)
}
}()
Comment on lines +106 to +112

Member:
What is this doing? It seems like we now cancel immediately, and then block on group.Wait?

Contributor Author:
Indeed, we cancel the context to start the shutdown and wait for the shutdown to complete using group.Wait.

Member (@maelvls, Oct 7, 2024):
Why do we try to shut down here? I don't understand the purpose of this defer func.

From what I can tell, it should be the opposite: if the errgroup ever ends, cancel the context so that the other things stop.

Contributor Author:
An error group will always cancel its context when one of the goroutines returns an error.

For successful execution, we only reach this defer call when the following for loop ends because OneShot is true:

for {
// if period is set in the config, then use that if not already set
if Flags.Period == 0 && config.Period > 0 {
logs.Log.Printf("Using period from config %s", config.Period)
Flags.Period = config.Period
}
gatherAndOutputData(config, preflightClient, dataGatherers)
if Flags.OneShot {
break
}
time.Sleep(Flags.Period)
}

If OneShot is false, the loop will keep looping.
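
To make the cancellation semantics concrete, here is a minimal, self-contained sketch (not part of this PR) showing that the context returned by errgroup.WithContext is canceled as soon as any goroutine in the group returns an error:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	group, gctx := errgroup.WithContext(context.Background())

	// This goroutine fails immediately, which cancels gctx.
	group.Go(func() error {
		return errors.New("boom")
	})

	// This goroutine only watches the context; it unblocks once gctx
	// is canceled by the failure above.
	group.Go(func() error {
		<-gctx.Done()
		return nil
	})

	// Wait blocks until all goroutines return and yields the first
	// non-nil error.
	fmt.Println(group.Wait()) // prints "boom"
}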

Member (@maelvls, Nov 4, 2024):
In my opinion, this information should be written down as a comment in code since this section of code isn't obvious.

Member:
Sorry about that, I was reading go func instead of defer func... It now makes sense.


// load datagatherer config and boot each one
for _, dgConfig := range config.DataGatherers {
@@ -111,32 +119,20 @@ func Run(cmd *cobra.Command, args []string) {
logs.Log.Fatalf("running data gatherer %s of type %s as Local, data-path override present: %s", dgConfig.Name, dgConfig.Kind, dgConfig.DataPath)
}

newDg, err := dgConfig.Config.NewDataGatherer(ctx)
newDg, err := dgConfig.Config.NewDataGatherer(gctx)
if err != nil {
logs.Log.Fatalf("failed to instantiate %q data gatherer %q: %v", kind, dgConfig.Name, err)
}

logs.Log.Printf("starting %q datagatherer", dgConfig.Name)

// start the data gatherers and wait for the cache sync
if err := newDg.Run(ctx.Done()); err != nil {
logs.Log.Printf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
}

// bootCtx is a context with a timeout to allow the informer 5
// seconds to perform an initial sync. It may fail, and that's fine
// too, it will backoff and retry of its own accord. Initial boot
// will only be delayed by a max of 5 seconds.
bootCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

// wait for the informer to complete an initial sync, we do this to
// attempt to have an initial set of data for the first upload of
// the run.
if err := newDg.WaitForCacheSync(bootCtx.Done()); err != nil {
Contributor Author (@inteon, Sep 16, 2024):
Instead of waiting for each dataGatherer before starting the next, we should first start all dataGatherers and then loop again to wait for them to finish (that is what I did in the updated code).

// log sync failure, this might recover in future
logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", kind, dgConfig.Name, err)
}
group.Go(func() error {
if err := newDg.Run(gctx.Done()); err != nil {
return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
}
return nil
})

// regardless of success, this dataGatherers has been given a
// chance to sync its cache and we will now continue as normal. We
@@ -145,19 +141,24 @@ func Run(cmd *cobra.Command, args []string) {
dataGatherers[dgConfig.Name] = newDg
}

// wait for initial sync period to complete. if unsuccessful, then crash
Contributor Author:
This logic was not doing anything, since the WaitGroup was not being used.

// and restart.
c := make(chan struct{})
go func() {
defer close(c)
logs.Log.Printf("waiting for datagatherers to complete inital syncs")
wg.Wait()
}()
select {
case <-c:
logs.Log.Printf("datagatherers inital sync completed")
case <-time.After(60 * time.Second):
logs.Log.Fatalf("datagatherers inital sync failed due to timeout of 60 seconds")
// Wait for 5 seconds for all informers to sync. If they fail to sync
// we continue (as we have no way to know if they will recover or not).
//
// bootCtx is a context with a timeout to allow the informer 5
// seconds to perform an initial sync. It may fail, and that's fine
// too, it will backoff and retry of its own accord. Initial boot
// will only be delayed by a max of 5 seconds.
bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
defer bootCancel()
for _, dgConfig := range config.DataGatherers {
dg := dataGatherers[dgConfig.Name]
// wait for the informer to complete an initial sync, we do this to
// attempt to have an initial set of data for the first upload of
// the run.
if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
// log sync failure, this might recover in future
logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", dgConfig.Kind, dgConfig.Name, err)
}
}
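
The overall shape of this refactor, starting every data gatherer first and then giving all of their caches one shared sync budget, can be read in isolation. Below is a minimal sketch of the pattern under a hypothetical Gatherer interface; the PR's real types differ:

package sketch

import (
	"context"
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

// Gatherer is a hypothetical stand-in for the PR's DataGatherer interface.
type Gatherer interface {
	Run(stopCh <-chan struct{}) error
	WaitForCacheSync(stopCh <-chan struct{}) error
}

// startAndSync starts every gatherer first, then waits for all caches
// under a single shared timeout. The returned group is waited on by the
// caller during shutdown, mirroring the PR's deferred cancel/Wait.
func startAndSync(ctx context.Context, gatherers map[string]Gatherer) (*errgroup.Group, error) {
	group, gctx := errgroup.WithContext(ctx)

	// Start all gatherers without blocking on any one of them.
	for name, g := range gatherers {
		name, g := name, g // capture loop variables (pre-Go 1.22)
		group.Go(func() error {
			if err := g.Run(gctx.Done()); err != nil {
				return fmt.Errorf("gatherer %q: %w", name, err)
			}
			return nil
		})
	}

	// Give every cache the same 5-second budget to sync. A slow sync is
	// logged rather than fatal, because informers back off and retry on
	// their own.
	bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
	defer bootCancel()
	for name, g := range gatherers {
		if err := g.WaitForCacheSync(bootCtx.Done()); err != nil {
			log.Printf("initial sync of %q incomplete: %v", name, err)
		}
	}
	return group, nil
}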

// begin the datagathering loop, periodically sending data to the
3 changes: 1 addition & 2 deletions pkg/datagatherer/k8s/cache.go
@@ -75,8 +75,7 @@ func onDelete(obj interface{}, dgCache *cache.Cache) {
// creates a new updated instance of a cache object, with the resource
// argument. If the object is present in the cache it fetches the object's
// properties.
func updateCacheGatheredResource(cacheKey string, resource interface{},
dgCache *cache.Cache) *api.GatheredResource {
func updateCacheGatheredResource(cacheKey string, resource interface{}, dgCache *cache.Cache) *api.GatheredResource {
// updated cache object
cacheObject := &api.GatheredResource{
Resource: resource,
19 changes: 10 additions & 9 deletions pkg/datagatherer/k8s/client.go
@@ -18,44 +18,45 @@ func NewDynamicClient(kubeconfigPath string) (dynamic.Interface, error) {
if err != nil {
return nil, errors.WithStack(err)
}

cl, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, errors.WithStack(err)
}

return cl, nil
}

// NewDiscoveryClient creates a new 'discovery' client using the provided
// kubeconfig. If kubeconfigPath is not set/empty, it will attempt to load
// configuration using the default loading rules.
func NewDiscoveryClient(kubeconfigPath string) (discovery.DiscoveryClient, error) {
var discoveryClient *discovery.DiscoveryClient

func NewDiscoveryClient(kubeconfigPath string) (*discovery.DiscoveryClient, error) {
cfg, err := kubeconfig.LoadRESTConfig(kubeconfigPath)
if err != nil {
return discovery.DiscoveryClient{}, errors.WithStack(err)
return nil, errors.WithStack(err)
}

discoveryClient, err = discovery.NewDiscoveryClientForConfig(cfg)
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return *discoveryClient, errors.WithStack(err)
return nil, errors.WithStack(err)
}

return *discoveryClient, nil
return discoveryClient, nil
}
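
Returning *discovery.DiscoveryClient as a pointer removes the awkward dereferences on the old error paths. The following hypothetical caller of the new signature (printServerVersion is illustrative only, using the standard log package) shows typical usage:

// printServerVersion is a hypothetical caller of the new signature.
func printServerVersion() error {
	// An empty path falls back to the default kubeconfig loading rules.
	dc, err := NewDiscoveryClient("")
	if err != nil {
		return err
	}
	info, err := dc.ServerVersion()
	if err != nil {
		return err
	}
	log.Printf("connected to Kubernetes %s", info.GitVersion)
	return nil
}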

// NewClientSet creates a new kubernetes clientset using the provided kubeconfig.
// If kubeconfigPath is not set/empty, it will attempt to load configuration using
// the default loading rules.
func NewClientSet(kubeconfigPath string) (kubernetes.Interface, error) {
var clientset *kubernetes.Clientset
cfg, err := kubeconfig.LoadRESTConfig(kubeconfigPath)
if err != nil {
return nil, errors.WithStack(err)
}
clientset, err = kubernetes.NewForConfig(cfg)

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, errors.WithStack(err)
}

return clientset, nil
}
2 changes: 1 addition & 1 deletion pkg/datagatherer/k8s/discovery.go
@@ -43,7 +43,7 @@ func (c *ConfigDiscovery) NewDataGatherer(ctx context.Context) (datagatherer.Dat
// DataGathererDiscovery stores the config for a k8s-discovery datagatherer
type DataGathererDiscovery struct {
// The 'discovery' client used for fetching data.
cl discovery.DiscoveryClient
cl *discovery.DiscoveryClient
}

func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error {
98 changes: 32 additions & 66 deletions pkg/datagatherer/k8s/dynamic.go
@@ -143,20 +143,21 @@ var kubernetesNativeResources = map[schema.GroupVersionResource]sharedInformerFu

// NewDataGatherer constructs a new instance of the generic K8s data-gatherer for the provided
func (c *ConfigDynamic) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer, error) {
cl, err := NewDynamicClient(c.KubeConfigPath)
Contributor Author:
We only have to create a dynamic client when the GVK is not a native resource.
So this function should be called as part of the else body.

Contributor:
Makes sense! Is it fair to say that there's not really a reason for newDataGathererWithClient to be a separate function? It's fishy that we call it with one of cl or clientset with the other set to nil.

Not a requirement in this PR, of course!

if err != nil {
return nil, err
}

if isNativeResource(c.GroupVersionResource) {
clientset, err := NewClientSet(c.KubeConfigPath)
if err != nil {
return nil, errors.WithStack(err)
}

return c.newDataGathererWithClient(ctx, nil, clientset)
}
} else {
cl, err := NewDynamicClient(c.KubeConfigPath)
if err != nil {
return nil, err
}

return c.newDataGathererWithClient(ctx, cl, nil)
return c.newDataGathererWithClient(ctx, cl, nil)
}
}
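
One way to address the reviewer's point about passing a nil client, sketched here only as a possible follow-up rather than what this PR implements, is to build the informer inside each branch and hand the shared constructor a single informer (field selectors and namespace filtering omitted for brevity):

// newInformer is a hypothetical refactor: each branch constructs its own
// SharedIndexInformer, so no nil clients need to be passed around.
func (c *ConfigDynamic) newInformer() (k8scache.SharedIndexInformer, error) {
	if isNativeResource(c.GroupVersionResource) {
		clientset, err := NewClientSet(c.KubeConfigPath)
		if err != nil {
			return nil, err
		}
		factory := informers.NewSharedInformerFactory(clientset, 60*time.Second)
		return kubernetesNativeResources[c.GroupVersionResource](factory), nil
	}
	cl, err := NewDynamicClient(c.KubeConfigPath)
	if err != nil {
		return nil, err
	}
	factory := dynamicinformer.NewDynamicSharedInformerFactory(cl, 60*time.Second)
	return factory.ForResource(c.GroupVersionResource).Informer(), nil
}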

func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {
@@ -178,8 +179,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami

newDataGatherer := &DataGathererDynamic{
ctx: ctx,
cl: cl,
k8sClientSet: clientset,
groupVersionResource: c.GroupVersionResource,
fieldSelector: fieldSelector.String(),
namespaces: c.IncludeNamespaces,
@@ -200,34 +199,22 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
informers.WithNamespace(metav1.NamespaceAll),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}))
newDataGatherer.nativeSharedInformer = factory
informer := informerFunc(factory)
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
onAdd(obj, dgCache)
},
UpdateFunc: func(old, new interface{}) {
onUpdate(old, new, dgCache)
},
DeleteFunc: func(obj interface{}) {
onDelete(obj, dgCache)
}),
)
newDataGatherer.informer = informerFunc(factory)
} else {
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
cl,
60*time.Second,
metav1.NamespaceAll,
func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
},
})
newDataGatherer.informer = informer
return newDataGatherer, nil
)
newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer()
}

factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
Contributor Author:
We are creating these factories to create just one informer. I still create the factory (maybe this can be simplified in the future), but I now let it go out of scope and continue with the created informer instead.

cl,
60*time.Second,
metav1.NamespaceAll,
func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() },
)
resourceInformer := factory.ForResource(c.GroupVersionResource)
informer := resourceInformer.Informer()
newDataGatherer.dynamicSharedInformer = factory
informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
onAdd(obj, dgCache)
},
@@ -238,7 +225,10 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
onDelete(obj, dgCache)
},
})
newDataGatherer.informer = informer
if err != nil {
return nil, err
}
newDataGatherer.registration = registration

return newDataGatherer, nil
}
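
The pattern described in the comment above (build a short-lived filtered factory, extract the single informer, let the factory go out of scope) looks like this in isolation. This is a self-contained sketch, and the certificates GVR is only an example:

package sketch

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	k8scache "k8s.io/client-go/tools/cache"
)

// singleInformer builds a filtered dynamic factory, extracts the one
// informer we need, and lets the factory itself go out of scope.
func singleInformer(cl dynamic.Interface, fieldSelector string) k8scache.SharedIndexInformer {
	gvr := schema.GroupVersionResource{
		Group:    "cert-manager.io",
		Version:  "v1",
		Resource: "certificates",
	}
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		cl,
		60*time.Second,
		metav1.NamespaceAll,
		func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector },
	)
	// Informer() registers the informer with the factory; keeping only
	// the informer works here because informer.Run (not factory.Start)
	// drives it in this design.
	return factory.ForResource(gvr).Informer()
}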
@@ -251,10 +241,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami
// does not have registered as part of its `runtime.Scheme`.
type DataGathererDynamic struct {
ctx context.Context
// The 'dynamic' client used for fetching data.
cl dynamic.Interface
// The k8s clientset used for fetching known resources.
k8sClientSet kubernetes.Interface
// groupVersionResource is the name of the API group, version and resource
// that should be fetched by this data gatherer.
groupVersionResource schema.GroupVersionResource
@@ -270,19 +256,15 @@ type DataGathererDynamic struct {
// 30 seconds purge time https://pkg.go.dev/github.com/patrickmn/go-cache
cache *cache.Cache
// informer watches the events around the targeted resource and updates the cache
informer k8scache.SharedIndexInformer
dynamicSharedInformer dynamicinformer.DynamicSharedInformerFactory
nativeSharedInformer informers.SharedInformerFactory

// isInitialized is set to true when data is first collected, prior to
// this the fetch method will return an error
isInitialized bool
informer k8scache.SharedIndexInformer
registration k8scache.ResourceEventHandlerRegistration
}

// Run starts the dynamic data gatherer's informers for resource collection.
// Returns error if the data gatherer informer wasn't initialized
// Returns error if the data gatherer informer wasn't initialized, Run blocks
// until the stopCh is closed.
func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
if g.dynamicSharedInformer == nil && g.nativeSharedInformer == nil {
if g.informer == nil {
return fmt.Errorf("informer was not initialized, impossible to start")
}

@@ -299,21 +281,15 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
}

// start shared informer
if g.dynamicSharedInformer != nil {
g.dynamicSharedInformer.Start(stopCh)
}

if g.nativeSharedInformer != nil {
g.nativeSharedInformer.Start(stopCh)
}
g.informer.Run(stopCh)

return nil
}

// WaitForCacheSync waits for the data gatherer's informers cache to sync
// before collecting the resources.
func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error {
if !k8scache.WaitForCacheSync(stopCh, g.informer.HasSynced) {
if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) {
return fmt.Errorf("timed out waiting for Kubernetes caches to sync")
}

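The switch from g.informer.HasSynced to g.registration.HasSynced is subtle: the ResourceEventHandlerRegistration returned by AddEventHandler (available since client-go v0.26) reports synced only once the initial events have been delivered to this specific handler, which is a stronger guarantee than the informer's store merely being populated. A minimal sketch of the pattern, assuming an existing informer and stopCh:

// Sketch: per-handler sync tracking (client-go >= v0.26), assuming an
// existing SharedIndexInformer `informer` and stop channel `stopCh`.
registration, err := informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) { /* populate the local cache */ },
})
if err != nil {
	return err
}
// registration.HasSynced becomes true only after the initial list has
// been delivered to this handler.
if !k8scache.WaitForCacheSync(stopCh, registration.HasSynced) {
	return fmt.Errorf("timed out waiting for Kubernetes caches to sync")
}
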
@@ -432,16 +408,6 @@ func redactList(list []*api.GatheredResource) error {
return nil
}

// namespaceResourceInterface will 'namespace' a NamespaceableResourceInterface
// if the 'namespace' parameter is non-empty, otherwise it will return the
// given ResourceInterface as-is.
func namespaceResourceInterface(iface dynamic.NamespaceableResourceInterface, namespace string) dynamic.ResourceInterface {
Contributor Author:
This function was unused.

if namespace == "" {
return iface
}
return iface.Namespace(namespace)
}

// generateExcludedNamespacesFieldSelector creates a field selector string from
// a list of namespaces to exclude.
func generateExcludedNamespacesFieldSelector(excludeNamespaces []string) fields.Selector {