diff --git a/src/Makefile b/src/Makefile index 258861d5..b3bd3939 100644 --- a/src/Makefile +++ b/src/Makefile @@ -48,7 +48,6 @@ SRCS=\ configuration.cc \ updater.cc \ instance.cc \ - docker.cc \ kubernetes.cc \ resource.cc \ oauth2.cc \ diff --git a/src/configuration.cc b/src/configuration.cc index 41c56731..184d327d 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -47,8 +47,7 @@ constexpr const int kMetadataReporterDefaultPurgeDeleted = false; constexpr const char kMetadataReporterDefaultUserAgent[] = "metadata-agent/" STRINGIFY(AGENT_VERSION); constexpr const char kMetadataIngestionDefaultEndpointFormat[] = - "https://stackdriver.googleapis.com/v1beta2/projects/{{project_id}}" - "/resourceMetadata:batchUpdate"; + "https://stackdriver.googleapis.com/batch/resourceMetadata"; constexpr const int kMetadataIngestionDefaultRequestSizeLimitBytes = 8*1024*1024; constexpr const int kMetadataIngestionDefaultRequestSizeLimitCount = 1000; diff --git a/src/docker.cc b/src/docker.cc deleted file mode 100644 index 3040fbfb..00000000 --- a/src/docker.cc +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Copyright 2017 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#include "docker.h" - -#include "local_stream_http.h" -#include -#include -#include - -#include "configuration.h" -#include "format.h" -#include "instance.h" -#include "json.h" -#include "logging.h" -#include "resource.h" -#include "store.h" -#include "time.h" -#include "util.h" - -namespace http = boost::network::http; - -namespace google { - -namespace { - -constexpr const int kDockerValidationRetryLimit = 10; -constexpr const int kDockerValidationRetryDelaySeconds = 1; - -#if 0 -constexpr const char kDockerEndpointHost[] = "unix://%2Fvar%2Frun%2Fdocker.sock/"; -constexpr const char kDockerApiVersion[] = "1.23"; -#endif -constexpr const char kDockerEndpointPath[] = "/containers"; -constexpr const char kDockerContainerResourcePrefix[] = "container"; - -} - -// A subclass of QueryException to represent non-retriable errors. -class DockerReader::NonRetriableError : public DockerReader::QueryException { - public: - NonRetriableError(const std::string& what) : QueryException(what) {} -}; - -DockerReader::DockerReader(const Configuration& config) - : config_(config), environment_(config) {} - -void DockerReader::ValidateDynamicConfiguration() const - throw(MetadataUpdater::ConfigurationValidationError) { - const std::string container_filter( - config_.DockerContainerFilter().empty() - ? "" : "&" + config_.DockerContainerFilter()); - - try { - util::Retry( - kDockerValidationRetryLimit, - time::seconds(kDockerValidationRetryDelaySeconds), - [this, &container_filter]() { - // A limit may exist in the container_filter, however, the docker API only - // uses the first limit provided in the query params. - (void) QueryDocker(std::string(kDockerEndpointPath) + - "/json?all=true&limit=1" + container_filter); - }); - } catch (const NonRetriableError& e) { - throw MetadataUpdater::ConfigurationValidationError( - "Docker query validation failed: " + e.what()); - } catch (const QueryException& e) { - // Already logged. - throw MetadataUpdater::ConfigurationValidationError( - "Docker query validation retry limit reached: " + e.what()); - } -} - -MetadataUpdater::ResourceMetadata DockerReader::GetContainerMetadata( - const json::Object* container, Timestamp collected_at) const - throw(json::Exception) { - const std::string zone = environment_.InstanceZone(); - - const std::string id = container->Get("Id"); - // Inspect the container. - try { - json::value raw_container = QueryDocker( - std::string(kDockerEndpointPath) + "/" + id + "/json"); - if (config_.VerboseLogging()) { - LOG(INFO) << "Parsed metadata: " << *raw_container; - } - - const json::Object* container_desc = raw_container->As(); - const std::string name = container_desc->Get("Name"); - - const std::string created_str = - container_desc->Get("Created"); - Timestamp created_at = time::rfc3339::FromString(created_str); - - const json::Object* state = container_desc->Get("State"); - bool is_deleted = state->Get("Dead"); - - const MonitoredResource resource("docker_container", { - {"location", zone}, - {"container_id", id}, - }); - - json::value instance_resource = - InstanceReader::InstanceResource(environment_).ToJSON(); - - json::value raw_metadata = json::object({ - {"blobs", json::object({ - {"association", json::object({ - {"version", json::string(config_.MetadataIngestionRawContentVersion())}, - {"raw", json::object({ - {"infrastructureResource", std::move(instance_resource)}, - })}, - })}, - {"api", json::object({ - {"version", json::string(config_.DockerApiVersion())}, - {"raw", std::move(raw_container)}, - })}, - })}, - }); - if (config_.VerboseLogging()) { - LOG(INFO) << "Raw docker metadata: " << *raw_metadata; - } - - const std::string resource_id = boost::algorithm::join( - std::vector{kDockerContainerResourcePrefix, id}, - config_.MetadataApiResourceTypeSeparator()); - const std::string resource_name = boost::algorithm::join( - // The container name reported by Docker will always have a leading '/'. - std::vector{kDockerContainerResourcePrefix, name.substr(1)}, - config_.MetadataApiResourceTypeSeparator()); - return MetadataUpdater::ResourceMetadata( - std::vector{resource_id, resource_name}, - resource, -#ifdef ENABLE_DOCKER_METADATA - MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), - is_deleted, created_at, collected_at, - std::move(raw_metadata)) -#else - MetadataStore::Metadata::IGNORED() -#endif - ); - } catch (const QueryException& e) { - throw json::Exception("Container " + id + - " disappeared before we could inspect it"); - } -} - -std::vector - DockerReader::MetadataQuery() const { - if (config_.VerboseLogging()) { - LOG(INFO) << "Docker Query called"; - } - const std::string zone = environment_.InstanceZone(); - const std::string docker_endpoint(config_.DockerEndpointHost() + - "v" + config_.DockerApiVersion() + - kDockerEndpointPath); - const std::string container_filter( - config_.DockerContainerFilter().empty() - ? "" : "&" + config_.DockerContainerFilter()); - std::vector result; - try { - json::value parsed_list = QueryDocker( - std::string(kDockerEndpointPath) + "/json?all=true" + container_filter); - Timestamp collected_at = std::chrono::system_clock::now(); - if (config_.VerboseLogging()) { - LOG(INFO) << "Parsed list: " << *parsed_list; - } - const json::Array* container_list = parsed_list->As(); - for (const json::value& raw_container : *container_list) { - try { - const json::Object* container = raw_container->As(); - result.emplace_back(GetContainerMetadata(container, collected_at)); - } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); - continue; - } - } - } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); - } catch (const QueryException& e) { - // Already logged. - } - return result; -} - -json::value DockerReader::QueryDocker(const std::string& path) const - throw(QueryException, json::Exception) { - const std::string endpoint(config_.DockerEndpointHost() + - "v" + config_.DockerApiVersion() + - path); - http::local_client client; - http::local_client::request request(endpoint); - if (config_.VerboseLogging()) { - LOG(INFO) << "QueryDocker: Contacting " << endpoint; - } - try { - http::local_client::response response = client.get(request); - if (status(response) >= 400 && status(response) <= 403) { - throw NonRetriableError( - format::Substitute("Server responded with '{{message}}' ({{code}})", - {{"message", status_message(response)}, - {"code", format::str(status(response))}})); - } else if (status(response) >= 300) { - throw boost::system::system_error( - boost::system::errc::make_error_code(boost::system::errc::not_connected), - format::Substitute("Server responded with '{{message}}' ({{code}})", - {{"message", status_message(response)}, - {"code", format::str(status(response))}})); - } -#ifdef VERBOSE - LOG(DEBUG) << "QueryDocker: Response: " << body(response); -#endif - return json::Parser::FromString(body(response)); - } catch (const boost::system::system_error& e) { - LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); - throw QueryException(endpoint + " -> " + e.what()); - } -} - -void DockerUpdater::ValidateDynamicConfiguration() const - throw(ConfigurationValidationError) { - PollingMetadataUpdater::ValidateDynamicConfiguration(); - reader_.ValidateDynamicConfiguration(); -} - -} diff --git a/src/docker.h b/src/docker.h deleted file mode 100644 index 0177ed0e..00000000 --- a/src/docker.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2017 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -#ifndef DOCKER_H_ -#define DOCKER_H_ - -#include - -#include "environment.h" -#include "updater.h" - -namespace google { - -// Configuration object. -class Configuration; - -// Storage for the metadata mapping. -class MetadataStore; - -class DockerReader { - public: - DockerReader(const Configuration& config); - // A Docker metadata query function. - std::vector MetadataQuery() const; - - // Validates the relevant dynamic configuration and throws if it's incorrect. - void ValidateDynamicConfiguration() const - throw(MetadataUpdater::ConfigurationValidationError); - - private: - // A representation of all query-related errors. - class QueryException { - public: - QueryException(const std::string& what) : explanation_(what) {} - const std::string& what() const { return explanation_; } - private: - std::string explanation_; - }; - class NonRetriableError; - - // Issues a Docker API query at a given path and returns a parsed - // JSON response. The path has to start with "/". - json::value QueryDocker(const std::string& path) const - throw(QueryException, json::Exception); - - // Given a container object, return the associated metadata. - MetadataUpdater::ResourceMetadata GetContainerMetadata( - const json::Object* container, Timestamp collected_at) const - throw(json::Exception); - - const Configuration& config_; - Environment environment_; -}; - -class DockerUpdater : public PollingMetadataUpdater { - public: - DockerUpdater(const Configuration& config, MetadataStore* store) - : reader_(config), PollingMetadataUpdater( - config, store, "DockerUpdater", - config.DockerUpdaterIntervalSeconds(), - [=]() { return reader_.MetadataQuery(); }) { } - - protected: - void ValidateDynamicConfiguration() const throw(ConfigurationValidationError); - - private: - DockerReader reader_; -}; - -} - -#endif // DOCKER_H_ diff --git a/src/instance.cc b/src/instance.cc index 1ed377ca..e3fc4bce 100644 --- a/src/instance.cc +++ b/src/instance.cc @@ -55,6 +55,7 @@ std::vector result.emplace_back( std::vector{"", environment_.InstanceId()}, instance_resource, + /*full_resource_name=*/"", // TODO: Send actual instance metadata. MetadataStore::Metadata::IGNORED() ); diff --git a/src/kubernetes.cc b/src/kubernetes.cc index e634e871..31271cdc 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -50,7 +51,6 @@ constexpr const int kKubernetesValidationRetryDelaySeconds = 1; #if 0 constexpr const char kKubernetesEndpointHost[] = "https://kubernetes.default.svc"; #endif -constexpr const char kKubernetesApiVersion[] = "1.6"; constexpr const char kKubernetesEndpointPath[] = "/api/v1"; constexpr const char kGkeContainerResourcePrefix[] = "gke_container"; @@ -67,6 +67,13 @@ constexpr const char kDockerIdPrefix[] = "docker://"; constexpr const char kServiceAccountDirectory[] = "/var/run/secrets/kubernetes.io/serviceaccount"; +constexpr const char kKubernetesSchemaNameFormat[] = + "//container.googleapis.com/resourceTypes/{{type}}/versions/{{version}}"; + +constexpr const char kKubernetesClusterFullNameFormat[] = + "//container.googleapis.com/projects/{{project_id}}/{{location_type}}/" + "{{location}}/clusters/{{cluster_name}}"; + // Returns the full path to the secret filename. std::string SecretPath(const std::string& secret) { return std::string(kServiceAccountDirectory) + "/" + secret; @@ -100,6 +107,83 @@ class KubernetesReader::NonRetriableError NonRetriableError(const std::string& what) : QueryException(what) {} }; +const std::pair TypeAndVersion( + const std::string api_version, const std::string kind) { + const int group_name_len = api_version.find('/'); + if (group_name_len == std::string::npos) { + return std::make_pair("io.k8s." + kind, api_version); + } else { + std::vector slash_split; + boost::algorithm::split( + slash_split, api_version, boost::algorithm::is_any_of("/")); + const std::string group_name = slash_split[0]; + const std::string version = slash_split[1]; + return std::make_pair("io.k8s." + group_name + "." + kind, version); + } +} + +const std::string KubernetesReader::FullResourceName( + const std::string& self_link) const { + std::vector slash_split; + boost::algorithm::split( + slash_split, self_link, boost::algorithm::is_any_of("/")); + + std::vector link_components; + if(slash_split[1] == "api") { + // Core resources, start with "/api//..." + link_components.assign(slash_split.begin() + 3, slash_split.end()); + } else { + // Non-core resources, start with "/apis///..." + const std::string group_name = slash_split[2]; + link_components.push_back(group_name); + link_components.insert(link_components.end(), + slash_split.begin() + 4, slash_split.end()); + + } + const std::string relative_link = + boost::algorithm::join(link_components, "/"); + const std::string cluster_full_name = ClusterFullName(); + return cluster_full_name + "/k8s/" + relative_link; +} + +MetadataUpdater::ResourceMetadata KubernetesReader::GetResourceMetadata( + const json::Object* resource, Timestamp collected_at, bool is_deleted) const + throw(json::Exception) { + const std::string cluster_location = environment_.KubernetesClusterLocation(); + + const std::string kind = resource->Get("kind"); + const std::string api_version = resource->Get("apiVersion"); + const json::Object* metadata = resource->Get("metadata"); + const std::string self_link = metadata->Get("selfLink"); + const std::pair type_and_version = + TypeAndVersion(api_version, kind); + const std::string type = type_and_version.first; + const std::string version = type_and_version.second; + + const std::string schema = + format::Substitute(std::string(kKubernetesSchemaNameFormat), + {{"type", type}, {"version", version}}); + const std::string resource_full_name = FullResourceName(self_link); + + if (config_.VerboseLogging()) { + LOG(INFO) << "Raw resource metadata for full name: " << resource_full_name + << ": " << *resource; + } + + const MonitoredResource dummy_mr("", {}); + return MetadataUpdater::ResourceMetadata( + std::vector{}, dummy_mr, + resource_full_name, +#ifdef ENABLE_KUBERNETES_METADATA + MetadataStore::Metadata(type, cluster_location, version, + schema, is_deleted, collected_at, + resource->Clone()) +#else + MetadataStore::Metadata::IGNORED() +#endif + ); +} + KubernetesReader::KubernetesReader(const Configuration& config, HealthChecker* health_checker) : config_(config), environment_(config), health_checker_(health_checker) {} @@ -108,149 +192,69 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata( const json::Object* node, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); - const std::string location = environment_.KubernetesClusterLocation(); + const std::string cluster_location = environment_.KubernetesClusterLocation(); const json::Object* metadata = node->Get("metadata"); const std::string node_name = metadata->Get("name"); - const std::string created_str = - metadata->Get("creationTimestamp"); - Timestamp created_at = time::rfc3339::FromString(created_str); + const std::string node_version = node->Get("apiVersion"); + const std::string node_type = "io.k8s.Node"; + const std::string node_schema = + format::Substitute(std::string(kKubernetesSchemaNameFormat), + {{"type", node_type}, {"version", node_version}}); const MonitoredResource k8s_node("k8s_node", { {"cluster_name", cluster_name}, {"node_name", node_name}, - {"location", location}, + {"location", cluster_location}, }); - json::value instance_resource = - InstanceReader::InstanceResource(environment_).ToJSON(); - - json::value node_raw_metadata = json::object({ - {"blobs", json::object({ - {"association", json::object({ - {"version", json::string(config_.MetadataIngestionRawContentVersion())}, - {"raw", json::object({ - {"infrastructureResource", std::move(instance_resource)}, - })}, - })}, - {"api", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", node->Clone()}, - })}, - })}, - }); if (config_.VerboseLogging()) { - LOG(INFO) << "Raw node metadata: " << *node_raw_metadata; + LOG(INFO) << "Raw node metadata: " << *node; } const std::string k8s_node_name = boost::algorithm::join( std::vector{kK8sNodeResourcePrefix, node_name}, config_.MetadataApiResourceTypeSeparator()); + const std::string node_full_name = + ClusterFullName() + "/k8s/nodes/" + node_name; return MetadataUpdater::ResourceMetadata( std::vector{k8s_node_name}, - k8s_node, + k8s_node, node_full_name, #ifdef ENABLE_KUBERNETES_METADATA - MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), - is_deleted, created_at, collected_at, - std::move(node_raw_metadata)) + MetadataStore::Metadata(node_type, cluster_location, node_version, + node_schema, is_deleted, collected_at, + node->Clone()) #else MetadataStore::Metadata::IGNORED() #endif ); } -json::value KubernetesReader::ComputePodAssociations(const json::Object* pod) - const throw(json::Exception) { - const json::Object* metadata = pod->Get("metadata"); - const std::string namespace_name = metadata->Get("namespace"); - - json::value instance_resource = - InstanceReader::InstanceResource(environment_).ToJSON(); - - std::unique_ptr raw_associations(new json::Object({ - {"infrastructureResource", std::move(instance_resource)}, - })); - - try { - const json::value top_level = FindTopLevelController( - namespace_name, pod->Clone()); - const json::Object* top_level_controller = top_level->As(); - const json::Object* top_level_metadata = - top_level_controller->Get("metadata"); - const std::string top_level_name = - top_level_metadata->Get("name"); - const std::string pod_id = metadata->Get("uid"); - if (!top_level_controller->Has("kind") && - top_level_metadata->Get("uid") != pod_id) { - LOG(ERROR) << "Internal error; top-level controller without 'kind' " - << *top_level_controller - << " not the same as pod " << *pod; - } - const std::string top_level_kind = - top_level_controller->Has("kind") - ? top_level_controller->Get("kind") - : "Pod"; - - raw_associations->emplace(std::make_pair( - "controllers", - json::object({ - {"topLevelControllerType", json::string(top_level_kind)}, - {"topLevelControllerName", json::string(top_level_name)}, - }) - )); - } catch (const QueryException& e) { - LOG(ERROR) << "Error while finding top-level controller for " - << namespace_name << "." << metadata->Get("name") - << ": " << e.what(); - } - - const json::Object* spec = pod->Get("spec"); - if (spec->Has("nodeName")) { - // Pods that have been scheduled will have a nodeName. - raw_associations->emplace(std::make_pair( - "nodeName", - json::string(spec->Get("nodeName")) - )); - } - - return json::object({ - {"version", json::string(config_.MetadataIngestionRawContentVersion())}, - {"raw", json::value(std::move(raw_associations))}, - }); -} - MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( - const json::Object* pod, json::value associations, Timestamp collected_at, - bool is_deleted) const throw(json::Exception) { + const json::Object* pod, Timestamp collected_at, bool is_deleted) const + throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); - const std::string location = environment_.KubernetesClusterLocation(); + const std::string cluster_location = environment_.KubernetesClusterLocation(); const json::Object* metadata = pod->Get("metadata"); const std::string namespace_name = metadata->Get("namespace"); const std::string pod_name = metadata->Get("name"); const std::string pod_id = metadata->Get("uid"); - const std::string created_str = - metadata->Get("creationTimestamp"); - Timestamp created_at = time::rfc3339::FromString(created_str); + const std::string pod_version = pod->Get("apiVersion"); + const std::string pod_type = "io.k8s.Pod"; + const std::string pod_schema = + format::Substitute(std::string(kKubernetesSchemaNameFormat), + {{"type", pod_type}, {"version", pod_version}}); const MonitoredResource k8s_pod("k8s_pod", { {"cluster_name", cluster_name}, {"namespace_name", namespace_name}, {"pod_name", pod_name}, - {"location", location}, + {"location", cluster_location}, }); - json::value pod_raw_metadata = json::object({ - {"blobs", json::object({ - {"association", std::move(associations)}, - {"api", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", pod->Clone()}, - })}, - })}, - }); if (config_.VerboseLogging()) { - LOG(INFO) << "Raw pod metadata: " << *pod_raw_metadata; + LOG(INFO) << "Raw pod metadata: " << *pod; } const std::string k8s_pod_id = boost::algorithm::join( @@ -259,13 +263,16 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( const std::string k8s_pod_name = boost::algorithm::join( std::vector{kK8sPodResourcePrefix, namespace_name, pod_name}, config_.MetadataApiResourceTypeSeparator()); + const std::string pod_full_name = + ClusterFullName() + "/k8s/namespaces/" + namespace_name + "/pods/" + + pod_name; return MetadataUpdater::ResourceMetadata( std::vector{k8s_pod_id, k8s_pod_name}, - k8s_pod, + k8s_pod, pod_full_name, #ifdef ENABLE_KUBERNETES_METADATA - MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), - is_deleted, created_at, collected_at, - std::move(pod_raw_metadata)) + MetadataStore::Metadata(pod_type, cluster_location, pod_version, + pod_schema, is_deleted, collected_at, + pod->Clone()) #else MetadataStore::Metadata::IGNORED() #endif @@ -274,8 +281,8 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( const json::Object* pod, const json::Object* container_spec, - const json::Object* container_status, json::value associations, - Timestamp collected_at, bool is_deleted) const throw(json::Exception) { + const json::Object* container_status, Timestamp collected_at, + bool is_deleted) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); @@ -283,16 +290,6 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( const std::string namespace_name = metadata->Get("namespace"); const std::string pod_name = metadata->Get("name"); const std::string pod_id = metadata->Get("uid"); - const std::string created_str = - metadata->Get("creationTimestamp"); - Timestamp created_at = time::rfc3339::FromString(created_str); - const json::Object* labels; - if (!metadata->Has("labels")) { - labels = nullptr; - } else { - labels = metadata->Get("labels"); - } - const std::string container_name = container_spec->Get("name"); const MonitoredResource k8s_container("k8s_container", { @@ -303,38 +300,6 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( {"location", location}, }); - std::unique_ptr blobs(new json::Object({ - {"association", std::move(associations)}, - {"spec", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", container_spec->Clone()}, - })}, - })); - if (container_status) { - blobs->emplace(std::make_pair( - "status", - json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", container_status->Clone()}, - }) - )); - } - if (labels) { - blobs->emplace(std::make_pair( - "labels", - json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", labels->Clone()}, - }) - )); - } - json::value container_raw_metadata = json::object({ - {"blobs", json::value(std::move(blobs))}, - }); - if (config_.VerboseLogging()) { - LOG(INFO) << "Raw container metadata: " << *container_raw_metadata; - } - const std::string k8s_container_pod = boost::algorithm::join( std::vector{kK8sContainerResourcePrefix, pod_id, container_name}, config_.MetadataApiResourceTypeSeparator()); @@ -370,13 +335,8 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( return MetadataUpdater::ResourceMetadata( std::move(local_resource_ids), k8s_container, -#ifdef ENABLE_KUBERNETES_METADATA - MetadataStore::Metadata(kKubernetesApiVersion, - is_deleted, created_at, collected_at, - std::move(container_raw_metadata)) -#else + /*full_resource_name=*/"", MetadataStore::Metadata::IGNORED() -#endif ); } @@ -410,6 +370,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetLegacyResource( return MetadataUpdater::ResourceMetadata( std::vector{gke_container_pod_id, gke_container_name}, gke_container, + /*full_resource_name=*/"", MetadataStore::Metadata::IGNORED()); } @@ -419,8 +380,6 @@ KubernetesReader::GetPodAndContainerMetadata( throw(json::Exception) { std::vector result; - json::value associations = ComputePodAssociations(pod); - const json::Object* metadata = pod->Get("metadata"); const std::string pod_name = metadata->Get("name"); const std::string pod_id = metadata->Get("uid"); @@ -466,83 +425,13 @@ KubernetesReader::GetPodAndContainerMetadata( result.emplace_back(GetLegacyResource(pod, name)); result.emplace_back( GetContainerMetadata(pod, container_spec, container_status, - associations->Clone(), collected_at, is_deleted)); + collected_at, is_deleted)); } - result.emplace_back( - GetPodMetadata(pod, std::move(associations), collected_at, is_deleted)); + result.emplace_back(GetPodMetadata(pod, collected_at, is_deleted)); return std::move(result); } -std::vector KubernetesReader::GetServiceList( - const std::string& cluster_name, const std::string& location) const - throw(json::Exception) { - std::lock_guard lock(service_mutex_); - std::vector service_list; - for (const auto& metadata_it : service_to_metadata_) { - // A service key consists of a namespace name and a service name. - const ServiceKey& service_key = metadata_it.first; - const std::string namespace_name = service_key.first; - const json::value& service_metadata = metadata_it.second; - auto pods_it = service_to_pods_.find(service_key); - const std::vector& pod_names = - (pods_it != service_to_pods_.end()) ? pods_it->second : kNoPods; - std::vector pod_resources; - for (const std::string& pod_name : pod_names) { - const MonitoredResource k8s_pod("k8s_pod", { - {"cluster_name", cluster_name}, - {"namespace_name", namespace_name}, - {"pod_name", pod_name}, - {"location", location}, - }); - pod_resources.emplace_back(k8s_pod.ToJSON()); - } - service_list.emplace_back(json::object({ - {"api", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", service_metadata->Clone()}, - {"pods", json::array(std::move(pod_resources))}, - })}, - })); - } - return service_list; -} - -MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata( - Timestamp collected_at) const throw(json::Exception) { - const std::string cluster_name = environment_.KubernetesClusterName(); - const std::string location = environment_.KubernetesClusterLocation(); - std::vector service_list = - GetServiceList(cluster_name, location); - const MonitoredResource k8s_cluster("k8s_cluster", { - {"cluster_name", cluster_name}, - {"location", location}, - }); - json::value cluster_raw_metadata = json::object({ - {"blobs", json::object({ - {"services", json::array(std::move(service_list))}, - })}, - }); - if (config_.VerboseLogging()) { - LOG(INFO) << "Raw cluster metadata: " << *cluster_raw_metadata; - } - - // There is no created_at for the cluster since the metadata contains - // ALL current services. - Timestamp created_at = time_point(); - return MetadataUpdater::ResourceMetadata( - std::vector{}, - k8s_cluster, -#ifdef ENABLE_KUBERNETES_METADATA - MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), - /*is_deleted=*/false, created_at, collected_at, - std::move(cluster_raw_metadata)) -#else - MetadataStore::Metadata::IGNORED() -#endif - ); -} - std::vector KubernetesReader::MetadataQuery() const { if (config_.VerboseLogging()) { @@ -777,6 +666,20 @@ void WatchEventCallback( } } +const std::string KubernetesReader::ClusterFullName() const { + const std::string project_id = environment_.NumericProjectId(); + const std::string cluster_name = environment_.KubernetesClusterName(); + const std::string location = environment_.KubernetesClusterLocation(); + int num_dashes = std::count(location.begin(), location.end(), '-'); + const std::string location_type = num_dashes == 2 ? "zones": "locations"; + return format::Substitute( + std::string(kKubernetesClusterFullNameFormat), + {{"project_id", project_id}, + {"location_type", location_type}, + {"location", location}, + {"cluster_name", cluster_name}}); +} + void KubernetesReader::WatchMaster( const std::string& name, const std::string& path, @@ -916,172 +819,6 @@ std::pair KubernetesReader::KindPath( return {query_path, name_it->second}; } -json::value KubernetesReader::GetOwner( - const std::string& ns, const json::Object* owner_ref) const - throw(QueryException, json::Exception) { -#ifdef VERBOSE - LOG(DEBUG) << "GetOwner(" << ns << ", " << *owner_ref << ")"; -#endif - const std::string api_version = owner_ref->Get("apiVersion"); - const std::string kind = owner_ref->Get("kind"); - const std::string name = owner_ref->Get("name"); - const std::string uid = owner_ref->Get("uid"); - - // Even though we query by name, we should look up the owner by uid, - // to handle the case when an object is deleted and re-constructed. - const std::string encoded_ref = boost::algorithm::join( - std::vector{api_version, kind, uid}, "/"); - - std::lock_guard lock(mutex_); - auto owner_it = owners_.find(encoded_ref); - if (owner_it == owners_.end()) { // Not found, add new. - const auto path_component = KindPath(api_version, kind); -#ifdef VERBOSE - LOG(DEBUG) << "KindPath returned {" << path_component.first << ", " - << path_component.second << "}"; -#endif - json::value owner = - QueryMaster(path_component.first + "/namespaces/" + ns + "/" + - path_component.second + "/" + name); - // Sanity check: because we are looking up by name, the object we get - // back might have a different uid. - const json::Object* owner_obj = owner->As(); - const json::Object* metadata = owner_obj->Get("metadata"); - const std::string owner_uid = metadata->Get("uid"); - if (owner_uid != uid) { - LOG(WARNING) << "Owner " << kind << "'" << name << "' (id " << uid - << ") disappeared before we could query it. Found id " - << owner_uid << " instead."; - throw QueryException("Owner " + kind + " " + name + " (id " + uid + - ") disappeared"); - } - auto inserted = owners_.emplace(encoded_ref, std::move(owner)); - owner_it = inserted.first; - } - return owner_it->second->Clone(); -} - -json::value KubernetesReader::FindTopLevelController( - const std::string& ns, json::value object) const - throw(QueryException, json::Exception) { - const json::Object* obj = object->As(); -#ifdef VERBOSE - LOG(DEBUG) << "Looking for the top-level owner for " << *obj; -#endif - const json::Object* metadata = obj->Get("metadata"); -#ifdef VERBOSE - LOG(DEBUG) << "FindTopLevelController: metadata is " << *metadata; -#endif - if (!metadata->Has("ownerReferences")) { -#ifdef VERBOSE - LOG(DEBUG) << "FindTopLevelController: no owner references in " - << *metadata; -#endif - return object; - } - const json::Array* refs = metadata->Get("ownerReferences"); -#ifdef VERBOSE - LOG(DEBUG) << "FindTopLevelController: refs is " << *refs; -#endif - - // Kubernetes objects are supposed to have at most one controller: - // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#objectmeta-v1-meta. - const json::Object* controller_ref = nullptr; - for (const json::value& ref : *refs) { - const json::Object* ref_obj = ref->As(); - if (ref_obj->Has("controller") && - ref_obj->Get("controller")) { - controller_ref = ref_obj; - break; - } - } - if (!controller_ref) { -#ifdef VERBOSE - LOG(DEBUG) << "FindTopLevelController: no controller references in " - << *refs; -#endif - return object; - } -#ifdef VERBOSE - LOG(DEBUG) << "FindTopLevelController: controller_ref is " << *controller_ref; -#endif - return FindTopLevelController(ns, GetOwner(ns, controller_ref)); -} - -void KubernetesReader::UpdateServiceToMetadataCache( - const json::Object* service, bool is_deleted) throw(json::Exception) { -#ifdef VERBOSE - LOG(DEBUG) << "UpdateServiceToMetadataCache(" << *service << ")"; -#endif - const json::Object* metadata = service->Get("metadata"); - const std::string namespace_name = metadata->Get("namespace"); - const std::string service_name = metadata->Get("name"); - const ServiceKey service_key(namespace_name, service_name); - - std::lock_guard lock(service_mutex_); - if (is_deleted) { - service_to_metadata_.erase(service_key); - } else { - auto it_inserted = - service_to_metadata_.emplace(service_key, json::value()); - it_inserted.first->second = service->Clone(); - } -} - -void KubernetesReader::UpdateServiceToPodsCache( - const json::Object* endpoints, bool is_deleted) throw(json::Exception) { -#ifdef VERBOSE - LOG(DEBUG) << "UpdateServiceToPodsCache(" << *endpoints << ")"; -#endif - const json::Object* metadata = endpoints->Get("metadata"); - const std::string namespace_name = metadata->Get("namespace"); - // Endpoints name is the same as the matching service name. - const std::string service_name = metadata->Get("name"); - const ServiceKey service_key(namespace_name, service_name); - - std::vector pod_names; - // Only extract the pod names when this is not a deletion. In the case of - // a deletion, we delete the mapping below. - if (!is_deleted && endpoints->Has("subsets") && - !endpoints->at("subsets")->Is()) { - const json::Array* subsets = endpoints->Get("subsets"); - for (const json::value& subset : *subsets) { - const json::Object* subset_obj = subset->As(); - if (!subset_obj->Has("addresses")) { - continue; - } - const json::Array* addresses = subset_obj->Get("addresses"); - for (const json::value& address : *addresses) { - const json::Object* address_obj = address->As(); - if (!address_obj->Has("targetRef")) { - continue; - } - const json::Object* ref = address_obj->Get("targetRef"); - if (!(ref->Has("kind") && ref->Has("name"))) { - continue; - } - const std::string target_kind = ref->Get("kind"); - if (target_kind != "Pod") { - LOG(INFO) << "Found a resource other than a pod in Endpoint " - << service_name << "'s targetRef: " << target_kind; - continue; - } - const std::string pod_name = ref->Get("name"); - pod_names.push_back(pod_name); - } - } - } - - std::lock_guard lock(service_mutex_); - if (is_deleted) { - service_to_pods_.erase(service_key); - } else { - auto it_inserted = - service_to_pods_.emplace(service_key, std::vector()); - it_inserted.first->second = pod_names; - } -} - void KubernetesReader::ValidateDynamicConfiguration() const throw(MetadataUpdater::ConfigurationValidationError) { try { @@ -1128,6 +865,41 @@ void KubernetesReader::ValidateDynamicConfiguration() const } } +void KubernetesReader::ResourceCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* resource, Timestamp collected_at, bool is_deleted) const + throw(json::Exception) { + std::vector result_vector; + result_vector.emplace_back( + GetResourceMetadata(resource, collected_at, is_deleted)); + callback(std::move(result_vector)); +} + +void KubernetesReader::WatchResources( + const std::string& api_path, const std::string& name, + MetadataUpdater::UpdateCallback callback) const { + LOG(INFO) << "Watch thread (" << name << ") started"; + + try { + // TODO: There seems to be a Kubernetes API bug with watch=true. + WatchMaster( + name, api_path, + [=](const json::Object* resource, Timestamp collected_at, + bool is_deleted) { + ResourceCallback(callback, resource, collected_at, is_deleted); + }); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + LOG(ERROR) << "No more " << name << " metadata will be collected"; + } catch (const KubernetesReader::QueryException& e) { + LOG(ERROR) << "No more " << name << " metadata will be collected"; + } + if (health_checker_) { + health_checker_->SetUnhealthy("kubernetes_node_thread"); + } + LOG(INFO) << "Watch thread (" << name << ") exiting"; +} + void KubernetesReader::PodCallback( MetadataUpdater::UpdateCallback callback, const json::Object* pod, Timestamp collected_at, bool is_deleted) const @@ -1203,73 +975,6 @@ void KubernetesReader::WatchNodes( LOG(INFO) << "Watch thread (node) exiting"; } -void KubernetesReader::ServiceCallback( - MetadataUpdater::UpdateCallback callback, - const json::Object* service, Timestamp collected_at, bool is_deleted) - throw(json::Exception) { - UpdateServiceToMetadataCache(service, is_deleted); - - // TODO: using a temporary did not work here. - std::vector result_vector; - result_vector.emplace_back(GetClusterMetadata(collected_at)); - callback(std::move(result_vector)); -} - -void KubernetesReader::WatchServices(MetadataUpdater::UpdateCallback callback) { - LOG(INFO) << "Watch thread started for services"; - - try { - WatchMaster( - "Service", - std::string(kKubernetesEndpointPath) + "/watch/services/", - [=](const json::Object* service, Timestamp collected_at, - bool is_deleted) { - ServiceCallback(callback, service, collected_at, is_deleted); - }); - } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); - LOG(ERROR) << "No more service metadata will be collected"; - } catch (const KubernetesReader::QueryException& e) { - LOG(ERROR) << "No more service metadata will be collected"; - } - health_checker_->SetUnhealthy("kubernetes_service_thread"); - LOG(INFO) << "Watch thread (service) exiting"; -} - -void KubernetesReader::EndpointsCallback( - MetadataUpdater::UpdateCallback callback, - const json::Object* endpoints, Timestamp collected_at, bool is_deleted) - throw(json::Exception) { - UpdateServiceToPodsCache(endpoints, is_deleted); - - // TODO: using a temporary did not work here. - std::vector result_vector; - result_vector.emplace_back(GetClusterMetadata(collected_at)); - callback(std::move(result_vector)); -} - -void KubernetesReader::WatchEndpoints( - MetadataUpdater::UpdateCallback callback) { - LOG(INFO) << "Watch thread started for endpoints"; - - try { - WatchMaster( - "Endpoints", - std::string(kKubernetesEndpointPath) + "/watch/endpoints/", - [=](const json::Object* endpoints, Timestamp collected_at, - bool is_deleted) { - EndpointsCallback(callback, endpoints, collected_at, is_deleted); - }); - } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); - LOG(ERROR) << "No more endpoints metadata will be collected"; - } catch (const KubernetesReader::QueryException& e) { - LOG(ERROR) << "No more endpoints metadata will be collected"; - } - health_checker_->SetUnhealthy("kubernetes_endpoints_thread"); - LOG(INFO) << "Watch thread (endpoints) exiting"; -} - KubernetesUpdater::KubernetesUpdater(const Configuration& config, HealthChecker* health_checker, MetadataStore* store) @@ -1319,10 +1024,10 @@ void KubernetesUpdater::StartUpdater() { if (config().KubernetesClusterLevelMetadata() && config().KubernetesServiceMetadata()) { service_watch_thread_ = std::thread([=]() { - reader_.WatchServices(cb); + reader_.WatchResources("/api/v1/services", "Service", cb); }); endpoints_watch_thread_ = std::thread([=]() { - reader_.WatchEndpoints(cb); + reader_.WatchResources("/api/v1/endpoints", "Endpoints", cb); }); } } else { diff --git a/src/kubernetes.h b/src/kubernetes.h index 2e3d50be..c7774e42 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -55,11 +55,9 @@ class KubernetesReader { void WatchPods(const std::string& node_name, MetadataUpdater::UpdateCallback callback) const; - // Service watcher. - void WatchServices(MetadataUpdater::UpdateCallback callback); - - // Endpoints watcher. - void WatchEndpoints(MetadataUpdater::UpdateCallback callback); + // Generic Kubernetes resource watcher. + void WatchResources(const std::string& api_path, const std::string& name, + MetadataUpdater::UpdateCallback callback) const; // Gets the name of the node the agent is running on. // Returns an empty string if unable to find the current node. @@ -77,11 +75,18 @@ class KubernetesReader { }; class NonRetriableError; + // Computes the full resource name given the self link. + const std::string FullResourceName(const std::string& self_link) const; + // Issues a Kubernetes master API query at a given path and // returns a parsed JSON response. The path has to start with "/". json::value QueryMaster(const std::string& path) const throw(QueryException, json::Exception); + // Builds the cluster full name by reading in cluster related environment + // variables. + const std::string ClusterFullName() const; + // Issues a Kubernetes master API query at a given path and // watches for parsed JSON responses. The path has to start with "/". // Invokes callback for every notification. @@ -102,32 +107,28 @@ class KubernetesReader { MetadataUpdater::UpdateCallback callback, const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); - // Service watch callback. - void ServiceCallback( - MetadataUpdater::UpdateCallback callback, const json::Object* service, - Timestamp collected_at, bool is_deleted) throw(json::Exception); - - // Endpoints watch callback. - void EndpointsCallback( - MetadataUpdater::UpdateCallback callback, const json::Object* endpoints, - Timestamp collected_at, bool is_deleted) throw(json::Exception); + // Kubernetes resource watch callback. + void ResourceCallback( + MetadataUpdater::UpdateCallback callback, const json::Object* resource, + Timestamp collected_at, bool is_deleted) const throw(json::Exception); - // Compute the associations for a given pod. - json::value ComputePodAssociations(const json::Object* pod) const - throw(json::Exception); + // Given a generic kubernetes object, return the associated metadata. + MetadataUpdater::ResourceMetadata GetResourceMetadata( + const json::Object* resource, Timestamp collected_at, bool is_deleted) + const throw(json::Exception); // Given a node object, return the associated metadata. MetadataUpdater::ResourceMetadata GetNodeMetadata( const json::Object* node, Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Given a pod object, return the associated metadata. MetadataUpdater::ResourceMetadata GetPodMetadata( - const json::Object* pod, json::value associations, Timestamp collected_at, - bool is_deleted) const throw(json::Exception); + const json::Object* pod, Timestamp collected_at, bool is_deleted) const + throw(json::Exception); // Given a pod object and container info, return the container metadata. MetadataUpdater::ResourceMetadata GetContainerMetadata( const json::Object* pod, const json::Object* container_spec, - const json::Object* container_status, json::value associations, - Timestamp collected_at, bool is_deleted) const throw(json::Exception); + const json::Object* container_status, Timestamp collected_at, + bool is_deleted) const throw(json::Exception); // Given a pod object and container name, return the legacy resource. // The returned "metadata" field will be Metadata::IGNORED. MetadataUpdater::ResourceMetadata GetLegacyResource( @@ -137,14 +138,6 @@ class KubernetesReader { std::vector GetPodAndContainerMetadata( const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); - // Get a list of service metadata based on the service level caches. - std::vector GetServiceList( - const std::string& cluster_name, const std::string& location) - const throw(json::Exception); - // Return the cluster metadata based on the cached values for - // service_to_metadta_ and service_to_pods_. - MetadataUpdater::ResourceMetadata GetClusterMetadata(Timestamp collected_at) - const throw(json::Exception); // Gets the Kubernetes master API token. // Returns an empty string if unable to find the token. @@ -158,24 +151,6 @@ class KubernetesReader { std::pair KindPath(const std::string& version, const std::string& kind) const; - // Follows the owner reference to get the corresponding object. - json::value GetOwner(const std::string& ns, const json::Object* owner_ref) - const throw(QueryException, json::Exception); - - // For a given object, returns the top-level controller object. - json::value FindTopLevelController(const std::string& ns, json::value object) - const throw(QueryException, json::Exception); - - // Update service_to_metadata_ cache based on a newly updated service. - void UpdateServiceToMetadataCache( - const json::Object* service, bool is_deleted) throw(json::Exception); - - // Update service_to_pods_ cache based on a newly updated endpoints. The - // Endpoints resource provides a mapping from a single service to its pods: - // https://kubernetes.io/docs/concepts/services-networking/service/ - void UpdateServiceToPodsCache( - const json::Object* endpoints, bool is_deleted) throw(json::Exception); - // An empty vector value for endpoints that have no pods. const std::vector kNoPods; @@ -187,20 +162,6 @@ class KubernetesReader { // A memoized map from version to a map from kind to name. mutable std::map> version_to_kind_to_name_; - // A memoized map from an encoded owner reference to the owner object. - mutable std::map owners_; - - // ServiceKey is a pair of the namespace name and the service name that - // uniquely identifies a service in a cluster. - using ServiceKey = std::pair; - // Mutex for the service related caches. - mutable std::mutex service_mutex_; - // Map from service key to service metadata. This map is built based on the - // response from WatchServices. - mutable std::map service_to_metadata_; - // Map from service key to names of pods in the service. This map is built - // based on the response from WatchEndpoints. - mutable std::map> service_to_pods_; const Configuration& config_; HealthChecker* health_checker_; diff --git a/src/metadatad.cc b/src/metadatad.cc index 7363d2f5..2b1a68e4 100644 --- a/src/metadatad.cc +++ b/src/metadatad.cc @@ -21,7 +21,6 @@ #include "agent.h" #include "configuration.h" -#include "docker.h" #include "instance.h" #include "kubernetes.h" #include "time.h" @@ -78,16 +77,14 @@ int main(int ac, char** av) { google::MetadataAgent server(config); google::InstanceUpdater instance_updater(config, server.mutable_store()); - google::DockerUpdater docker_updater(config, server.mutable_store()); google::KubernetesUpdater kubernetes_updater(config, server.health_checker(), server.mutable_store()); google::cleanup_state = new google::CleanupState( - {&instance_updater, &docker_updater, &kubernetes_updater}, + {&instance_updater, &kubernetes_updater}, &server); std::signal(SIGTERM, handle_sigterm); instance_updater.Start(); - docker_updater.Start(); kubernetes_updater.Start(); server.Start(); diff --git a/src/reporter.cc b/src/reporter.cc index a0072d69..4993ea6f 100644 --- a/src/reporter.cc +++ b/src/reporter.cc @@ -29,6 +29,13 @@ namespace http = boost::network::http; namespace google { +constexpr const char kMultipartBoundary[] = "publishMultipartPost"; +constexpr const char kPublishPathFormat[] = + "/v1beta3/projects/{{project_id}}/resourceMetadata:publish"; + +constexpr const char kGcpLocationFormat[] = + "//cloud.google.com/locations/{{location_type}}/{{location}}"; + MetadataReporter::MetadataReporter(const Configuration& config, MetadataStore* store, double period_s) : store_(store), @@ -69,50 +76,112 @@ void MetadataReporter::ReportMetadata() { } namespace { - void SendMetadataRequest(std::vector&& entries, - const std::string& endpoint, + const std::string& batch_uri, + const std::string& publish_path, const std::string& auth_header, const std::string& user_agent, bool verbose_logging) throw (boost::system::system_error) { - json::value update_metadata_request = json::object({ - {"entries", json::array(std::move(entries))}, - }); - if (verbose_logging) { - LOG(INFO) << "About to send request: POST " << endpoint - << " User-Agent: " << user_agent - << " " << *update_metadata_request; + if (entries.size() == 1) { + // Add a copy of the single entry, except for the `views` field. This allows + // us to sent a request to the batch endpoint when we have a single request. + const json::Object* single_request = entries[0]->As(); + json::value duplicate_request = json::object({ // ResourceMetadata + {"name", json::string(single_request->Get("name"))}, + {"type", json::string(single_request->Get("type"))}, + {"location", json::string(single_request->Get("location"))}, + {"state", json::string(single_request->Get("state"))}, + {"eventTime", + json::string(single_request->Get("eventTime"))}, + }); + entries.emplace_back(std::move(duplicate_request)); } - + const std::string content_type = + std::string("multipart/mixed; boundary=") + kMultipartBoundary; http::client client; - http::client::request request(endpoint); - std::string request_body = update_metadata_request->ToString(); + http::client::request request(batch_uri); request << boost::network::header("User-Agent", user_agent); - request << boost::network::header("Content-Length", - std::to_string(request_body.size())); - request << boost::network::header("Content-Type", "application/json"); + request << boost::network::header("Content-Type", content_type); request << boost::network::header("Authorization", auth_header); - request << boost::network::body(request_body); + request << boost::network::header("Expect", "100-continue"); + std::ostringstream out; + out << std::endl; + + for (json::value& entry : entries) { + const json::Object* single_request = entry->As(); + std::string request_body = single_request->ToString(); + out << "--" << kMultipartBoundary << std::endl; + out << "Content-Type: application/http" << std::endl; + out << "Content-Transfer-Encoding: binary" << std::endl; + out << "Content-ID: " << single_request->Get("name") + << std::endl; + out << std::endl; + out << "POST " << publish_path << std::endl; + out << "Content-Type: application/json; charset=UTF-8" << std::endl; + out << "Content-Length: " << std::to_string(request_body.size()) + << std::endl; + out << std::endl << request_body << std::endl; + } + out << "--" << kMultipartBoundary << "--" << std::endl; + std::string multipart_body = out.str(); + const int total_length = multipart_body.size(); + + request << boost::network::header("Content-Length", + std::to_string(total_length)); + request << boost::network::body(multipart_body); + + if (verbose_logging) { + LOG(INFO) << "About to send request: POST " << batch_uri; + LOG(INFO) << "Headers:"; + http::client::request::headers_container_type head = (headers(request)); + for (auto it = head.begin(); it != head.end(); ++it) { + if (it->first == "Authorization") { + continue; + } + LOG(INFO) << it->first << ": " << it->second; + } + LOG(INFO) << "Body:" << std::endl << body(request); + } + http::client::response response = client.post(request); if (status(response) >= 300) { throw boost::system::system_error( - boost::system::errc::make_error_code(boost::system::errc::not_connected), + boost::system::errc::make_error_code( + boost::system::errc::not_connected), format::Substitute("Server responded with '{{message}}' ({{code}})", {{"message", status_message(response)}, {"code", format::str(status(response))}})); } if (verbose_logging) { - LOG(INFO) << "Server responded with " << body(response); + LOG(INFO) << format::Substitute( + "Server responded with '{{message}}' ({{code}})", + {{"message", status_message(response)}, + {"code", format::str(status(response))}}); + LOG(INFO) << "Headers:"; + http::client::response::headers_container_type head = (headers(response)); + for (auto it = head.begin(); it != head.end(); ++it) { + LOG(INFO) << it->first << ": " << it->second; + } + LOG(INFO) << "Body:" << std::endl << body(response); } // TODO: process response. } } +const std::string MetadataReporter::FullyQualifiedResourceLocation( + const std::string location) const { + int num_dashes = std::count(location.begin(), location.end(), '-'); + const std::string location_type = num_dashes == 2 ? "zones" : "regions"; + return format::Substitute( + std::string(kGcpLocationFormat), + {{"location_type", location_type}, {"location", location}}); +} + void MetadataReporter::SendMetadata( - std::map&& metadata) + std::map&& metadata) throw (boost::system::system_error) { if (metadata.empty()) { if (config_.VerboseLogging()) { @@ -125,11 +194,9 @@ void MetadataReporter::SendMetadata( LOG(INFO) << "Sending request to the server"; } const std::string project_id = environment_.NumericProjectId(); - // The endpoint template is expected to be of the form - // "https://stackdriver.googleapis.com/.../projects/{{project_id}}/...". - const std::string endpoint = - format::Substitute(config_.MetadataIngestionEndpointFormat(), - {{"project_id", project_id}}); + const std::string& batch_uri = config_.MetadataIngestionEndpointFormat(); + const std::string& publish_path = + format::Substitute(kPublishPathFormat, {{"project_id", project_id}}); const std::string auth_header = auth_.GetAuthHeaderValue(); const std::string user_agent = config_.MetadataReporterUserAgent(); @@ -144,20 +211,29 @@ void MetadataReporter::SendMetadata( std::vector entries; for (auto& entry : metadata) { - const MonitoredResource& resource = entry.first; + const std::string& full_resource_name = entry.first; MetadataStore::Metadata& metadata = entry.second; - if (metadata.ignore) { - continue; - } + const std::string resource_location = + FullyQualifiedResourceLocation(metadata.location); json::value metadata_entry = - json::object({ // MonitoredResourceMetadata - {"resource", resource.ToJSON()}, - {"rawContentVersion", json::string(metadata.version)}, - {"rawContent", std::move(metadata.metadata)}, - {"state", json::string(metadata.is_deleted ? "DELETED" : "ACTIVE")}, - {"createTime", json::string(time::rfc3339::ToString(metadata.created_at))}, - {"collectTime", json::string(time::rfc3339::ToString(metadata.collected_at))}, - }); + json::object({ + {"name", json::string(full_resource_name)}, + {"type", json::string(metadata.type)}, + {"location", json::string(resource_location)}, + {"state", json::string(metadata.is_deleted ? "DELETED" : "EXISTS")}, + {"eventTime", json::string( + time::rfc3339::ToString(metadata.collected_at))}, + {"views", + json::object({ + {metadata.version, + json::object({ + {"schemaName", json::string(metadata.schema_name)}, + {"stringContent", json::string(metadata.metadata->ToString())}, + }) + } + }) + } + }); // TODO: This is probably all kinds of inefficient... const int size = metadata_entry->ToString().size(); if (empty_size + size > limit_bytes) { @@ -167,8 +243,8 @@ void MetadataReporter::SendMetadata( continue; } if (entries.size() == limit_count || total_size + size > limit_bytes) { - SendMetadataRequest(std::move(entries), endpoint, auth_header, user_agent, - config_.VerboseLogging()); + SendMetadataRequest(std::move(entries), batch_uri, publish_path, + auth_header, user_agent, config_.VerboseLogging()); entries.clear(); total_size = empty_size; } @@ -176,8 +252,8 @@ void MetadataReporter::SendMetadata( total_size += size; } if (!entries.empty()) { - SendMetadataRequest(std::move(entries), endpoint, auth_header, user_agent, - config_.VerboseLogging()); + SendMetadataRequest(std::move(entries), batch_uri, publish_path, + auth_header, user_agent, config_.VerboseLogging()); } } diff --git a/src/reporter.h b/src/reporter.h index 6a5b8e42..089caa79 100644 --- a/src/reporter.h +++ b/src/reporter.h @@ -46,9 +46,13 @@ class MetadataReporter { // Send the given set of metadata. void SendMetadata( - std::map&& metadata) + std::map&& metadata) throw (boost::system::system_error); + // Compute the fully qualified name of a GCP location. + const std::string FullyQualifiedResourceLocation(const std::string location) + const; + const Configuration& config_; MetadataStore* store_; Environment environment_; diff --git a/src/sample_agent_config.yaml b/src/sample_agent_config.yaml index 2a095a82..4febc96a 100644 --- a/src/sample_agent_config.yaml +++ b/src/sample_agent_config.yaml @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -MetadataIngestionEndpointFormat: "https://staging-stackdriver.sandbox.googleapis.com/v1beta2/projects/{{project_id}}/resourceMetadata:batchUpdate" +MetadataIngestionEndpointFormat: "https://staging-stackdriver.sandbox.googleapis.com/batch/resourceMetadata" +KubernetesUseWatch: true #CredentialsFile: /tmp/token.json #ProjectId: "1234567890" #InstanceZone: "us-central1-a" diff --git a/src/store.cc b/src/store.cc index adfe8899..76315c0c 100644 --- a/src/store.cc +++ b/src/store.cc @@ -47,14 +47,23 @@ void MetadataStore::UpdateResource(const std::vector& resource_ids, } } -void MetadataStore::UpdateMetadata(const MonitoredResource& resource, +void MetadataStore::UpdateMetadata(const std::string& full_resource_name, Metadata&& entry) { + if (full_resource_name.empty() || entry.ignore) { + if (config_.VerboseLogging()) { + LOG(INFO) << "Dropping metadata entry " << full_resource_name << "->{" + << "ignore: " << entry.ignore; + } + return; + } std::lock_guard lock(metadata_mu_); if (config_.VerboseLogging()) { - LOG(INFO) << "Updating metadata map " << resource << "->{" + LOG(INFO) << "Updating metadata map " << full_resource_name << "->{" + << "type: " << entry.type << ", " + << "location: " << entry.location << ", " << "version: " << entry.version << ", " + << "schema name: " << entry.schema_name << ", " << "is_deleted: " << entry.is_deleted << ", " - << "created_at: " << time::rfc3339::ToString(entry.created_at) << ", " << "collected_at: " << time::rfc3339::ToString(entry.collected_at) << ", " << "metadata: " << *entry.metadata << ", " @@ -63,19 +72,19 @@ void MetadataStore::UpdateMetadata(const MonitoredResource& resource, } // Force value update. The repeated search is inefficient, but shouldn't // be a huge deal. - metadata_map_.erase(resource); - metadata_map_.emplace(resource, std::move(entry)); + metadata_map_.erase(full_resource_name); + metadata_map_.emplace(full_resource_name, std::move(entry)); } -std::map +std::map MetadataStore::GetMetadataMap() const { std::lock_guard lock(metadata_mu_); - std::map result; + std::map result; for (const auto& kv : metadata_map_) { - const MonitoredResource& resource = kv.first; + const std::string& full_resource_name = kv.first; const Metadata& metadata = kv.second; - result.emplace(resource, metadata.Clone()); + result.emplace(full_resource_name, metadata.Clone()); } return result; } @@ -84,15 +93,16 @@ void MetadataStore::PurgeDeletedEntries() { std::lock_guard lock(metadata_mu_); for (auto it = metadata_map_.begin(); it != metadata_map_.end(); ) { - const MonitoredResource& resource = it->first; + const std::string& full_resource_name = it->first; const Metadata& entry = it->second; if (entry.is_deleted) { if (config_.VerboseLogging()) { - LOG(INFO) << "Purging metadata entry " << resource << "->{" + LOG(INFO) << "Purging metadata entry " << full_resource_name << "->{" + << "type: " << entry.type << ", " + << "location: " << entry.location << ", " << "version: " << entry.version << ", " + << "schema name: " << entry.schema_name << ", " << "is_deleted: " << entry.is_deleted << ", " - << "created_at: " << time::rfc3339::ToString(entry.created_at) - << ", " << "collected_at: " << time::rfc3339::ToString(entry.collected_at) << ", " << "metadata: " << *entry.metadata << ", " diff --git a/src/store.h b/src/store.h index 5b1c25ce..bb572f55 100644 --- a/src/store.h +++ b/src/store.h @@ -37,46 +37,54 @@ using Timestamp = time_point; class MetadataStore { public: struct Metadata { - Metadata(const std::string& version_, + Metadata(const std::string& type_, + const std::string& location_, + const std::string& version_, + const std::string& schema_name_, bool is_deleted_, - const Timestamp& created_at_, const Timestamp& collected_at_, json::value metadata_) - : version(version_), is_deleted(is_deleted_), created_at(created_at_), - collected_at(collected_at_), metadata(std::move(metadata_)), - ignore(false) {} + : type(type_), location(location_), version(version_), + schema_name(schema_name_), is_deleted(is_deleted_), + collected_at(collected_at_), + metadata(std::move(metadata_)), ignore(false) {} Metadata(Metadata&& other) - : version(other.version), is_deleted(other.is_deleted), - created_at(other.created_at), collected_at(other.collected_at), - metadata(std::move(other.metadata)), ignore(other.ignore) {} + : type(other.type), location(other.location), version(other.version), + schema_name(other.schema_name), is_deleted(other.is_deleted), + collected_at(other.collected_at), + metadata(std::move(other.metadata)), ignore(other.ignore) {} Metadata Clone() const { if (ignore) { return IGNORED(); } - return {version, is_deleted, created_at, collected_at, metadata->Clone()}; + return {type, location, version, schema_name, is_deleted, + collected_at, metadata->Clone()}; } static Metadata IGNORED(); + const std::string type; + const std::string location; const std::string version; + const std::string schema_name; const bool is_deleted; - const Timestamp created_at; const Timestamp collected_at; json::value metadata; const bool ignore; private: Metadata() - : version(), is_deleted(false), created_at(), collected_at(), - metadata(json::object({})), ignore(true) {} + : type(), location(), version(), schema_name(), is_deleted(false), + collected_at(), metadata(json::object({})), + ignore(true) {} }; MetadataStore(const Configuration& config); - // Returns a copy of the mapping from a monitored resource to the metadata + // Returns a copy of the mapping from a full resource name to the metadata // associated with that resource. - std::map GetMetadataMap() const; + std::map GetMetadataMap() const; // Looks up the local resource map entry for a given resource id. // Throws an exception if the resource is not found. @@ -90,9 +98,9 @@ class MetadataStore { const MonitoredResource& resource); // Updates metadata for a given resource. - // Adds a metadata mapping from the `resource` to the metadata `entry`. - void UpdateMetadata(const MonitoredResource& resource, - Metadata&& entry); + // Adds a metadata mapping from the `full_resource_name` to the metadata + // `entry`. If the `full_resource_name` is blank, the entry is dropped. + void UpdateMetadata(const std::string& full_resource_name, Metadata&& entry); private: friend class MetadataReporter; @@ -108,8 +116,8 @@ class MetadataStore { std::map resource_map_; // A lock that guards access to the metadata map. mutable std::mutex metadata_mu_; - // A map from MonitoredResource to (JSON) resource metadata. - std::map metadata_map_; + // A map from Full Resource Name to (JSON) resource metadata. + std::map metadata_map_; }; } diff --git a/src/updater.h b/src/updater.h index 3d21f86a..bef681eb 100644 --- a/src/updater.h +++ b/src/updater.h @@ -37,21 +37,27 @@ class MetadataUpdater { struct ResourceMetadata { ResourceMetadata(const std::vector& ids, const MonitoredResource& resource, + const std::string& full_resource_name, MetadataStore::Metadata&& metadata) - : ids_(ids), resource_(resource), metadata_(std::move(metadata)) {} + : ids_(ids), resource_(resource), + full_resource_name_(full_resource_name), + metadata_(std::move(metadata)) {} ResourceMetadata(ResourceMetadata&& other) : ResourceMetadata(other.ids_, other.resource_, + other.full_resource_name_, std::move(other.metadata_)) {} const MetadataStore::Metadata& metadata() const { return metadata_; } const MonitoredResource& resource() const { return resource_; } const std::vector& ids() const { return ids_; } + const std::string& full_resource_name() const { return full_resource_name_; } private: friend class MetadataUpdater; // Needs write access to metadata_. std::vector ids_; MonitoredResource resource_; + std::string full_resource_name_; MetadataStore::Metadata metadata_; }; @@ -114,7 +120,8 @@ class MetadataUpdater { // Updates the metadata in the store. Consumes result. void UpdateMetadataCallback(ResourceMetadata&& result) { - store_->UpdateMetadata(result.resource_, std::move(result.metadata_)); + store_->UpdateMetadata( + result.full_resource_name_, std::move(result.metadata_)); } const std::string& name() const { diff --git a/test/configuration_unittest.cc b/test/configuration_unittest.cc index e755cb36..8d107cec 100644 --- a/test/configuration_unittest.cc +++ b/test/configuration_unittest.cc @@ -14,8 +14,7 @@ void VerifyDefaultConfig(const Configuration& config) { EXPECT_EQ(false, config.MetadataReporterPurgeDeleted()); EXPECT_THAT(config.MetadataReporterUserAgent(), ::testing::StartsWith("metadata-agent/")); - EXPECT_EQ("https://stackdriver.googleapis.com/" - "v1beta2/projects/{{project_id}}/resourceMetadata:batchUpdate", + EXPECT_EQ("https://stackdriver.googleapis.com/batch/resourceMetadata", config.MetadataIngestionEndpointFormat()); EXPECT_EQ(8*1024*1024, config.MetadataIngestionRequestSizeLimitBytes()); EXPECT_EQ(1000, config.MetadataIngestionRequestSizeLimitCount()); diff --git a/test/kubernetes_unittest.cc b/test/kubernetes_unittest.cc index cc373dc1..951f1ae6 100644 --- a/test/kubernetes_unittest.cc +++ b/test/kubernetes_unittest.cc @@ -17,20 +17,18 @@ class KubernetesTest : public ::testing::Test { static MetadataUpdater::ResourceMetadata GetPodMetadata( const KubernetesReader& reader, const json::Object* pod, - json::value associations, Timestamp collected_at, bool is_deleted) + Timestamp collected_at, bool is_deleted) throw(json::Exception) { - return reader.GetPodMetadata( - pod, std::move(associations), collected_at, is_deleted); + return reader.GetPodMetadata(pod, collected_at, is_deleted); } static MetadataUpdater::ResourceMetadata GetContainerMetadata( const KubernetesReader& reader, const json::Object* pod, const json::Object* container_spec, const json::Object* container_status, - json::value associations, Timestamp collected_at, bool is_deleted) + Timestamp collected_at, bool is_deleted) throw(json::Exception) { return reader.GetContainerMetadata(pod, container_spec, container_status, - std::move(associations), collected_at, - is_deleted); + collected_at, is_deleted); } static std::vector GetPodAndContainerMetadata( @@ -44,48 +42,19 @@ class KubernetesTest : public ::testing::Test { const std::string& container_name) throw(json::Exception) { return reader.GetLegacyResource(pod, container_name); } - - static json::value ComputePodAssociations(const KubernetesReader& reader, - const json::Object* pod) { - return reader.ComputePodAssociations(pod); - } - - static void UpdateOwnersCache(KubernetesReader* reader, const std::string& key, - const json::value& value) { - reader->owners_[key] = value->Clone(); - } - - static MetadataUpdater::ResourceMetadata GetClusterMetadata( - const KubernetesReader& reader, Timestamp collected_at) - throw(json::Exception) { - return reader.GetClusterMetadata(collected_at); - } - - static void UpdateServiceToMetadataCache( - KubernetesReader* reader, const json::Object* service, bool is_deleted) - throw(json::Exception) { - return reader->UpdateServiceToMetadataCache(service, is_deleted); - } - - static void UpdateServiceToPodsCache( - KubernetesReader* reader, const json::Object* endpoints, bool is_deleted) - throw(json::Exception) { - return reader->UpdateServiceToPodsCache(endpoints, is_deleted); - } }; TEST_F(KubernetesTest, GetNodeMetadata) { Configuration config(std::istringstream( + "ProjectId: TestProjectId\n" "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataIngestionRawContentVersion: TestVersion\n" - "InstanceResourceType: gce_instance\n" "InstanceZone: TestZone\n" - "InstanceId: TestID\n" )); Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. json::value node = json::object({ + {"apiVersion", json::string("NodeVersion")}, {"metadata", json::object({ {"name", json::string("testname")}, {"creationTimestamp", json::string("2018-03-03T01:23:45.678901234Z")}, @@ -100,104 +69,81 @@ TEST_F(KubernetesTest, GetNodeMetadata) { {"node_name", "testname"}, {"location", "TestClusterLocation"}, }), m.resource()); - EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_EQ( + "//container.googleapis.com/projects/TestProjectId/locations/" + "TestClusterLocation/clusters/TestClusterName/k8s/nodes/testname", + m.full_resource_name()); + EXPECT_EQ("io.k8s.Node", m.metadata().type); + EXPECT_EQ("TestClusterLocation", m.metadata().location); + EXPECT_EQ("NodeVersion", m.metadata().version); + EXPECT_EQ( + "//container.googleapis.com/resourceTypes/io.k8s.Node/versions/" + "NodeVersion", + m.metadata().schema_name); EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m.metadata().created_at); EXPECT_EQ(Timestamp(), m.metadata().collected_at); - json::value big = json::object({ - {"blobs", json::object({ - {"association", json::object({ - {"version", json::string("TestVersion")}, - {"raw", json::object({ - {"infrastructureResource", json::object({ - {"type", json::string("gce_instance")}, - {"labels", json::object({ - {"instance_id", json::string("TestID")}, - {"zone", json::string("TestZone")}, - })}, - })}, - })}, - })}, - {"api", json::object({ - {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. - {"raw", std::move(node)}, - })}, - })}, - }); - EXPECT_EQ(big->ToString(), m.metadata().metadata->ToString()); + EXPECT_EQ(node->ToString(), m.metadata().metadata->ToString()); } -TEST_F(KubernetesTest, ComputePodAssociations) { +TEST_F(KubernetesTest, GetPodMetadata) { Configuration config(std::stringstream( + "ProjectId: TestProjectId\n" "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataIngestionRawContentVersion: TestVersion\n" "InstanceZone: TestZone\n" - "InstanceId: TestID\n" )); Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - json::value controller = json::object({ - {"controller", json::boolean(true)}, - {"apiVersion", json::string("1.2.3")}, - {"kind", json::string("TestKind")}, - {"name", json::string("TestName")}, - {"uid", json::string("TestUID1")}, - {"metadata", json::object({ - {"name", json::string("InnerTestName")}, - {"kind", json::string("InnerTestKind")}, - {"uid", json::string("InnerTestUID1")}, - })}, - }); - UpdateOwnersCache(&reader, "1.2.3/TestKind/TestUID1", controller); + json::value pod = json::object({ + {"apiVersion", json::string("PodVersion")}, {"metadata", json::object({ {"namespace", json::string("TestNamespace")}, - {"uid", json::string("TestUID0")}, - {"ownerReferences", json::array({ - json::object({{"no_controller", json::boolean(true)}}), - std::move(controller), - })}, - })}, - {"spec", json::object({ - {"nodeName", json::string("TestSpecNodeName")}, + {"name", json::string("TestName")}, + {"uid", json::string("TestUid")}, + {"creationTimestamp", json::string("2018-03-03T01:23:45.678901234Z")}, })}, }); + const auto m = GetPodMetadata(reader, pod->As(), + Timestamp(), false); - json::value expected_associations = json::object({ - {"raw", json::object({ - {"controllers", json::object({ - {"topLevelControllerName", json::string("InnerTestName")}, - {"topLevelControllerType", json::string("TestKind")}, - })}, - {"infrastructureResource", json::object({ - {"labels", json::object({ - {"instance_id", json::string("TestID")}, - {"zone", json::string("TestZone")}, - })}, - {"type", json::string("gce_instance")}, - })}, - {"nodeName", json::string("TestSpecNodeName")}, - })}, - {"version", json::string("TestVersion")}, - }); - const auto associations = - ComputePodAssociations(reader, pod->As()); - EXPECT_EQ(expected_associations->ToString(), associations->ToString()); + EXPECT_EQ(std::vector( + {"k8s_pod.TestUid", "k8s_pod.TestNamespace.TestName"}), m.ids()); + EXPECT_EQ(MonitoredResource("k8s_pod", { + {"cluster_name", "TestClusterName"}, + {"pod_name", "TestName"}, + {"location", "TestClusterLocation"}, + {"namespace_name", "TestNamespace"}, + }), m.resource()); + EXPECT_EQ( + "//container.googleapis.com/projects/TestProjectId/locations/" + "TestClusterLocation/clusters/TestClusterName/k8s/namespaces/" + "TestNamespace/pods/TestName", + m.full_resource_name()); + EXPECT_EQ("io.k8s.Pod", m.metadata().type); + EXPECT_EQ("TestClusterLocation", m.metadata().location); + EXPECT_EQ("PodVersion", m.metadata().version); + EXPECT_EQ( + "//container.googleapis.com/resourceTypes/io.k8s.Pod/versions/PodVersion", + m.metadata().schema_name); + EXPECT_FALSE(m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + EXPECT_FALSE(m.metadata().ignore); + EXPECT_EQ(pod->ToString(), m.metadata().metadata->ToString()); } -TEST_F(KubernetesTest, GetPodMetadata) { +TEST_F(KubernetesTest, GetPodMetadataUnscheduled) { Configuration config(std::stringstream( + "ProjectId: TestProjectId\n" "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataApiResourceTypeSeparator: \".\"\n" - "MetadataIngestionRawContentVersion: TestVersion\n" + "KubernetesClusterLevelMetadata: true\n" )); Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. json::value pod = json::object({ + {"apiVersion", json::string("PodVersion")}, {"metadata", json::object({ {"namespace", json::string("TestNamespace")}, {"name", json::string("TestName")}, @@ -206,41 +152,20 @@ TEST_F(KubernetesTest, GetPodMetadata) { })}, }); const auto m = GetPodMetadata(reader, pod->As(), - json::string("TestAssociations"), Timestamp(), - false); + Timestamp(), false); - EXPECT_EQ(std::vector( - {"k8s_pod.TestUid", "k8s_pod.TestNamespace.TestName"}), m.ids()); EXPECT_EQ(MonitoredResource("k8s_pod", { {"cluster_name", "TestClusterName"}, {"pod_name", "TestName"}, {"location", "TestClusterLocation"}, {"namespace_name", "TestNamespace"}, }), m.resource()); - EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m.metadata().created_at); - EXPECT_EQ(Timestamp(), m.metadata().collected_at); - EXPECT_FALSE(m.metadata().ignore); - json::value expected_metadata = json::object({ - {"blobs", json::object({ - {"api", json::object({ - {"raw", json::object({ - {"metadata", json::object({ - {"creationTimestamp", - json::string("2018-03-03T01:23:45.678901234Z")}, - {"name", json::string("TestName")}, - {"namespace", json::string("TestNamespace")}, - {"uid", json::string("TestUid")}, - })}, - })}, - {"version", json::string("1.6")}, - })}, - {"association", json::string("TestAssociations")}, - })}, - }); - EXPECT_EQ(expected_metadata->ToString(), m.metadata().metadata->ToString()); + EXPECT_EQ( + "//container.googleapis.com/projects/TestProjectId/locations/" + "TestClusterLocation/clusters/TestClusterName/k8s/namespaces/" + "TestNamespace/pods/TestName", + m.full_resource_name()); + EXPECT_EQ("TestClusterLocation", m.metadata().location); } TEST_F(KubernetesTest, GetLegacyResource) { @@ -273,180 +198,14 @@ TEST_F(KubernetesTest, GetLegacyResource) { {"pod_id", "TestUid"}, {"zone", "TestZone"}, }), m.resource()); + EXPECT_EQ("", m.full_resource_name()); EXPECT_TRUE(m.metadata().ignore); } -TEST_F(KubernetesTest, GetClusterMetadataEmpty) { - Configuration config(std::istringstream( - "KubernetesClusterName: TestClusterName\n" - "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataIngestionRawContentVersion: TestVersion\n" - )); - Environment environment(config); - KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_TRUE(m.ids().empty()); - EXPECT_EQ(MonitoredResource("k8s_cluster", { - {"cluster_name", "TestClusterName"}, - {"location", "TestClusterLocation"}, - }), m.resource()); - EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(Timestamp(), m.metadata().created_at); - EXPECT_EQ(Timestamp(), m.metadata().collected_at); - json::value empty_cluster = json::object({ - {"blobs", json::object({ - {"services", json::array({})}, - })}, - }); - EXPECT_EQ(empty_cluster->ToString(), m.metadata().metadata->ToString()); -} - -TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { - Configuration config(std::istringstream( - "KubernetesClusterName: TestClusterName\n" - "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataIngestionRawContentVersion: TestVersion\n" - )); - Environment environment(config); - json::value service = json::object({ - {"metadata", json::object({ - {"name", json::string("testname")}, - {"namespace", json::string("testnamespace")}, - })}, - }); - KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - UpdateServiceToMetadataCache( - &reader, service->As(), /*is_deleted=*/false); - const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_TRUE(m.ids().empty()); - EXPECT_EQ(MonitoredResource("k8s_cluster", { - {"cluster_name", "TestClusterName"}, - {"location", "TestClusterLocation"}, - }), m.resource()); - EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(Timestamp(), m.metadata().created_at); - EXPECT_EQ(Timestamp(), m.metadata().collected_at); - json::value expected_cluster = json::object({ - {"blobs", json::object({ - {"services", json::array({ - json::object({ - {"api", json::object({ - {"pods", json::array({})}, - {"raw", std::move(service)}, - {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. - })}, - }), - })}, - })}, - }); - EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); -} - -TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { - Configuration config(std::istringstream( - "KubernetesClusterName: TestClusterName\n" - "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataIngestionRawContentVersion: TestVersion\n" - )); - Environment environment(config); - json::value service = json::object({ - {"metadata", json::object({ - {"name", json::string("testname")}, - {"namespace", json::string("testnamespace")}, - })}, - }); - json::value endpoints = json::object({ - {"metadata", json::object({ - {"name", json::string("testname")}, - {"namespace", json::string("testnamespace")}, - })}, - {"subsets", json::array({ - json::object({ - {"addresses", json::array({ - json::object({ - {"targetRef", json::object({ - {"kind", json::string("Pod")}, - {"name", json::string("my-pod")}, - })}, - }), - })}, - }), - })}, - }); - KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - UpdateServiceToMetadataCache( - &reader, service->As(), /*is_deleted=*/false); - UpdateServiceToPodsCache( - &reader, endpoints->As(), /*is_deleted=*/false); - const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_TRUE(m.ids().empty()); - EXPECT_EQ(MonitoredResource("k8s_cluster", { - {"cluster_name", "TestClusterName"}, - {"location", "TestClusterLocation"}, - }), m.resource()); - EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(Timestamp(), m.metadata().created_at); - EXPECT_EQ(Timestamp(), m.metadata().collected_at); - MonitoredResource pod_mr = MonitoredResource("k8s_pod", { - {"cluster_name", "TestClusterName"}, - {"namespace_name", "testnamespace"}, - {"pod_name", "my-pod"}, - {"location", "TestClusterLocation"}, - }); - json::value expected_cluster = json::object({ - {"blobs", json::object({ - {"services", json::array({ - json::object({ - {"api", json::object({ - {"pods", json::array({ - pod_mr.ToJSON(), - })}, - {"raw", std::move(service)}, - {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. - })}, - }), - })}, - })}, - }); - EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); -} - -TEST_F(KubernetesTest, GetClusterMetadataDeletedService) { - Configuration config(std::istringstream( - "KubernetesClusterName: TestClusterName\n" - "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataIngestionRawContentVersion: TestVersion\n" - )); - Environment environment(config); - json::value service = json::object({ - {"metadata", json::object({ - {"name", json::string("testname")}, - {"namespace", json::string("testnamespace")}, - })}, - }); - KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - UpdateServiceToMetadataCache( - &reader, service->As(), /*is_deleted=*/false); - UpdateServiceToMetadataCache( - &reader, service->As(), /*is_deleted=*/true); - const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_TRUE(m.ids().empty()); - json::value empty_cluster = json::object({ - {"blobs", json::object({ - {"services", json::array({})}, - })}, - }); - EXPECT_EQ(empty_cluster->ToString(), m.metadata().metadata->ToString()); -} - TEST_F(KubernetesTest, GetContainerMetadata) { Configuration config(std::stringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataApiResourceTypeSeparator: \".\"\n" )); Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. @@ -468,7 +227,6 @@ TEST_F(KubernetesTest, GetContainerMetadata) { pod->As(), spec->As(), status->As(), - json::string("TestAssociations"), Timestamp(), /*is_deleted=*/false); @@ -484,63 +242,22 @@ TEST_F(KubernetesTest, GetContainerMetadata) { {"namespace_name", "TestNamespace"}, {"pod_name", "TestName"}, }), m.resource()); - EXPECT_EQ("1.6", m.metadata().version); - EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m.metadata().created_at); - EXPECT_EQ(Timestamp(), m.metadata().collected_at); - EXPECT_FALSE(m.metadata().ignore); - json::value expected_metadata = json::object({ - {"blobs", json::object({ - {"association", json::string("TestAssociations")}, - {"labels", json::object({ - {"raw", json::object({ - {"label", json::string("TestLabel")}, - })}, - {"version", json::string("1.6")}, - })}, - {"spec", json::object({ - {"raw", json::object({ - {"name", json::string("TestSpecName")}, - })}, - {"version", json::string("1.6")}, - })}, - {"status", json::object({ - {"raw", json::object({ - {"containerID", json::string("docker://TestContainerID")}, - })}, - {"version", json::string("1.6")}, - })}, - })}, - }); - EXPECT_EQ(expected_metadata->ToString(), m.metadata().metadata->ToString()); + EXPECT_EQ("", m.full_resource_name()); + EXPECT_TRUE(m.metadata().ignore); } + TEST_F(KubernetesTest, GetPodAndContainerMetadata) { Configuration config(std::stringstream( + "ProjectId: TestProjectId\n" "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataApiResourceTypeSeparator: \".\"\n" - "MetadataIngestionRawContentVersion: TestVersion\n" "InstanceZone: TestZone\n" "InstanceId: TestID\n" )); Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - - json::value controller = json::object({ - {"controller", json::boolean(true)}, - {"apiVersion", json::string("1.2.3")}, - {"kind", json::string("TestKind")}, - {"name", json::string("TestName")}, - {"uid", json::string("TestUID1")}, - {"metadata", json::object({ - {"name", json::string("InnerTestName")}, - {"kind", json::string("InnerTestKind")}, - {"uid", json::string("InnerTestUID1")}, - })}, - }); - UpdateOwnersCache(&reader, "1.2.3/TestKind/TestUID1", controller); json::value pod = json::object({ + {"apiVersion", json::string("PodVersion")}, {"metadata", json::object({ {"name", json::string("TestPodName")}, {"namespace", json::string("TestNamespace")}, @@ -578,6 +295,7 @@ TEST_F(KubernetesTest, GetPodAndContainerMetadata) { {"pod_id", "TestPodUid"}, {"zone", "TestZone"} }), m[0].resource()); + EXPECT_EQ("", m[0].full_resource_name()); EXPECT_TRUE(m[0].metadata().ignore); EXPECT_EQ(std::vector({ @@ -591,46 +309,8 @@ TEST_F(KubernetesTest, GetPodAndContainerMetadata) { {"namespace_name", "TestNamespace"}, {"pod_name", "TestPodName"}, }), m[1].resource()); - EXPECT_FALSE(m[1].metadata().ignore); - EXPECT_EQ("1.6", m[1].metadata().version); - EXPECT_FALSE(m[1].metadata().is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m[1].metadata().created_at); - EXPECT_EQ(Timestamp(), m[1].metadata().collected_at); - json::value container_metadata = json::object({ - {"blobs", json::object({ - {"association", json::object({ - {"raw", json::object({ - {"controllers", json::object({ - {"topLevelControllerName", json::string("TestPodName")}, - {"topLevelControllerType", json::string("Pod")}, - })}, - {"infrastructureResource", json::object({ - {"labels", json::object({ - {"instance_id", json::string("TestID")}, - {"zone", json::string("TestZone")}, - })}, - {"type", json::string("gce_instance")}, - })}, - {"nodeName", json::string("TestSpecNodeName")}, - })}, - {"version", json::string("TestVersion")}, - })}, - {"spec", json::object({ - {"raw", json::object({ - {"name", json::string("TestContainerName0")}, - })}, - {"version", json::string("1.6")}, - })}, - {"status", json::object({ - {"raw", json::object({ - {"name", json::string("TestContainerName0")}, - })}, - {"version", json::string("1.6")}, - })}, - })}, - }); - EXPECT_EQ(container_metadata->ToString(), m[1].metadata().metadata->ToString()); + EXPECT_EQ("", m[1].full_resource_name()); + EXPECT_TRUE(m[1].metadata().ignore); EXPECT_EQ(std::vector({ "k8s_pod.TestPodUid", @@ -642,58 +322,16 @@ TEST_F(KubernetesTest, GetPodAndContainerMetadata) { {"namespace_name", "TestNamespace"}, {"pod_name", "TestPodName"}, }), m[2].resource()); + EXPECT_EQ( + "//container.googleapis.com/projects/TestProjectId/locations/" + "TestClusterLocation/clusters/TestClusterName/k8s/namespaces/" + "TestNamespace/pods/TestPodName", + m[2].full_resource_name()); EXPECT_FALSE(m[2].metadata().ignore); - EXPECT_EQ("TestVersion", m[2].metadata().version); + EXPECT_EQ("PodVersion", m[2].metadata().version); EXPECT_FALSE(m[2].metadata().is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m[2].metadata().created_at); EXPECT_EQ(Timestamp(), m[2].metadata().collected_at); - json::value pod_metadata = json::object({ - {"blobs", json::object({ - {"api", json::object({ - {"raw", json::object({ - {"metadata", json::object({ - {"creationTimestamp", - json::string("2018-03-03T01:23:45.678901234Z")}, - {"name", json::string("TestPodName")}, - {"namespace", json::string("TestNamespace")}, - {"uid", json::string("TestPodUid")}, - })}, - {"spec", json::object({ - {"containers", json::array({ - json::object({{"name", json::string("TestContainerName0")}}) - })}, - {"nodeName", json::string("TestSpecNodeName")}, - })}, - {"status", json::object({ - {"containerID", json::string("docker://TestContainerID")}, - {"containerStatuses", json::array({ - json::object({{"name", json::string("TestContainerName0")}}) - })}, - })}, - })}, - {"version", json::string("1.6")}, - })}, - {"association", json::object({ - {"raw", json::object({ - {"controllers", json::object({ - {"topLevelControllerName", json::string("TestPodName")}, - {"topLevelControllerType", json::string("Pod")}, - })}, - {"infrastructureResource", json::object({ - {"labels", json::object({ - {"instance_id", json::string("TestID")}, - {"zone", json::string("TestZone")}, - })}, - {"type", json::string("gce_instance")}, - })}, - {"nodeName", json::string("TestSpecNodeName")}, - })}, - {"version", json::string("TestVersion")}, - })}, - })}, - }); - EXPECT_EQ(pod_metadata->ToString(), + EXPECT_EQ(pod->ToString(), m[2].metadata().metadata->ToString()); } } // namespace google diff --git a/test/store_unittest.cc b/test/store_unittest.cc index c6917e2a..d203b4d6 100644 --- a/test/store_unittest.cc +++ b/test/store_unittest.cc @@ -90,106 +90,123 @@ TEST_F(MetadataStoreTest, UpdateResourceDoesNotUpdateMetadata) { } TEST_F(MetadataStoreTest, UpdateMetadataChangesMetadataMap) { - MonitoredResource resource("type", {}); + const std::string frn = "/type"; MetadataStore::Metadata m( + "default-type", + "default-location", "default-version", + "default-schema", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); - store.UpdateMetadata(resource, std::move(m)); + store.UpdateMetadata(frn, std::move(m)); const auto metadata_map = store.GetMetadataMap(); EXPECT_EQ(1, metadata_map.size()); - EXPECT_EQ("default-version", metadata_map.at(resource).version); + EXPECT_EQ("default-type", metadata_map.at(frn).type); + EXPECT_EQ("default-location", metadata_map.at(frn).location); + EXPECT_EQ("default-version", metadata_map.at(frn).version); + EXPECT_EQ("default-schema", metadata_map.at(frn).schema_name); } TEST_F(MetadataStoreTest, MultipleUpdateMetadataChangesMetadataMap) { - MonitoredResource resource1("type1", {}); - MonitoredResource resource2("type2", {}); + const std::string frn1 = "/type1"; + const std::string frn2 = "/type2"; MetadataStore::Metadata m1( + "default-type1", + "default-location1", "default-version1", + "default-schema1", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); MetadataStore::Metadata m2( + "default-type2", + "default-location2", "default-version2", + "default-schema2", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); - store.UpdateMetadata(resource1, std::move(m1)); - store.UpdateMetadata(resource2, std::move(m2)); + store.UpdateMetadata(frn1, std::move(m1)); + store.UpdateMetadata(frn2, std::move(m2)); const auto metadata_map = store.GetMetadataMap(); EXPECT_EQ(2, metadata_map.size()); - EXPECT_EQ("default-version1", metadata_map.at(resource1).version); - EXPECT_EQ("default-version2", metadata_map.at(resource2).version); + EXPECT_EQ("default-version1", metadata_map.at(frn1).version); + EXPECT_EQ("default-version2", metadata_map.at(frn2).version); } TEST_F(MetadataStoreTest, UpdateMetadataForResourceChangesMetadataEntry) { - MonitoredResource resource("type", {}); + const std::string frn = "/type"; MetadataStore::Metadata m1( + "default-type1", + "default-location1", "default-version1", + "default-schema1", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); - store.UpdateMetadata(resource, std::move(m1)); + store.UpdateMetadata(frn, std::move(m1)); const auto metadata_map_before = store.GetMetadataMap(); EXPECT_EQ(1, metadata_map_before.size()); - EXPECT_EQ("default-version1", metadata_map_before.at(resource).version); + EXPECT_EQ("default-version1", metadata_map_before.at(frn).version); MetadataStore::Metadata m2( + "default-type2", + "default-location2", "default-version2", + "default-schema2", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); - store.UpdateMetadata(resource, std::move(m2)); + store.UpdateMetadata(frn, std::move(m2)); const auto metadata_map_after = store.GetMetadataMap(); EXPECT_EQ(1, metadata_map_after.size()); - EXPECT_EQ("default-version2", metadata_map_after.at(resource).version); + EXPECT_EQ("default-version2", metadata_map_after.at(frn).version); } TEST_F(MetadataStoreTest, PurgeDeletedEntriesDeletesCorrectMetadata) { - MonitoredResource resource1("type1", {}); - MonitoredResource resource2("type2", {}); + const std::string frn1 = "/type1"; + const std::string frn2 = "/type2"; MetadataStore::Metadata m1( + "default-type1", + "default-location1", "default-version1", + "default-schema1", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); MetadataStore::Metadata m2( + "default-type2", + "default-location2", "default-version2", + "default-schema2", true, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("hello")}})); - store.UpdateMetadata(resource1, std::move(m1)); - store.UpdateMetadata(resource2, std::move(m2)); + store.UpdateMetadata(frn1, std::move(m1)); + store.UpdateMetadata(frn2, std::move(m2)); const auto metadata_map_before = store.GetMetadataMap(); EXPECT_EQ(2, metadata_map_before.size()); - EXPECT_EQ("default-version1", metadata_map_before.at(resource1).version); - EXPECT_EQ("default-version2", metadata_map_before.at(resource2).version); + EXPECT_EQ("default-version1", metadata_map_before.at(frn1).version); + EXPECT_EQ("default-version2", metadata_map_before.at(frn2).version); PurgeDeletedEntries(); const auto metadata_map_after = store.GetMetadataMap(); EXPECT_EQ(1, metadata_map_after.size()); - EXPECT_EQ("default-version1", metadata_map_after.at(resource1).version); - EXPECT_THROW(metadata_map_after.at(resource2), std::out_of_range); + EXPECT_EQ("default-version1", metadata_map_after.at(frn1).version); + EXPECT_THROW(metadata_map_after.at(frn2), std::out_of_range); } TEST(MetadataTest, MetadataCorrectlyConstructed) { MetadataStore::Metadata m( + "default-type", + "default-location", "default-version", + "default-schema", false, - time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), time::rfc3339::FromString("2018-03-03T01:32:45.678901234Z"), json::object({{"f", json::string("hello")}})); EXPECT_FALSE(m.ignore); EXPECT_EQ("default-version", m.version); EXPECT_FALSE(m.is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m.created_at); EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:32:45.678901234Z"), m.collected_at); EXPECT_EQ("{\"f\":\"hello\"}", m.metadata->ToString()); @@ -197,16 +214,17 @@ TEST(MetadataTest, MetadataCorrectlyConstructed) { TEST(MetadataTest, MetadataCorrectlyCloned) { MetadataStore::Metadata m( + "default-type", + "default-location", "default-version", + "default-schema", false, - time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), time::rfc3339::FromString("2018-03-03T01:32:45.678901234Z"), json::object({{"f", json::string("hello")}})); MetadataStore::Metadata m_clone = m.Clone(); EXPECT_FALSE(m_clone.ignore); EXPECT_EQ(m.version, m_clone.version); EXPECT_FALSE(m_clone.is_deleted); - EXPECT_EQ(m.created_at, m_clone.created_at); EXPECT_EQ(m.collected_at, m_clone.collected_at); EXPECT_EQ(m.metadata->ToString(), m_clone.metadata->ToString()); } diff --git a/test/updater_unittest.cc b/test/updater_unittest.cc index 2e729325..f32a93e3 100644 --- a/test/updater_unittest.cc +++ b/test/updater_unittest.cc @@ -184,27 +184,31 @@ TEST_F(ValidationOrderingTest, AllChecksPassedInvokesStartUpdater) { TEST_F(UpdaterTest, UpdateMetadataCallback) { MetadataStore::Metadata m( + "test-type", + "test-location", "test-version", + "test-schema-name", false, std::chrono::system_clock::now(), - std::chrono::system_clock::now(), json::object({{"f", json::string("test")}})); MonitoredResource resource("test_resource", {}); + const std::string frn = "/test"; MetadataUpdater::ResourceMetadata metadata( std::vector({"", "test-prefix"}), - resource, std::move(m)); + resource, frn, std::move(m)); PollingMetadataUpdater updater(config, &store, "Test", 60, nullptr); UpdateMetadataCallback(&updater, std::move(metadata)); const auto metadata_map = store.GetMetadataMap(); EXPECT_EQ(1, metadata_map.size()); - EXPECT_EQ("test-version", metadata_map.at(resource).version); - EXPECT_EQ("{\"f\":\"test\"}", metadata_map.at(resource).metadata->ToString()); + EXPECT_EQ("test-version", metadata_map.at(frn).version); + EXPECT_EQ("{\"f\":\"test\"}", metadata_map.at(frn).metadata->ToString()); } TEST_F(UpdaterTest, UpdateResourceCallback) { MetadataUpdater::ResourceMetadata metadata( std::vector({"", "test-prefix"}), MonitoredResource("test_resource", {}), + "/test", MetadataStore::Metadata::IGNORED() ); PollingMetadataUpdater updater(config, &store, "Test", 60, nullptr);