Cleanup Kubernetes datagather #571
@@ -14,3 +14,4 @@ predicate.json
 *.tgz

 _bin
+.vscode
@@ -11,14 +11,14 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"strings"
-	"sync"
 	"time"

 	"github.com/cenkalti/backoff"
 	"github.com/hashicorp/go-multierror"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/spf13/cobra"
+	"golang.org/x/sync/errgroup"
 	"sigs.k8s.io/controller-runtime/pkg/manager"

 	"github.com/jetstack/preflight/api"
@@ -101,7 +101,15 @@ func Run(cmd *cobra.Command, args []string) {
 	}

 	dataGatherers := map[string]datagatherer.DataGatherer{}
-	var wg sync.WaitGroup
+	group, gctx := errgroup.WithContext(ctx)
+
+	defer func() {
+		// TODO: replace Fatalf log calls with Errorf and return the error
+		cancel()
+		if err := group.Wait(); err != nil {
+			logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err)
+		}
+	}()
Comment on lines +106 to +112

What is this doing? It seems like we now cancel immediately, and then block on group.Wait().

Indeed, we cancel the context to start the shutdown and wait for the shutdown to complete using group.Wait.

Why do we try to shut down here? I don't understand the purpose of this. From what I can tell, it should be the opposite: if the errgroup ever ends, cancel the context so that the other things stop.

An error group will always cancel its context when one of the goroutines returns an error. For successful execution, we only reach this defer call when the following for loop ends because OneShot is true (see jetstack-secure/pkg/agent/run.go, lines 167 to 181 in 7f756c3). If OneShot is false, the loop will keep looping.

In my opinion, this information should be written down as a comment in the code, since this section of code isn't obvious.

Sorry about that, I was reading

 	// load datagatherer config and boot each one
 	for _, dgConfig := range config.DataGatherers {
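For readers following the thread above, here is a minimal, self-contained sketch (not code from this PR) of the cancel-then-wait shutdown pattern being discussed: cancelling the shared context signals every goroutine in the errgroup to stop, and group.Wait() then blocks until all of them have returned.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	group, gctx := errgroup.WithContext(ctx)

	// A stand-in for a long-running component: it stops when the group
	// context is cancelled.
	group.Go(func() error {
		<-gctx.Done()
		fmt.Println("component stopped")
		return nil
	})

	time.Sleep(100 * time.Millisecond)

	// The same shape as the defer above: cancel to begin shutdown, then
	// wait for every goroutine in the group to return before exiting.
	cancel()
	if err := group.Wait(); err != nil {
		fmt.Println("shutdown error:", err)
	}
}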
@@ -111,32 +119,20 @@ func Run(cmd *cobra.Command, args []string) {
 			logs.Log.Fatalf("running data gatherer %s of type %s as Local, data-path override present: %s", dgConfig.Name, dgConfig.Kind, dgConfig.DataPath)
 		}

-		newDg, err := dgConfig.Config.NewDataGatherer(ctx)
+		newDg, err := dgConfig.Config.NewDataGatherer(gctx)
 		if err != nil {
 			logs.Log.Fatalf("failed to instantiate %q data gatherer %q: %v", kind, dgConfig.Name, err)
 		}

 		logs.Log.Printf("starting %q datagatherer", dgConfig.Name)

-		// start the data gatherers and wait for the cache sync
-		if err := newDg.Run(ctx.Done()); err != nil {
-			logs.Log.Printf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
-		}
-
-		// bootCtx is a context with a timeout to allow the informer 5
-		// seconds to perform an initial sync. It may fail, and that's fine
-		// too, it will backoff and retry of its own accord. Initial boot
-		// will only be delayed by a max of 5 seconds.
-		bootCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-		defer cancel()
-
-		// wait for the informer to complete an initial sync, we do this to
-		// attempt to have an initial set of data for the first upload of
-		// the run.
-		if err := newDg.WaitForCacheSync(bootCtx.Done()); err != nil {
Instead of waiting for each dataGatherer before starting the next, we should first start all dataGatherers and loop again to wait for them to finish (that is what I did in the updated code).
-			// log sync failure, this might recover in future
-			logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", kind, dgConfig.Name, err)
-		}
+		group.Go(func() error {
+			if err := newDg.Run(gctx.Done()); err != nil {
+				return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
+			}
+			return nil
+		})

 		// regardless of success, this dataGatherers has been given a
 		// chance to sync its cache and we will now continue as normal. We
@@ -145,19 +141,24 @@ func Run(cmd *cobra.Command, args []string) {
 		dataGatherers[dgConfig.Name] = newDg
 	}

-	// wait for initial sync period to complete. if unsuccessful, then crash
This logic was not doing anything, since the WaitGroup was not being used.
-	// and restart.
-	c := make(chan struct{})
-	go func() {
-		defer close(c)
-		logs.Log.Printf("waiting for datagatherers to complete inital syncs")
-		wg.Wait()
-	}()
-	select {
-	case <-c:
-		logs.Log.Printf("datagatherers inital sync completed")
-	case <-time.After(60 * time.Second):
-		logs.Log.Fatalf("datagatherers inital sync failed due to timeout of 60 seconds")
-	}
+	// Wait for 5 seconds for all informers to sync. If they fail to sync
+	// we continue (as we have no way to know if they will recover or not).
+	//
+	// bootCtx is a context with a timeout to allow the informer 5
+	// seconds to perform an initial sync. It may fail, and that's fine
+	// too, it will backoff and retry of its own accord. Initial boot
+	// will only be delayed by a max of 5 seconds.
+	bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
+	defer bootCancel()
+	for _, dgConfig := range config.DataGatherers {
+		dg := dataGatherers[dgConfig.Name]
+		// wait for the informer to complete an initial sync, we do this to
+		// attempt to have an initial set of data for the first upload of
+		// the run.
+		if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
+			// log sync failure, this might recover in future
+			logs.Log.Printf("failed to complete initial sync of %q data gatherer %q: %v", dgConfig.Kind, dgConfig.Name, err)
+		}
+	}

 	// begin the datagathering loop, periodically sending data to the
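Taken together, the changes above turn boot into a two-phase process: phase one starts every data gatherer inside the errgroup, and phase two gives all of their informers one shared 5-second budget to sync. A rough, self-contained sketch of that shape (the gatherer interface and names here are stand-ins, not the PR's actual types):

package example

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// gatherer stands in for datagatherer.DataGatherer: Run blocks until stopCh
// is closed, WaitForCacheSync blocks until the cache syncs or stopCh closes.
type gatherer interface {
	Run(stopCh <-chan struct{}) error
	WaitForCacheSync(stopCh <-chan struct{}) error
}

func startAll(ctx context.Context, gatherers map[string]gatherer) error {
	group, gctx := errgroup.WithContext(ctx)

	// Phase 1: start every gatherer; none of them blocks the others.
	for name, dg := range gatherers {
		name, dg := name, dg // capture loop variables (pre-Go 1.22 semantics)
		group.Go(func() error {
			if err := dg.Run(gctx.Done()); err != nil {
				return fmt.Errorf("failed to start data gatherer %q: %v", name, err)
			}
			return nil
		})
	}

	// Phase 2: a single 5-second budget shared by all informers. A sync
	// failure is logged rather than fatal: the informer retries on its own.
	bootCtx, bootCancel := context.WithTimeout(gctx, 5*time.Second)
	defer bootCancel()
	for name, dg := range gatherers {
		if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil {
			fmt.Printf("initial sync of %q did not complete: %v\n", name, err)
		}
	}

	return group.Wait()
}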
@@ -143,20 +143,21 @@ var kubernetesNativeResources = map[schema.GroupVersionResource]sharedInformerFunc

 // NewDataGatherer constructs a new instance of the generic K8s data-gatherer for the provided
 func (c *ConfigDynamic) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer, error) {
-	cl, err := NewDynamicClient(c.KubeConfigPath)
We only have to create a dynamic client when the GVK is not a native resource.

Makes sense! Is it fair to say that there's not really a reason for newDataGathererWithClient to be a separate function? It's fishy that we call it with one of cl or clientset with the other set to nil. Not a requirement in this PR, of course!
-	if err != nil {
-		return nil, err
-	}
-
 	if isNativeResource(c.GroupVersionResource) {
 		clientset, err := NewClientSet(c.KubeConfigPath)
 		if err != nil {
 			return nil, errors.WithStack(err)
 		}

 		return c.newDataGathererWithClient(ctx, nil, clientset)
-	}
-
-	return c.newDataGathererWithClient(ctx, cl, nil)
+	} else {
+		cl, err := NewDynamicClient(c.KubeConfigPath)
+		if err != nil {
+			return nil, err
+		}

+		return c.newDataGathererWithClient(ctx, cl, nil)
+	}
 }

 func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {
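To make the branch above concrete, here is a hedged, self-contained sketch of the underlying client-go pattern: native resources get a typed clientset and typed informer factory, and everything else goes through the dynamic client. The nativeResources map and buildInformer function are illustrative stand-ins (here hard-coded to Pods), not the PR's code.

package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	k8scache "k8s.io/client-go/tools/cache"
)

// nativeResources mirrors the idea of the kubernetesNativeResources map:
// GVRs that the typed informer factory can serve directly.
var nativeResources = map[schema.GroupVersionResource]bool{
	{Group: "", Version: "v1", Resource: "pods"}: true,
}

func buildInformer(cfg *rest.Config, gvr schema.GroupVersionResource) (k8scache.SharedIndexInformer, error) {
	if nativeResources[gvr] {
		// Native resource: only a typed clientset is needed.
		cs, err := kubernetes.NewForConfig(cfg)
		if err != nil {
			return nil, err
		}
		factory := informers.NewSharedInformerFactory(cs, 60*time.Second)
		return factory.Core().V1().Pods().Informer(), nil
	}
	// Non-native resource: only a dynamic client is needed.
	dyn, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		dyn, 60*time.Second, metav1.NamespaceAll, nil)
	return factory.ForResource(gvr).Informer(), nil
}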
@@ -178,8 +179,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {

 	newDataGatherer := &DataGathererDynamic{
 		ctx:                  ctx,
-		cl:                   cl,
-		k8sClientSet:         clientset,
 		groupVersionResource: c.GroupVersionResource,
 		fieldSelector:        fieldSelector.String(),
 		namespaces:           c.IncludeNamespaces,
@@ -200,34 +199,22 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {
 			informers.WithNamespace(metav1.NamespaceAll),
 			informers.WithTweakListOptions(func(options *metav1.ListOptions) {
 				options.FieldSelector = fieldSelector.String()
-			}))
-		newDataGatherer.nativeSharedInformer = factory
-		informer := informerFunc(factory)
-		informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				onAdd(obj, dgCache)
-			},
-			UpdateFunc: func(old, new interface{}) {
-				onUpdate(old, new, dgCache)
-			},
-			DeleteFunc: func(obj interface{}) {
-				onDelete(obj, dgCache)
-			},
-		})
-		newDataGatherer.informer = informer
-		return newDataGatherer, nil
-	}
-
+			}),
+		)
+		newDataGatherer.informer = informerFunc(factory)
+	} else {
We are creating these factories to create just 1 informer.
-	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
-		cl,
-		60*time.Second,
-		metav1.NamespaceAll,
-		func(options *metav1.ListOptions) { options.FieldSelector = fieldSelector.String() },
-	)
-	resourceInformer := factory.ForResource(c.GroupVersionResource)
-	informer := resourceInformer.Informer()
-	newDataGatherer.dynamicSharedInformer = factory
-	informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
+		factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
+			cl,
+			60*time.Second,
+			metav1.NamespaceAll,
+			func(options *metav1.ListOptions) {
+				options.FieldSelector = fieldSelector.String()
+			},
+		)
+		newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer()
+	}
+
+	registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			onAdd(obj, dgCache)
 		},
@@ -238,7 +225,10 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {
 			onDelete(obj, dgCache)
 		},
 	})
-	newDataGatherer.informer = informer
+	if err != nil {
+		return nil, err
+	}
+	newDataGatherer.registration = registration

 	return newDataGatherer, nil
 }
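As the comment above notes, each factory here exists solely to derive one informer. A self-contained sketch of that pattern, including the handler registration introduced in this hunk (this assumes a client-go version whose AddEventHandler returns a ResourceEventHandlerRegistration; the names are illustrative, not the PR's code):

package example

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	k8scache "k8s.io/client-go/tools/cache"
)

// watchOne builds a factory only to derive the single informer this gatherer
// needs, registers event handlers on it, and returns the registration that
// is later used for sync checks.
func watchOne(dyn dynamic.Interface, gvr schema.GroupVersionResource) (k8scache.SharedIndexInformer, k8scache.ResourceEventHandlerRegistration, error) {
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		dyn, 60*time.Second, metav1.NamespaceAll, nil)
	informer := factory.ForResource(gvr).Informer()

	registration, err := informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("add") },
		UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update") },
		DeleteFunc: func(obj interface{}) { fmt.Println("delete") },
	})
	if err != nil {
		return nil, nil, err
	}
	return informer, registration, nil
}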
@@ -251,10 +241,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynamic.Interface, clientset kubernetes.Interface) (datagatherer.DataGatherer, error) {
 // does not have registered as part of its `runtime.Scheme`.
 type DataGathererDynamic struct {
 	ctx context.Context
-	// The 'dynamic' client used for fetching data.
-	cl dynamic.Interface
-	// The k8s clientset used for fetching known resources.
-	k8sClientSet kubernetes.Interface
 	// groupVersionResource is the name of the API group, version and resource
 	// that should be fetched by this data gatherer.
 	groupVersionResource schema.GroupVersionResource
@@ -270,19 +256,15 @@ type DataGathererDynamic struct {
 	// 30 seconds purge time https://pkg.go.dev/github.com/patrickmn/go-cache
 	cache *cache.Cache
 	// informer watches the events around the targeted resource and updates the cache
-	informer              k8scache.SharedIndexInformer
-	dynamicSharedInformer dynamicinformer.DynamicSharedInformerFactory
-	nativeSharedInformer  informers.SharedInformerFactory
-
-	// isInitialized is set to true when data is first collected, prior to
-	// this the fetch method will return an error
-	isInitialized bool
+	informer     k8scache.SharedIndexInformer
+	registration k8scache.ResourceEventHandlerRegistration
 }

 // Run starts the dynamic data gatherer's informers for resource collection.
-// Returns error if the data gatherer informer wasn't initialized
+// Returns error if the data gatherer informer wasn't initialized, Run blocks
+// until the stopCh is closed.
 func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
-	if g.dynamicSharedInformer == nil && g.nativeSharedInformer == nil {
+	if g.informer == nil {
 		return fmt.Errorf("informer was not initialized, impossible to start")
 	}
@@ -299,21 +281,15 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
 	}

 	// start shared informer
-	if g.dynamicSharedInformer != nil {
-		g.dynamicSharedInformer.Start(stopCh)
-	}
-
-	if g.nativeSharedInformer != nil {
-		g.nativeSharedInformer.Start(stopCh)
-	}
+	g.informer.Run(stopCh)

 	return nil
 }

 // WaitForCacheSync waits for the data gatherer's informers cache to sync
 // before collecting the resources.
 func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error {
-	if !k8scache.WaitForCacheSync(stopCh, g.informer.HasSynced) {
+	if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) {
 		return fmt.Errorf("timed out waiting for Kubernetes caches to sync")
 	}
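A note on the HasSynced change above, based on client-go's documented behavior: informer.HasSynced only reports that the informer's internal store has completed its initial list, while the registration returned by AddEventHandler reports that the registered handler has also been delivered all of those initial events. Since this data gatherer fills its own cache from the handlers, waiting on the registration is the more accurate readiness signal. A minimal sketch:

package example

import (
	"fmt"

	k8scache "k8s.io/client-go/tools/cache"
)

// waitForHandler blocks until the handler behind reg has seen the informer's
// initial events, or until stopCh is closed.
func waitForHandler(reg k8scache.ResourceEventHandlerRegistration, stopCh <-chan struct{}) error {
	if !k8scache.WaitForCacheSync(stopCh, reg.HasSynced) {
		return fmt.Errorf("timed out waiting for Kubernetes caches to sync")
	}
	return nil
}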
@@ -432,16 +408,6 @@ func redactList(list []*api.GatheredResource) error {
 	return nil
 }

-// namespaceResourceInterface will 'namespace' a NamespaceableResourceInterface
-// if the 'namespace' parameter is non-empty, otherwise it will return the
-// given ResourceInterface as-is.
-func namespaceResourceInterface(iface dynamic.NamespaceableResourceInterface, namespace string) dynamic.ResourceInterface {
This function was unused.
-	if namespace == "" {
-		return iface
-	}
-	return iface.Namespace(namespace)
-}
-
 // generateExcludedNamespacesFieldSelector creates a field selector string from
 // a list of namespaces to exclude.
 func generateExcludedNamespacesFieldSelector(excludeNamespaces []string) fields.Selector {
This waitgroup was not used.