diff --git a/pkg/agent/dummy_data_gatherer.go b/pkg/agent/dummy_data_gatherer.go index 46741edc..73ef6a16 100644 --- a/pkg/agent/dummy_data_gatherer.go +++ b/pkg/agent/dummy_data_gatherer.go @@ -44,7 +44,7 @@ func (g *dummyDataGatherer) Delete() error { return nil } -func (c *dummyDataGatherer) Fetch() (interface{}, error) { +func (c *dummyDataGatherer) Fetch() (interface{}, int, error) { var err error if c.attemptNumber < c.FailedAttempts { err = fmt.Errorf("First %d attempts will fail", c.FailedAttempts) @@ -53,5 +53,5 @@ func (c *dummyDataGatherer) Fetch() (interface{}, error) { err = fmt.Errorf("This data gatherer will always fail") } c.attemptNumber++ - return nil, err + return nil, -1, err } diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 5e5d8383..7bcea229 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -351,7 +351,7 @@ func gatherAndOutputData(config Config, preflightClient client.Client, dataGathe log.Printf("retrying in %v after error: %s", t, err) }) if err != nil { - log.Fatalf("%v", err) + log.Fatalf("Exiting due to fatal error uploading: %v", err) } } @@ -362,14 +362,18 @@ func gatherData(config Config, dataGatherers map[string]datagatherer.DataGathere var dgError *multierror.Error for k, dg := range dataGatherers { - dgData, err := dg.Fetch() + dgData, count, err := dg.Fetch() if err != nil { dgError = multierror.Append(dgError, fmt.Errorf("error in datagatherer %s: %w", k, err)) continue } - log.Printf("successfully gathered data from %q datagatherer", k) + if count >= 0 { + log.Printf("successfully gathered %d items from %q datagatherer", count, k) + } else { + log.Printf("successfully gathered data from %q datagatherer", k) + } readings = append(readings, &api.DataReading{ ClusterID: config.ClusterID, DataGatherer: k, @@ -401,7 +405,6 @@ func gatherData(config Config, dataGatherers map[string]datagatherer.DataGathere func postData(config Config, preflightClient client.Client, readings []*api.DataReading) error { baseURL := config.Server - log.Println("Running Agent...") log.Println("Posting data to:", baseURL) if VenafiCloudMode { @@ -447,7 +450,7 @@ func postData(config Config, preflightClient client.Client, readings []*api.Data } defer res.Body.Close() - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } log.Println("Data sent successfully.") return err diff --git a/pkg/client/client_api_token.go b/pkg/client/client_api_token.go index cade4842..8670d52d 100644 --- a/pkg/client/client_api_token.go +++ b/pkg/client/client_api_token.go @@ -71,7 +71,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil diff --git a/pkg/client/client_oauth.go b/pkg/client/client_oauth.go index c7c83381..b5238e3b 100644 --- a/pkg/client/client_oauth.go +++ b/pkg/client/client_oauth.go @@ -125,7 +125,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api. errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil diff --git a/pkg/client/client_unauthenticated.go b/pkg/client/client_unauthenticated.go index e05ebfc6..4a8133e7 100644 --- a/pkg/client/client_unauthenticated.go +++ b/pkg/client/client_unauthenticated.go @@ -67,7 +67,7 @@ func (c *UnauthenticatedClient) PostDataReadings(orgID, clusterID string, readin errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil diff --git a/pkg/client/client_venafi_cloud.go b/pkg/client/client_venafi_cloud.go index e81b6fe7..d0068141 100644 --- a/pkg/client/client_venafi_cloud.go +++ b/pkg/client/client_venafi_cloud.go @@ -199,7 +199,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead if err == nil { errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil @@ -235,7 +235,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api if err == nil { errorContent = string(body) } - return fmt.Errorf("received response with status code %d. Body: %s", code, errorContent) + return fmt.Errorf("received response with status code %d. Body: [%s]", code, errorContent) } return nil @@ -327,7 +327,7 @@ func (c *VenafiCloudClient) sendHTTPRequest(request *http.Request, responseObjec if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated { body, _ := io.ReadAll(response.Body) - return fmt.Errorf("failed to execute http request to VaaS. Request %s, status code: %d, body: %s", request.URL, response.StatusCode, body) + return fmt.Errorf("failed to execute http request to Venafi Control Plane. Request %s, status code: %d, body: [%s]", request.URL, response.StatusCode, body) } body, err := io.ReadAll(response.Body) diff --git a/pkg/datagatherer/datagatherer.go b/pkg/datagatherer/datagatherer.go index 7b37abef..ec5b8b67 100644 --- a/pkg/datagatherer/datagatherer.go +++ b/pkg/datagatherer/datagatherer.go @@ -12,7 +12,9 @@ type Config interface { // DataGatherer is the interface for Data Gatherers. Data Gatherers are in charge of fetching data from a certain cloud provider API or Kubernetes component. type DataGatherer interface { // Fetch retrieves data. - Fetch() (interface{}, error) + // count is the number of items that were discovered. A negative count means the number + // of items was indeterminate. + Fetch() (data interface{}, count int, err error) // Run starts the data gatherer's informers for resource collection. // Returns error if the data gatherer informer wasn't initialized Run(stopCh <-chan struct{}) error diff --git a/pkg/datagatherer/k8s/client.go b/pkg/datagatherer/k8s/client.go index d8513205..f7c75824 100644 --- a/pkg/datagatherer/k8s/client.go +++ b/pkg/datagatherer/k8s/client.go @@ -33,7 +33,7 @@ func NewDiscoveryClient(kubeconfigPath string) (discovery.DiscoveryClient, error cfg, err := loadRESTConfig(kubeconfigPath) if err != nil { - return *discoveryClient, errors.WithStack(err) + return discovery.DiscoveryClient{}, errors.WithStack(err) } discoveryClient, err = discovery.NewDiscoveryClientForConfig(cfg) diff --git a/pkg/datagatherer/k8s/discovery.go b/pkg/datagatherer/k8s/discovery.go index 6b7ecef6..0a6dfe45 100644 --- a/pkg/datagatherer/k8s/discovery.go +++ b/pkg/datagatherer/k8s/discovery.go @@ -62,10 +62,10 @@ func (g *DataGathererDiscovery) Delete() error { } // Fetch will fetch discovery data from the apiserver, or return an error -func (g *DataGathererDiscovery) Fetch() (interface{}, error) { +func (g *DataGathererDiscovery) Fetch() (interface{}, int, error) { data, err := g.cl.ServerVersion() if err != nil { - return nil, fmt.Errorf("failed to get server version: %v", err) + return nil, -1, fmt.Errorf("failed to get server version: %v", err) } response := map[string]interface{}{ @@ -73,5 +73,5 @@ func (g *DataGathererDiscovery) Fetch() (interface{}, error) { "server_version": data, } - return response, nil + return response, len(response), nil } diff --git a/pkg/datagatherer/k8s/dynamic.go b/pkg/datagatherer/k8s/dynamic.go index ef29f2bb..c9007017 100644 --- a/pkg/datagatherer/k8s/dynamic.go +++ b/pkg/datagatherer/k8s/dynamic.go @@ -251,8 +251,6 @@ type DataGathererDynamic struct { informer k8scache.SharedIndexInformer dynamicSharedInformer dynamicinformer.DynamicSharedInformerFactory nativeSharedInformer informers.SharedInformerFactory - informerCtx context.Context - informerCancel context.CancelFunc // isInitialized is set to true when data is first collected, prior to // this the fetch method will return an error @@ -266,12 +264,6 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { return fmt.Errorf("informer was not initialized, impossible to start") } - // starting a new ctx for the informer - // WithCancel copies the parent ctx and creates a new done() channel - informerCtx, cancel := context.WithCancel(g.ctx) - g.informerCtx = informerCtx - g.informerCancel = cancel - // attach WatchErrorHandler, it needs to be set before starting an informer err := g.informer.SetWatchErrorHandler(func(r *k8scache.Reflector, err error) { if strings.Contains(fmt.Sprintf("%s", err), "the server could not find the requested resource") { @@ -279,8 +271,6 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { } else { log.Printf("datagatherer informer for %q has failed and is backing off due to error: %s", g.groupVersionResource, err) } - // cancel the informer ctx to stop the informer in case of error - cancel() }) if err != nil { return fmt.Errorf("failed to SetWatchErrorHandler on informer: %s", err) @@ -302,7 +292,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { // before collecting the resources. func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error { if !k8scache.WaitForCacheSync(stopCh, g.informer.HasSynced) { - return fmt.Errorf("timed out waiting for caches to sync, using parent stop channel") + return fmt.Errorf("timed out waiting for Kubernetes caches to sync") } return nil @@ -312,15 +302,14 @@ func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error { // informer func (g *DataGathererDynamic) Delete() error { g.cache.Flush() - g.informerCancel() return nil } // Fetch will fetch the requested data from the apiserver, or return an error // if fetching the data fails. -func (g *DataGathererDynamic) Fetch() (interface{}, error) { +func (g *DataGathererDynamic) Fetch() (interface{}, int, error) { if g.groupVersionResource.String() == "" { - return nil, fmt.Errorf("resource type must be specified") + return nil, -1, fmt.Errorf("resource type must be specified") } var list = map[string]interface{}{} @@ -344,19 +333,19 @@ func (g *DataGathererDynamic) Fetch() (interface{}, error) { } continue } - return nil, fmt.Errorf("failed to parse cached resource") + return nil, -1, fmt.Errorf("failed to parse cached resource") } // Redact Secret data err := redactList(items) if err != nil { - return nil, errors.WithStack(err) + return nil, -1, errors.WithStack(err) } // add gathered resources to items list["items"] = items - return list, nil + return list, len(items), nil } func redactList(list []*api.GatheredResource) error { diff --git a/pkg/datagatherer/k8s/dynamic_test.go b/pkg/datagatherer/k8s/dynamic_test.go index d4b01005..150304ff 100644 --- a/pkg/datagatherer/k8s/dynamic_test.go +++ b/pkg/datagatherer/k8s/dynamic_test.go @@ -633,7 +633,7 @@ func TestDynamicGatherer_Fetch(t *testing.T) { if waitTimeout(&wg, 30*time.Second) { t.Fatalf("unexpected timeout") } - res, err := dynamiDg.Fetch() + res, count, err := dynamiDg.Fetch() if err != nil && !tc.err { t.Errorf("expected no error but got: %v", err) } @@ -662,6 +662,10 @@ func TestDynamicGatherer_Fetch(t *testing.T) { gotJSON, _ := json.MarshalIndent(list, "", " ") t.Fatalf("unexpected JSON: \ngot \n%s\nwant\n%s", string(gotJSON), expectedJSON) } + + if len(list) != count { + t.Errorf("wrong count of resources reported: got %d, want %d", count, len(list)) + } } }) } @@ -922,7 +926,7 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) { if waitTimeout(&wg, 5*time.Second) { t.Fatalf("unexpected timeout") } - res, err := dynamiDg.Fetch() + res, count, err := dynamiDg.Fetch() if err != nil && !tc.err { t.Errorf("expected no error but got: %v", err) } @@ -951,6 +955,10 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) { gotJSON, _ := json.MarshalIndent(list, "", " ") t.Fatalf("unexpected JSON: \ngot \n%s\nwant\n%s", string(gotJSON), expectedJSON) } + + if len(list) != count { + t.Errorf("wrong count of resources reported: got %d, want %d", count, len(list)) + } } }) } diff --git a/pkg/datagatherer/local/local.go b/pkg/datagatherer/local/local.go index 1e2512fe..248fcb46 100644 --- a/pkg/datagatherer/local/local.go +++ b/pkg/datagatherer/local/local.go @@ -54,10 +54,10 @@ func (g *DataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error { } // Fetch loads and returns the data from the LocalDatagatherer's dataPath -func (g *DataGatherer) Fetch() (interface{}, error) { +func (g *DataGatherer) Fetch() (interface{}, int, error) { dataBytes, err := ioutil.ReadFile(g.dataPath) if err != nil { - return nil, err + return nil, -1, err } - return dataBytes, nil + return dataBytes, -1, nil }