diff --git a/.chloggen/k8scluster-allocatable.yaml b/.chloggen/k8scluster-allocatable.yaml new file mode 100644 index 0000000000000..e6215bc8e1de9 --- /dev/null +++ b/.chloggen/k8scluster-allocatable.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/k8s_cluster + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Sync allocatable metric names with the latest semconv" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [40708] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/k8sclusterreceiver/README.md b/receiver/k8sclusterreceiver/README.md index 23c34c903b953..c213dc765bae1 100644 --- a/receiver/k8sclusterreceiver/README.md +++ b/receiver/k8sclusterreceiver/README.md @@ -93,6 +93,11 @@ Example: The full list of settings exposed for this receiver are documented in [config.go](./config.go) with detailed sample configurations in [testdata/config.yaml](./testdata/config.yaml). +**Note** that with the introduction of the [semconv.k8s.enableStable](#semconvk8senablestable) feature gate, the metrics for the allocatable resource types +(`k8s.node.allocatable.cpu`, `k8s.node.allocatable.ephemeral_storage`, `k8s.node.allocatable.memory`, `k8s.node.allocatable.pods`) are enabled/disabled via the metrics section, and are represented by up/down counters, rather than gauges. +To activate the feature flag, start the collector with `--feature-gates=+semconv.k8s.enableStable`. +To disable the old representation of the allocatable metrics (`k8s.node.allocatable_cpu`, `k8s.node.allocatable_ephemeral_storage`, `k8s.node.allocatable_memory`, `k8s.node.allocatable_pods`) disable the [semconv.k8s.disableLegacy](#semconvk8sdisablelegacy) feature flag with `--feature-gates=-semconv.k8s.disableLegacy` + ### k8s_leader_elector Provide name of the k8s leader elector extension defined in config. This allows multiple instances of k8s cluster receiver to be executed on a cluster. At a given time only the pod which has the is active. @@ -453,3 +458,28 @@ Add the following rules to your ClusterRole: - watch ``` +## Feature Gates + +### `semconv.k8s.enableStable` + +The `semconv.k8s.enableStable` [feature gate](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) enables the SemConv valid format of the node allocatable metrics reported by the receiver. +The feature gate is in `alpha` stage, which means it is disabled by default. + +If enabled, the SemConv valid format of the node allocatable metrics are reported (if enabled via the metrics section): + +- `k8s.node.allocatable.cpu` +- `k8s.node.allocatable.ephemeral_storage` +- `k8s.node.allocatable.memory` +- `k8s.node.allocatable.pods` + +### `semconv.k8s.disableLegacy` + +The `semconv.k8s.disableLegacy` [feature gate](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) disables the old, non-SemConv valid format of the node allocatable metrics reported by the receiver. +The feature gate is in `alpha` stage, which means it is disabled by default. + +If disabled, the old format of the node allocatable metrics are reported: + +- `k8s.node.allocatable_cpu` +- `k8s.node.allocatable_ephemeral_storage` +- `k8s.node.allocatable_memory` +- `k8s.node.allocatable_pods` diff --git a/receiver/k8sclusterreceiver/documentation.md b/receiver/k8sclusterreceiver/documentation.md index 2dbede4343bfc..a0127c19d87af 100644 --- a/receiver/k8sclusterreceiver/documentation.md +++ b/receiver/k8sclusterreceiver/documentation.md @@ -228,6 +228,38 @@ The current phase of namespaces (1 for active and 0 for terminating) | ---- | ----------- | ---------- | --------- | | | Gauge | Int | development | +### k8s.node.allocatable.cpu + +Amount of cpu allocatable on the node + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {cpu} | Sum | Double | Unspecified | false | + +### k8s.node.allocatable.ephemeral_storage + +Amount of ephemeral-storage allocatable on the node + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| By | Sum | Int | Unspecified | false | + +### k8s.node.allocatable.memory + +Amount of memory allocatable on the node + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| By | Sum | Int | Unspecified | false | + +### k8s.node.allocatable.pods + +Amount of pods allocatable on the node + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {pod} | Sum | Int | Unspecified | false | + ### k8s.pod.phase Current phase of the pod (1 - Pending, 2 - Running, 3 - Succeeded, 4 - Failed, 5 - Unknown) diff --git a/receiver/k8sclusterreceiver/go.mod b/receiver/k8sclusterreceiver/go.mod index 90354385a9c67..728c55fc9aee5 100644 --- a/receiver/k8sclusterreceiver/go.mod +++ b/receiver/k8sclusterreceiver/go.mod @@ -25,6 +25,7 @@ require ( go.opentelemetry.io/collector/confmap/xconfmap v0.138.1-0.20251021231522-c657d5d4e920 go.opentelemetry.io/collector/consumer v1.44.1-0.20251021231522-c657d5d4e920 go.opentelemetry.io/collector/consumer/consumertest v0.138.1-0.20251021231522-c657d5d4e920 + go.opentelemetry.io/collector/featuregate v1.44.1-0.20251021231522-c657d5d4e920 go.opentelemetry.io/collector/filter v0.138.1-0.20251021231522-c657d5d4e920 go.opentelemetry.io/collector/pdata v1.44.1-0.20251021231522-c657d5d4e920 go.opentelemetry.io/collector/pipeline v1.44.1-0.20251021231522-c657d5d4e920 @@ -111,7 +112,6 @@ require ( go.opentelemetry.io/collector/extension v1.44.1-0.20251021231522-c657d5d4e920 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.44.1-0.20251021231522-c657d5d4e920 // indirect go.opentelemetry.io/collector/extension/extensionmiddleware v0.138.1-0.20251021231522-c657d5d4e920 // indirect - go.opentelemetry.io/collector/featuregate v1.44.1-0.20251021231522-c657d5d4e920 // indirect go.opentelemetry.io/collector/internal/sharedcomponent v0.138.1-0.20251021231522-c657d5d4e920 // indirect go.opentelemetry.io/collector/internal/telemetry v0.138.1-0.20251021231522-c657d5d4e920 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.138.1-0.20251021231522-c657d5d4e920 // indirect diff --git a/receiver/k8sclusterreceiver/internal/metadata/generated_config.go b/receiver/k8sclusterreceiver/internal/metadata/generated_config.go index a78ba787f9b80..2e4b5b6e07e5e 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/generated_config.go +++ b/receiver/k8sclusterreceiver/internal/metadata/generated_config.go @@ -57,6 +57,10 @@ type MetricsConfig struct { K8sJobMaxParallelPods MetricConfig `mapstructure:"k8s.job.max_parallel_pods"` K8sJobSuccessfulPods MetricConfig `mapstructure:"k8s.job.successful_pods"` K8sNamespacePhase MetricConfig `mapstructure:"k8s.namespace.phase"` + K8sNodeAllocatableCPU MetricConfig `mapstructure:"k8s.node.allocatable.cpu"` + K8sNodeAllocatableEphemeralStorage MetricConfig `mapstructure:"k8s.node.allocatable.ephemeral_storage"` + K8sNodeAllocatableMemory MetricConfig `mapstructure:"k8s.node.allocatable.memory"` + K8sNodeAllocatablePods MetricConfig `mapstructure:"k8s.node.allocatable.pods"` K8sNodeCondition MetricConfig `mapstructure:"k8s.node.condition"` K8sPodPhase MetricConfig `mapstructure:"k8s.pod.phase"` K8sPodStatusReason MetricConfig `mapstructure:"k8s.pod.status_reason"` @@ -165,6 +169,18 @@ func DefaultMetricsConfig() MetricsConfig { K8sNamespacePhase: MetricConfig{ Enabled: true, }, + K8sNodeAllocatableCPU: MetricConfig{ + Enabled: true, + }, + K8sNodeAllocatableEphemeralStorage: MetricConfig{ + Enabled: true, + }, + K8sNodeAllocatableMemory: MetricConfig{ + Enabled: true, + }, + K8sNodeAllocatablePods: MetricConfig{ + Enabled: true, + }, K8sNodeCondition: MetricConfig{ Enabled: false, }, diff --git a/receiver/k8sclusterreceiver/internal/metadata/generated_config_test.go b/receiver/k8sclusterreceiver/internal/metadata/generated_config_test.go index f43108accc135..d0553829aafb6 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/generated_config_test.go +++ b/receiver/k8sclusterreceiver/internal/metadata/generated_config_test.go @@ -56,6 +56,10 @@ func TestMetricsBuilderConfig(t *testing.T) { K8sJobMaxParallelPods: MetricConfig{Enabled: true}, K8sJobSuccessfulPods: MetricConfig{Enabled: true}, K8sNamespacePhase: MetricConfig{Enabled: true}, + K8sNodeAllocatableCPU: MetricConfig{Enabled: true}, + K8sNodeAllocatableEphemeralStorage: MetricConfig{Enabled: true}, + K8sNodeAllocatableMemory: MetricConfig{Enabled: true}, + K8sNodeAllocatablePods: MetricConfig{Enabled: true}, K8sNodeCondition: MetricConfig{Enabled: true}, K8sPodPhase: MetricConfig{Enabled: true}, K8sPodStatusReason: MetricConfig{Enabled: true}, @@ -151,6 +155,10 @@ func TestMetricsBuilderConfig(t *testing.T) { K8sJobMaxParallelPods: MetricConfig{Enabled: false}, K8sJobSuccessfulPods: MetricConfig{Enabled: false}, K8sNamespacePhase: MetricConfig{Enabled: false}, + K8sNodeAllocatableCPU: MetricConfig{Enabled: false}, + K8sNodeAllocatableEphemeralStorage: MetricConfig{Enabled: false}, + K8sNodeAllocatableMemory: MetricConfig{Enabled: false}, + K8sNodeAllocatablePods: MetricConfig{Enabled: false}, K8sNodeCondition: MetricConfig{Enabled: false}, K8sPodPhase: MetricConfig{Enabled: false}, K8sPodStatusReason: MetricConfig{Enabled: false}, diff --git a/receiver/k8sclusterreceiver/internal/metadata/generated_metrics.go b/receiver/k8sclusterreceiver/internal/metadata/generated_metrics.go index 37d2a1db6573e..5f99dd851ee37 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/generated_metrics.go +++ b/receiver/k8sclusterreceiver/internal/metadata/generated_metrics.go @@ -185,6 +185,18 @@ var MetricsInfo = metricsInfo{ K8sNamespacePhase: metricInfo{ Name: "k8s.namespace.phase", }, + K8sNodeAllocatableCPU: metricInfo{ + Name: "k8s.node.allocatable.cpu", + }, + K8sNodeAllocatableEphemeralStorage: metricInfo{ + Name: "k8s.node.allocatable.ephemeral_storage", + }, + K8sNodeAllocatableMemory: metricInfo{ + Name: "k8s.node.allocatable.memory", + }, + K8sNodeAllocatablePods: metricInfo{ + Name: "k8s.node.allocatable.pods", + }, K8sNodeCondition: metricInfo{ Name: "k8s.node.condition", }, @@ -268,6 +280,10 @@ type metricsInfo struct { K8sJobMaxParallelPods metricInfo K8sJobSuccessfulPods metricInfo K8sNamespacePhase metricInfo + K8sNodeAllocatableCPU metricInfo + K8sNodeAllocatableEphemeralStorage metricInfo + K8sNodeAllocatableMemory metricInfo + K8sNodeAllocatablePods metricInfo K8sNodeCondition metricInfo K8sPodPhase metricInfo K8sPodStatusReason metricInfo @@ -1720,6 +1736,210 @@ func newMetricK8sNamespacePhase(cfg MetricConfig) metricK8sNamespacePhase { return m } +type metricK8sNodeAllocatableCPU struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable.cpu metric with initial data. +func (m *metricK8sNodeAllocatableCPU) init() { + m.data.SetName("k8s.node.allocatable.cpu") + m.data.SetDescription("Amount of cpu allocatable on the node") + m.data.SetUnit("{cpu}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityUnspecified) +} + +func (m *metricK8sNodeAllocatableCPU) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeAllocatableCPU) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableCPU) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableCPU(cfg MetricConfig) metricK8sNodeAllocatableCPU { + m := metricK8sNodeAllocatableCPU{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeAllocatableEphemeralStorage struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable.ephemeral_storage metric with initial data. +func (m *metricK8sNodeAllocatableEphemeralStorage) init() { + m.data.SetName("k8s.node.allocatable.ephemeral_storage") + m.data.SetDescription("Amount of ephemeral-storage allocatable on the node") + m.data.SetUnit("By") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityUnspecified) +} + +func (m *metricK8sNodeAllocatableEphemeralStorage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeAllocatableEphemeralStorage) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableEphemeralStorage) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableEphemeralStorage(cfg MetricConfig) metricK8sNodeAllocatableEphemeralStorage { + m := metricK8sNodeAllocatableEphemeralStorage{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeAllocatableMemory struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable.memory metric with initial data. +func (m *metricK8sNodeAllocatableMemory) init() { + m.data.SetName("k8s.node.allocatable.memory") + m.data.SetDescription("Amount of memory allocatable on the node") + m.data.SetUnit("By") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityUnspecified) +} + +func (m *metricK8sNodeAllocatableMemory) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeAllocatableMemory) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatableMemory) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatableMemory(cfg MetricConfig) metricK8sNodeAllocatableMemory { + m := metricK8sNodeAllocatableMemory{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricK8sNodeAllocatablePods struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.node.allocatable.pods metric with initial data. +func (m *metricK8sNodeAllocatablePods) init() { + m.data.SetName("k8s.node.allocatable.pods") + m.data.SetDescription("Amount of pods allocatable on the node") + m.data.SetUnit("{pod}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityUnspecified) +} + +func (m *metricK8sNodeAllocatablePods) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sNodeAllocatablePods) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricK8sNodeAllocatablePods) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sNodeAllocatablePods(cfg MetricConfig) metricK8sNodeAllocatablePods { + m := metricK8sNodeAllocatablePods{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricK8sNodeCondition struct { data pmetric.Metric // data buffer for generated metric. config MetricConfig // metric config provided by user. @@ -2608,6 +2828,10 @@ type MetricsBuilder struct { metricK8sJobMaxParallelPods metricK8sJobMaxParallelPods metricK8sJobSuccessfulPods metricK8sJobSuccessfulPods metricK8sNamespacePhase metricK8sNamespacePhase + metricK8sNodeAllocatableCPU metricK8sNodeAllocatableCPU + metricK8sNodeAllocatableEphemeralStorage metricK8sNodeAllocatableEphemeralStorage + metricK8sNodeAllocatableMemory metricK8sNodeAllocatableMemory + metricK8sNodeAllocatablePods metricK8sNodeAllocatablePods metricK8sNodeCondition metricK8sNodeCondition metricK8sPodPhase metricK8sPodPhase metricK8sPodStatusReason metricK8sPodStatusReason @@ -2679,6 +2903,10 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, opt metricK8sJobMaxParallelPods: newMetricK8sJobMaxParallelPods(mbc.Metrics.K8sJobMaxParallelPods), metricK8sJobSuccessfulPods: newMetricK8sJobSuccessfulPods(mbc.Metrics.K8sJobSuccessfulPods), metricK8sNamespacePhase: newMetricK8sNamespacePhase(mbc.Metrics.K8sNamespacePhase), + metricK8sNodeAllocatableCPU: newMetricK8sNodeAllocatableCPU(mbc.Metrics.K8sNodeAllocatableCPU), + metricK8sNodeAllocatableEphemeralStorage: newMetricK8sNodeAllocatableEphemeralStorage(mbc.Metrics.K8sNodeAllocatableEphemeralStorage), + metricK8sNodeAllocatableMemory: newMetricK8sNodeAllocatableMemory(mbc.Metrics.K8sNodeAllocatableMemory), + metricK8sNodeAllocatablePods: newMetricK8sNodeAllocatablePods(mbc.Metrics.K8sNodeAllocatablePods), metricK8sNodeCondition: newMetricK8sNodeCondition(mbc.Metrics.K8sNodeCondition), metricK8sPodPhase: newMetricK8sPodPhase(mbc.Metrics.K8sPodPhase), metricK8sPodStatusReason: newMetricK8sPodStatusReason(mbc.Metrics.K8sPodStatusReason), @@ -3038,6 +3266,10 @@ func (mb *MetricsBuilder) EmitForResource(options ...ResourceMetricsOption) { mb.metricK8sJobMaxParallelPods.emit(ils.Metrics()) mb.metricK8sJobSuccessfulPods.emit(ils.Metrics()) mb.metricK8sNamespacePhase.emit(ils.Metrics()) + mb.metricK8sNodeAllocatableCPU.emit(ils.Metrics()) + mb.metricK8sNodeAllocatableEphemeralStorage.emit(ils.Metrics()) + mb.metricK8sNodeAllocatableMemory.emit(ils.Metrics()) + mb.metricK8sNodeAllocatablePods.emit(ils.Metrics()) mb.metricK8sNodeCondition.emit(ils.Metrics()) mb.metricK8sPodPhase.emit(ils.Metrics()) mb.metricK8sPodStatusReason.emit(ils.Metrics()) @@ -3231,6 +3463,26 @@ func (mb *MetricsBuilder) RecordK8sNamespacePhaseDataPoint(ts pcommon.Timestamp, mb.metricK8sNamespacePhase.recordDataPoint(mb.startTime, ts, val) } +// RecordK8sNodeAllocatableCPUDataPoint adds a data point to k8s.node.allocatable.cpu metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableCPUDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricK8sNodeAllocatableCPU.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeAllocatableEphemeralStorageDataPoint adds a data point to k8s.node.allocatable.ephemeral_storage metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableEphemeralStorageDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeAllocatableEphemeralStorage.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeAllocatableMemoryDataPoint adds a data point to k8s.node.allocatable.memory metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatableMemoryDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeAllocatableMemory.recordDataPoint(mb.startTime, ts, val) +} + +// RecordK8sNodeAllocatablePodsDataPoint adds a data point to k8s.node.allocatable.pods metric. +func (mb *MetricsBuilder) RecordK8sNodeAllocatablePodsDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricK8sNodeAllocatablePods.recordDataPoint(mb.startTime, ts, val) +} + // RecordK8sNodeConditionDataPoint adds a data point to k8s.node.condition metric. func (mb *MetricsBuilder) RecordK8sNodeConditionDataPoint(ts pcommon.Timestamp, val int64, conditionAttributeValue string) { mb.metricK8sNodeCondition.recordDataPoint(mb.startTime, ts, val, conditionAttributeValue) diff --git a/receiver/k8sclusterreceiver/internal/metadata/generated_metrics_test.go b/receiver/k8sclusterreceiver/internal/metadata/generated_metrics_test.go index 5cfc6948f3004..dedeff4b4153a 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/k8sclusterreceiver/internal/metadata/generated_metrics_test.go @@ -182,6 +182,22 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordK8sNamespacePhaseDataPoint(ts, 1) + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableCPUDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableEphemeralStorageDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatableMemoryDataPoint(ts, 1) + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordK8sNodeAllocatablePodsDataPoint(ts, 1) + allMetricsCount++ mb.RecordK8sNodeConditionDataPoint(ts, 1, "condition-val") @@ -669,6 +685,62 @@ func TestMetricsBuilder(t *testing.T) { assert.Equal(t, ts, dp.Timestamp()) assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.allocatable.cpu": + assert.False(t, validatedMetrics["k8s.node.allocatable.cpu"], "Found a duplicate in the metrics slice: k8s.node.allocatable.cpu") + validatedMetrics["k8s.node.allocatable.cpu"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Amount of cpu allocatable on the node", ms.At(i).Description()) + assert.Equal(t, "{cpu}", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityUnspecified, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "k8s.node.allocatable.ephemeral_storage": + assert.False(t, validatedMetrics["k8s.node.allocatable.ephemeral_storage"], "Found a duplicate in the metrics slice: k8s.node.allocatable.ephemeral_storage") + validatedMetrics["k8s.node.allocatable.ephemeral_storage"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Amount of ephemeral-storage allocatable on the node", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityUnspecified, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.allocatable.memory": + assert.False(t, validatedMetrics["k8s.node.allocatable.memory"], "Found a duplicate in the metrics slice: k8s.node.allocatable.memory") + validatedMetrics["k8s.node.allocatable.memory"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Amount of memory allocatable on the node", ms.At(i).Description()) + assert.Equal(t, "By", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityUnspecified, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.node.allocatable.pods": + assert.False(t, validatedMetrics["k8s.node.allocatable.pods"], "Found a duplicate in the metrics slice: k8s.node.allocatable.pods") + validatedMetrics["k8s.node.allocatable.pods"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Amount of pods allocatable on the node", ms.At(i).Description()) + assert.Equal(t, "{pod}", ms.At(i).Unit()) + assert.False(t, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityUnspecified, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) case "k8s.node.condition": assert.False(t, validatedMetrics["k8s.node.condition"], "Found a duplicate in the metrics slice: k8s.node.condition") validatedMetrics["k8s.node.condition"] = true diff --git a/receiver/k8sclusterreceiver/internal/metadata/testdata/config.yaml b/receiver/k8sclusterreceiver/internal/metadata/testdata/config.yaml index e35374f5d1d0a..502c4378bcacb 100644 --- a/receiver/k8sclusterreceiver/internal/metadata/testdata/config.yaml +++ b/receiver/k8sclusterreceiver/internal/metadata/testdata/config.yaml @@ -59,6 +59,14 @@ all_set: enabled: true k8s.namespace.phase: enabled: true + k8s.node.allocatable.cpu: + enabled: true + k8s.node.allocatable.ephemeral_storage: + enabled: true + k8s.node.allocatable.memory: + enabled: true + k8s.node.allocatable.pods: + enabled: true k8s.node.condition: enabled: true k8s.pod.phase: @@ -234,6 +242,14 @@ none_set: enabled: false k8s.namespace.phase: enabled: false + k8s.node.allocatable.cpu: + enabled: false + k8s.node.allocatable.ephemeral_storage: + enabled: false + k8s.node.allocatable.memory: + enabled: false + k8s.node.allocatable.pods: + enabled: false k8s.node.condition: enabled: false k8s.pod.phase: diff --git a/receiver/k8sclusterreceiver/internal/node/nodes.go b/receiver/k8sclusterreceiver/internal/node/nodes.go index 41a7d2fd995a2..c681c8feb75a5 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/maps" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) const ( @@ -60,6 +61,24 @@ func RecordMetrics(mb *metadata.MetricsBuilder, node *corev1.Node, ts pcommon.Ti rb.SetK8sNodeName(node.Name) rb.SetK8sKubeletVersion(node.Status.NodeInfo.KubeletVersion) + if utils.EnableStableMetrics.IsEnabled() { + if cpuVal, ok := node.Status.Allocatable[corev1.ResourceCPU]; ok { + mb.RecordK8sNodeAllocatableCPUDataPoint(ts, float64(cpuVal.MilliValue())/1000.0) + } + + if ephemeralMemoryVal, ok := node.Status.Allocatable[corev1.ResourceEphemeralStorage]; ok { + mb.RecordK8sNodeAllocatableEphemeralStorageDataPoint(ts, ephemeralMemoryVal.Value()) + } + + if memoryVal, ok := node.Status.Allocatable[corev1.ResourceMemory]; ok { + mb.RecordK8sNodeAllocatableMemoryDataPoint(ts, memoryVal.Value()) + } + + if podVal, ok := node.Status.Allocatable[corev1.ResourcePods]; ok { + mb.RecordK8sNodeAllocatablePodsDataPoint(ts, podVal.Value()) + } + } + mb.EmitForResource(metadata.WithResource(rb.Emit())) } @@ -83,22 +102,24 @@ func CustomMetrics(set receiver.Settings, rb *metadata.ResourceBuilder, node *co } // Adding 'node allocatable type' metrics - for _, nodeAllocatableTypeValue := range allocatableTypesToReport { - v1NodeAllocatableTypeValue := corev1.ResourceName(nodeAllocatableTypeValue) - quantity, ok := node.Status.Allocatable[v1NodeAllocatableTypeValue] - if !ok { - set.Logger.Debug(fmt.Errorf("allocatable type %v not found in node %v", nodeAllocatableTypeValue, - node.GetName()).Error()) - continue + if !utils.DisableLegacyMetrics.IsEnabled() { + for _, nodeAllocatableTypeValue := range allocatableTypesToReport { + v1NodeAllocatableTypeValue := corev1.ResourceName(nodeAllocatableTypeValue) + quantity, ok := node.Status.Allocatable[v1NodeAllocatableTypeValue] + if !ok { + set.Logger.Debug(fmt.Errorf("allocatable type %v not found in node %v", nodeAllocatableTypeValue, + node.GetName()).Error()) + continue + } + m := sm.Metrics().AppendEmpty() + m.SetName(getNodeAllocatableMetric(nodeAllocatableTypeValue)) + m.SetDescription(fmt.Sprintf("Amount of %v allocatable on the node", nodeAllocatableTypeValue)) + m.SetUnit(getNodeAllocatableUnit(v1NodeAllocatableTypeValue)) + g := m.SetEmptyGauge() + dp := g.DataPoints().AppendEmpty() + setNodeAllocatableValue(dp, v1NodeAllocatableTypeValue, quantity) + dp.SetTimestamp(ts) } - m := sm.Metrics().AppendEmpty() - m.SetName(getNodeAllocatableMetric(nodeAllocatableTypeValue)) - m.SetDescription(fmt.Sprintf("Amount of %v allocatable on the node", nodeAllocatableTypeValue)) - m.SetUnit(getNodeAllocatableUnit(v1NodeAllocatableTypeValue)) - g := m.SetEmptyGauge() - dp := g.DataPoints().AppendEmpty() - setNodeAllocatableValue(dp, v1NodeAllocatableTypeValue, quantity) - dp.SetTimestamp(ts) } if sm.Metrics().Len() == 0 { diff --git a/receiver/k8sclusterreceiver/internal/node/nodes_test.go b/receiver/k8sclusterreceiver/internal/node/nodes_test.go index 7fb04725db08f..fc9be5a9189a7 100644 --- a/receiver/k8sclusterreceiver/internal/node/nodes_test.go +++ b/receiver/k8sclusterreceiver/internal/node/nodes_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" @@ -23,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) func TestNodeMetricsReportCPUMetrics(t *testing.T) { @@ -64,6 +66,35 @@ func TestNodeMetricsReportCPUMetrics(t *testing.T) { ) } +func TestNodeAllocatableNamespaceMetrics(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(utils.EnableStableMetrics.ID(), true)) + defer func() { + require.NoError(t, featuregate.GlobalRegistry().Set(utils.EnableStableMetrics.ID(), false)) + }() + n := testutils.NewNode("1") + + ts := pcommon.Timestamp(time.Now().UnixNano()) + mbc := metadata.DefaultMetricsBuilderConfig() + mbc.Metrics.K8sNodeAllocatableCPU.Enabled = true + mbc.Metrics.K8sNodeAllocatableMemory.Enabled = true + mbc.Metrics.K8sNodeAllocatableEphemeralStorage.Enabled = true + mbc.Metrics.K8sNodeAllocatablePods.Enabled = true + mb := metadata.NewMetricsBuilder(mbc, receivertest.NewNopSettings(metadata.Type)) + RecordMetrics(mb, n, ts) + m := mb.Emit() + + expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected_allocatable.yaml")) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expected, m, + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreScopeMetricsOrder(), + ), + ) +} + func TestNodeOptionalMetrics(t *testing.T) { n := testutils.NewNode("2") rac := metadata.DefaultResourceAttributesConfig() diff --git a/receiver/k8sclusterreceiver/internal/node/testdata/expected_allocatable.yaml b/receiver/k8sclusterreceiver/internal/node/testdata/expected_allocatable.yaml new file mode 100644 index 0000000000000..ce2d207fc67b7 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/node/testdata/expected_allocatable.yaml @@ -0,0 +1,40 @@ +resourceMetrics: + - resource: + attributes: + - key: k8s.node.name + value: + stringValue: test-node-1 + - key: k8s.node.uid + value: + stringValue: test-node-1-uid + schemaUrl: https://opentelemetry.io/schemas/1.18.0 + scopeMetrics: + - metrics: + - description: Amount of cpu allocatable on the node + sum: + dataPoints: + - asDouble: 0.123 + name: k8s.node.allocatable.cpu + unit: '{cpu}' + - description: Amount of ephemeral-storage allocatable on the node + sum: + dataPoints: + - asInt: "1234" + name: k8s.node.allocatable.ephemeral_storage + unit: By + - description: Amount of memory allocatable on the node + sum: + dataPoints: + - asInt: "456" + name: k8s.node.allocatable.memory + unit: By + - description: Amount of pods allocatable on the node + sum: + dataPoints: + - asInt: "12" + name: k8s.node.allocatable.pods + unit: "{pod}" + + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver + version: latest diff --git a/receiver/k8sclusterreceiver/internal/utils/set.go b/receiver/k8sclusterreceiver/internal/utils/set.go index 2758d5792b4c3..f233caa7b24bb 100644 --- a/receiver/k8sclusterreceiver/internal/utils/set.go +++ b/receiver/k8sclusterreceiver/internal/utils/set.go @@ -3,6 +3,22 @@ package utils // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" +import "go.opentelemetry.io/collector/featuregate" + +var EnableStableMetrics = featuregate.GlobalRegistry().MustRegister( + "semconv.k8s.enableStable", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, semconv stable metrics are enabled."), + featuregate.WithRegisterFromVersion("v0.139.0"), +) + +var DisableLegacyMetrics = featuregate.GlobalRegistry().MustRegister( + "semconv.k8s.disableLegacy", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, semconv legacy metrics are disabled."), + featuregate.WithRegisterFromVersion("v0.139.0"), +) + // StringSliceToMap converts a slice of strings into a map with keys from the slice func StringSliceToMap(strings []string) map[string]bool { ret := map[string]bool{} diff --git a/receiver/k8sclusterreceiver/metadata.yaml b/receiver/k8sclusterreceiver/metadata.yaml index 3e4cd38dc3311..6882cb3077e8f 100644 --- a/receiver/k8sclusterreceiver/metadata.yaml +++ b/receiver/k8sclusterreceiver/metadata.yaml @@ -492,6 +492,38 @@ metrics: level: development gauge: value_type: int + + k8s.node.allocatable.cpu: + enabled: true + description: Amount of cpu allocatable on the node + unit: "{cpu}" + sum: + monotonic: false + value_type: double + k8s.node.allocatable.ephemeral_storage: + enabled: true + description: Amount of ephemeral-storage allocatable on the node + unit: "By" + sum: + monotonic: false + value_type: int + k8s.node.allocatable.memory: + enabled: true + description: Amount of memory allocatable on the node + unit: "By" + sum: + monotonic: false + value_type: int + k8s.node.allocatable.pods: + enabled: true + description: Amount of pods allocatable on the node + unit: "{pod}" + sum: + monotonic: false + value_type: int + + # k8s.node.condition_* metrics (k8s.node.condition_ready, k8s.node.condition_memory_pressure, etc) are controlled + # by node_conditions_to_report config option. By default, only k8s.node.condition_ready is enabled. k8s.node.condition: enabled: false diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index d06cb6a9c513c..fefda9a5a550e 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) const ( @@ -103,6 +104,10 @@ func (kr *kubernetesReceiver) startReceiver(ctx context.Context, host component. func (kr *kubernetesReceiver) Start(ctx context.Context, host component.Host) error { ctx, kr.cancel = context.WithCancel(ctx) + if utils.DisableLegacyMetrics.IsEnabled() && !utils.EnableStableMetrics.IsEnabled() { + return errors.New("cannot disable legacy metrics without enabling stable metrics") + } + // if extension is defined start with k8s leader elector if kr.config.K8sLeaderElector != nil { kr.settings.Logger.Info("Starting k8sClusterReceiver with leader election") diff --git a/receiver/k8sclusterreceiver/receiver_test.go b/receiver/k8sclusterreceiver/receiver_test.go index 94b666122d1a0..3e879a13f83f4 100644 --- a/receiver/k8sclusterreceiver/receiver_test.go +++ b/receiver/k8sclusterreceiver/receiver_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" semconv "go.opentelemetry.io/otel/semconv/v1.27.0" @@ -30,6 +31,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) type nopHost struct { @@ -231,6 +233,30 @@ func TestNamespacedReceiverWithMultipleNamespaces(t *testing.T) { require.NoError(t, r.Shutdown(ctx)) } +func TestReceiverFeatureGatesCombination(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(utils.DisableLegacyMetrics.ID(), true)) + defer func() { + require.NoError(t, featuregate.GlobalRegistry().Set(utils.DisableLegacyMetrics.ID(), false)) + }() + + tt := componenttest.NewTelemetry() + defer func() { + require.NoError(t, tt.Shutdown(t.Context())) + }() + + client := newFakeClientWithAllResources() + osQuotaClient := fakeQuota.NewSimpleClientset() + sink := new(consumertest.MetricsSink) + + observedNamespaces := []string{"test-0", "test-1"} + r := setupReceiver(client, osQuotaClient, sink, nil, 10*time.Second, tt, observedNamespaces, component.MustNewID("foo")) + + ctx := t.Context() + require.ErrorContains(t, r.Start(ctx, newNopHost()), "cannot disable legacy metrics without enabling stable metrics") + + require.NoError(t, r.Shutdown(ctx)) +} + func TestReceiverTimesOutAfterStartup(t *testing.T) { tt := componenttest.NewTelemetry() defer func() {