From d2ceb54bca17a32d57a07c42dfc3062ec6e259e8 Mon Sep 17 00:00:00 2001 From: James Westby Date: Mon, 13 May 2024 16:46:50 +0100 Subject: [PATCH 1/4] Some simple cleanups to log messages * Log a more informative error message when giving up on uploading readings to the server. This would be the last message before the pod exits, so if the pod ends up in CrashLoopBackOff it's important to highlight this as the reason. * Remove the "Running Agent" log message, as it's not clear what it means. * Put [] around the body, because if the body is empty the log message can read as though the following message is the body. ``` 2024/05/10 16:33:43 retrying in 25.555756126s after error: received response with status code 404. Body: W0510 16:33:43.832278 10875 reflector.go:535] pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229: failed to list route.openshift.io/v1, Resource=routes: the server could not find the requested resource ``` * Remove "using parent stop channel" from a log message as it's not clear what it means. It used to be conditional, and now it isn't, so it's not providing any information, and it's confusing as to what it might be trying to tell you if you aren't familiar with that code. 
--- pkg/agent/run.go | 5 ++--- pkg/client/client_api_token.go | 2 +- pkg/client/client_oauth.go | 2 +- pkg/client/client_unauthenticated.go | 2 +- pkg/client/client_venafi_cloud.go | 6 +++--- pkg/datagatherer/k8s/dynamic.go | 2 +- 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 5e5d8383..60ec0807 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -351,7 +351,7 @@ func gatherAndOutputData(config Config, preflightClient client.Client, dataGathe log.Printf("retrying in %v after error: %s", t, err) }) if err != nil { - log.Fatalf("%v", err) + log.Fatalf("Exiting due to fatal error uploading: %v", err) } } @@ -401,7 +401,6 @@ func gatherData(config Config, dataGatherers map[string]datagatherer.DataGathere func postData(config Config, preflightClient client.Client, readings []*api.DataReading) error { baseURL := config.Server - log.Println("Running Agent...") log.Println("Posting data to:", baseURL) if VenafiCloudMode { @@ -447,7 +446,7 @@ func postData(config Config, preflightClient client.Client, readings []*api.Data } defer res.Body.Close() - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } log.Println("Data sent successfully.") return err diff --git a/pkg/client/client_api_token.go b/pkg/client/client_api_token.go index cade4842..8670d52d 100644 --- a/pkg/client/client_api_token.go +++ b/pkg/client/client_api_token.go @@ -71,7 +71,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. 
Body: [%s]", code, errorContent) } return nil diff --git a/pkg/client/client_oauth.go b/pkg/client/client_oauth.go index c7c83381..b5238e3b 100644 --- a/pkg/client/client_oauth.go +++ b/pkg/client/client_oauth.go @@ -125,7 +125,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api. errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil diff --git a/pkg/client/client_unauthenticated.go b/pkg/client/client_unauthenticated.go index e05ebfc6..4a8133e7 100644 --- a/pkg/client/client_unauthenticated.go +++ b/pkg/client/client_unauthenticated.go @@ -67,7 +67,7 @@ func (c *UnauthenticatedClient) PostDataReadings(orgID, clusterID string, readin errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil diff --git a/pkg/client/client_venafi_cloud.go b/pkg/client/client_venafi_cloud.go index e81b6fe7..d0068141 100644 --- a/pkg/client/client_venafi_cloud.go +++ b/pkg/client/client_venafi_cloud.go @@ -199,7 +199,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead if err == nil { errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil @@ -235,7 +235,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api if err == nil { errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. 
Body: [%s]", code, errorContent) } return nil @@ -327,7 +327,7 @@ func (c *VenafiCloudClient) sendHTTPRequest(request *http.Request, responseObjec if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated { body, _ := io.ReadAll(response.Body) - return fmt.Errorf("failed to execute http request to VaaS. Request %s, status code: %d, body: %s", request.URL, response.StatusCode, body) + return fmt.Errorf("failed to execute http request to Venafi Control Plane. Request %s, status code: %d, body: [%s]", request.URL, response.StatusCode, body) } body, err := io.ReadAll(response.Body) diff --git a/pkg/datagatherer/k8s/dynamic.go b/pkg/datagatherer/k8s/dynamic.go index ef29f2bb..ae0bf32f 100644 --- a/pkg/datagatherer/k8s/dynamic.go +++ b/pkg/datagatherer/k8s/dynamic.go @@ -302,7 +302,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { // before collecting the resources. func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error { if !k8scache.WaitForCacheSync(stopCh, g.informer.HasSynced) { - return fmt.Errorf("timed out waiting for caches to sync, using parent stop channel") + return fmt.Errorf("timed out waiting for Kubernetes caches to sync") } return nil From a92953f589690a3f82fbe32d837ba52cfafce9d2 Mon Sep 17 00:00:00 2001 From: James Westby Date: Mon, 13 May 2024 16:52:23 +0100 Subject: [PATCH 2/4] Fix a crash bug when REST config can't be loaded `discoveryClient` is uninitialized, so the pointer can't be dereferenced; return an empty struct if there's an error. 
--- pkg/datagatherer/k8s/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/datagatherer/k8s/client.go b/pkg/datagatherer/k8s/client.go index d8513205..f7c75824 100644 --- a/pkg/datagatherer/k8s/client.go +++ b/pkg/datagatherer/k8s/client.go @@ -33,7 +33,7 @@ func NewDiscoveryClient(kubeconfigPath string) (discovery.DiscoveryClient, error cfg, err := loadRESTConfig(kubeconfigPath) if err != nil { - return *discoveryClient, errors.WithStack(err) + return discovery.DiscoveryClient{}, errors.WithStack(err) } discoveryClient, err = discovery.NewDiscoveryClientForConfig(cfg) From 8accb51a06bc245dbb779bd2f02135827d6616db Mon Sep 17 00:00:00 2001 From: James Westby Date: Mon, 13 May 2024 16:54:25 +0100 Subject: [PATCH 3/4] Remove the informer context in the dynamic dg The informer context is created, but not passed to anything, so this code doesn't do anything. --- pkg/datagatherer/k8s/dynamic.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pkg/datagatherer/k8s/dynamic.go b/pkg/datagatherer/k8s/dynamic.go index ae0bf32f..a55ece2e 100644 --- a/pkg/datagatherer/k8s/dynamic.go +++ b/pkg/datagatherer/k8s/dynamic.go @@ -251,8 +251,6 @@ type DataGathererDynamic struct { informer k8scache.SharedIndexInformer dynamicSharedInformer dynamicinformer.DynamicSharedInformerFactory nativeSharedInformer informers.SharedInformerFactory - informerCtx context.Context - informerCancel context.CancelFunc // isInitialized is set to true when data is first collected, prior to // this the fetch method will return an error @@ -266,12 +264,6 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { return fmt.Errorf("informer was not initialized, impossible to start") } - // starting a new ctx for the informer - // WithCancel copies the parent ctx and creates a new done() channel - informerCtx, cancel := context.WithCancel(g.ctx) - g.informerCtx = informerCtx - g.informerCancel = cancel - // attach WatchErrorHandler, it needs to be set 
before starting an informer err := g.informer.SetWatchErrorHandler(func(r *k8scache.Reflector, err error) { if strings.Contains(fmt.Sprintf("%s", err), "the server could not find the requested resource") { @@ -279,8 +271,6 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { } else { log.Printf("datagatherer informer for %q has failed and is backing off due to error: %s", g.groupVersionResource, err) } - // cancel the informer ctx to stop the informer in case of error - cancel() }) if err != nil { return fmt.Errorf("failed to SetWatchErrorHandler on informer: %s", err) @@ -312,7 +302,6 @@ func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error { // informer func (g *DataGathererDynamic) Delete() error { g.cache.Flush() - g.informerCancel() return nil } From e84e2e26049993a4e624c657ae0005dfb17f8cb7 Mon Sep 17 00:00:00 2001 From: James Westby Date: Mon, 13 May 2024 16:57:17 +0100 Subject: [PATCH 4/4] Log the number of items from each dg This returns the number of items collected by each datagatherer so that the logs tell us a bit more about what was found in the cluster, and can help find where any items have been missed. 
--- pkg/agent/dummy_data_gatherer.go | 4 ++-- pkg/agent/run.go | 8 ++++++-- pkg/datagatherer/datagatherer.go | 4 +++- pkg/datagatherer/k8s/discovery.go | 6 +++--- pkg/datagatherer/k8s/dynamic.go | 10 +++++----- pkg/datagatherer/k8s/dynamic_test.go | 12 ++++++++++-- pkg/datagatherer/local/local.go | 6 +++--- 7 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pkg/agent/dummy_data_gatherer.go b/pkg/agent/dummy_data_gatherer.go index 46741edc..73ef6a16 100644 --- a/pkg/agent/dummy_data_gatherer.go +++ b/pkg/agent/dummy_data_gatherer.go @@ -44,7 +44,7 @@ func (g *dummyDataGatherer) Delete() error { return nil } -func (c *dummyDataGatherer) Fetch() (interface{}, error) { +func (c *dummyDataGatherer) Fetch() (interface{}, int, error) { var err error if c.attemptNumber < c.FailedAttempts { err = fmt.Errorf("First %d attempts will fail", c.FailedAttempts) @@ -53,5 +53,5 @@ func (c *dummyDataGatherer) Fetch() (interface{}, error) { err = fmt.Errorf("This data gatherer will always fail") } c.attemptNumber++ - return nil, err + return nil, -1, err } diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 60ec0807..7bcea229 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -362,14 +362,18 @@ func gatherData(config Config, dataGatherers map[string]datagatherer.DataGathere var dgError *multierror.Error for k, dg := range dataGatherers { - dgData, err := dg.Fetch() + dgData, count, err := dg.Fetch() if err != nil { dgError = multierror.Append(dgError, fmt.Errorf("error in datagatherer %s: %w", k, err)) continue } - log.Printf("successfully gathered data from %q datagatherer", k) + if count >= 0 { + log.Printf("successfully gathered %d items from %q datagatherer", count, k) + } else { + log.Printf("successfully gathered data from %q datagatherer", k) + } readings = append(readings, &api.DataReading{ ClusterID: config.ClusterID, DataGatherer: k, diff --git a/pkg/datagatherer/datagatherer.go b/pkg/datagatherer/datagatherer.go index 7b37abef..ec5b8b67 100644 --- 
a/pkg/datagatherer/datagatherer.go +++ b/pkg/datagatherer/datagatherer.go @@ -12,7 +12,9 @@ type Config interface { // DataGatherer is the interface for Data Gatherers. Data Gatherers are in charge of fetching data from a certain cloud provider API or Kubernetes component. type DataGatherer interface { // Fetch retrieves data. - Fetch() (interface{}, error) + // count is the number of items that were discovered. A negative count means the number + // of items was indeterminate. + Fetch() (data interface{}, count int, err error) // Run starts the data gatherer's informers for resource collection. // Returns error if the data gatherer informer wasn't initialized Run(stopCh <-chan struct{}) error diff --git a/pkg/datagatherer/k8s/discovery.go b/pkg/datagatherer/k8s/discovery.go index 6b7ecef6..0a6dfe45 100644 --- a/pkg/datagatherer/k8s/discovery.go +++ b/pkg/datagatherer/k8s/discovery.go @@ -62,10 +62,10 @@ func (g *DataGathererDiscovery) Delete() error { } // Fetch will fetch discovery data from the apiserver, or return an error -func (g *DataGathererDiscovery) Fetch() (interface{}, error) { +func (g *DataGathererDiscovery) Fetch() (interface{}, int, error) { data, err := g.cl.ServerVersion() if err != nil { - return nil, fmt.Errorf("failed to get server version: %v", err) + return nil, -1, fmt.Errorf("failed to get server version: %v", err) } response := map[string]interface{}{ @@ -73,5 +73,5 @@ func (g *DataGathererDiscovery) Fetch() (interface{}, error) { "server_version": data, } - return response, nil + return response, len(response), nil } diff --git a/pkg/datagatherer/k8s/dynamic.go b/pkg/datagatherer/k8s/dynamic.go index a55ece2e..c9007017 100644 --- a/pkg/datagatherer/k8s/dynamic.go +++ b/pkg/datagatherer/k8s/dynamic.go @@ -307,9 +307,9 @@ func (g *DataGathererDynamic) Delete() error { // Fetch will fetch the requested data from the apiserver, or return an error // if fetching the data fails. 
-func (g *DataGathererDynamic) Fetch() (interface{}, error) { +func (g *DataGathererDynamic) Fetch() (interface{}, int, error) { if g.groupVersionResource.String() == "" { - return nil, fmt.Errorf("resource type must be specified") + return nil, -1, fmt.Errorf("resource type must be specified") } var list = map[string]interface{}{} @@ -333,19 +333,19 @@ func (g *DataGathererDynamic) Fetch() (interface{}, error) { } continue } - return nil, fmt.Errorf("failed to parse cached resource") + return nil, -1, fmt.Errorf("failed to parse cached resource") } // Redact Secret data err := redactList(items) if err != nil { - return nil, errors.WithStack(err) + return nil, -1, errors.WithStack(err) } // add gathered resources to items list["items"] = items - return list, nil + return list, len(items), nil } func redactList(list []*api.GatheredResource) error { diff --git a/pkg/datagatherer/k8s/dynamic_test.go b/pkg/datagatherer/k8s/dynamic_test.go index d4b01005..150304ff 100644 --- a/pkg/datagatherer/k8s/dynamic_test.go +++ b/pkg/datagatherer/k8s/dynamic_test.go @@ -633,7 +633,7 @@ func TestDynamicGatherer_Fetch(t *testing.T) { if waitTimeout(&wg, 30*time.Second) { t.Fatalf("unexpected timeout") } - res, err := dynamiDg.Fetch() + res, count, err := dynamiDg.Fetch() if err != nil && !tc.err { t.Errorf("expected no error but got: %v", err) } @@ -662,6 +662,10 @@ func TestDynamicGatherer_Fetch(t *testing.T) { gotJSON, _ := json.MarshalIndent(list, "", " ") t.Fatalf("unexpected JSON: \ngot \n%s\nwant\n%s", string(gotJSON), expectedJSON) } + + if len(list) != count { + t.Errorf("wrong count of resources reported: got %d, want %d", count, len(list)) + } } }) } @@ -922,7 +926,7 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) { if waitTimeout(&wg, 5*time.Second) { t.Fatalf("unexpected timeout") } - res, err := dynamiDg.Fetch() + res, count, err := dynamiDg.Fetch() if err != nil && !tc.err { t.Errorf("expected no error but got: %v", err) } @@ -951,6 +955,10 @@ func 
TestDynamicGathererNativeResources_Fetch(t *testing.T) { gotJSON, _ := json.MarshalIndent(list, "", " ") t.Fatalf("unexpected JSON: \ngot \n%s\nwant\n%s", string(gotJSON), expectedJSON) } + + if len(list) != count { + t.Errorf("wrong count of resources reported: got %d, want %d", count, len(list)) + } } }) } diff --git a/pkg/datagatherer/local/local.go b/pkg/datagatherer/local/local.go index 1e2512fe..248fcb46 100644 --- a/pkg/datagatherer/local/local.go +++ b/pkg/datagatherer/local/local.go @@ -54,10 +54,10 @@ func (g *DataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error { } // Fetch loads and returns the data from the LocalDatagatherer's dataPath -func (g *DataGatherer) Fetch() (interface{}, error) { +func (g *DataGatherer) Fetch() (interface{}, int, error) { dataBytes, err := ioutil.ReadFile(g.dataPath) if err != nil { - return nil, err + return nil, -1, err } - return dataBytes, nil + return dataBytes, -1, nil }