Skip to content

Add scraper for NVME Container Insights prometheus metrics in EKS (#296) #309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/aws/containerinsight/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ const (
TypePodEFA = "PodEFA"
TypeNodeEFA = "NodeEFA"
TypeHyperPodNode = "HyperPodNode"
TypeNodeNVME = "NodeNVME"

// unit
UnitBytes = "Bytes"
Expand Down
1 change: 1 addition & 0 deletions internal/aws/containerinsight/k8sconst.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
GpuDevice = "GpuDevice"
EfaDevice = "EfaDevice"
EniID = "NetworkInterfaceId"
VolumeID = "VolumeId"

PodStatus = "pod_status"
ContainerStatus = "container_status"
Expand Down
36 changes: 36 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/nvme/metric_unit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package nvme // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/gpu"
import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
)

const (
	// Original metric names as emitted by the EBS CSI driver's NVMe
	// Prometheus endpoint (the aws_ebs_csi_* series). These keys are
	// used to look up CloudWatch units in MetricToUnit below.
	ebsReadOpsTotal     = "aws_ebs_csi_read_ops_total"
	ebsWriteOpsTotal    = "aws_ebs_csi_write_ops_total"
	ebsReadBytesTotal   = "aws_ebs_csi_read_bytes_total"
	ebsWriteBytesTotal  = "aws_ebs_csi_write_bytes_total"
	ebsReadTime         = "aws_ebs_csi_read_seconds_total"
	ebsWriteTime        = "aws_ebs_csi_write_seconds_total"
	ebsExceededIOPSTime = "aws_ebs_csi_exceeded_iops_seconds_total"
	ebsExceededTPTime   = "aws_ebs_csi_exceeded_tp_seconds_total"
	// EC2-level throttling counters (instance limits rather than volume limits).
	ebsExceededEC2IOPSTime = "aws_ebs_csi_ec2_exceeded_iops_seconds_total"
	ebsExceededEC2TPTime   = "aws_ebs_csi_ec2_exceeded_tp_seconds_total"
	ebsVolumeQueueLength   = "aws_ebs_csi_volume_queue_length"
)

var MetricToUnit = map[string]string{
ebsReadOpsTotal: containerinsight.UnitCount,
ebsWriteOpsTotal: containerinsight.UnitCount,
ebsReadBytesTotal: containerinsight.UnitBytes,
ebsWriteBytesTotal: containerinsight.UnitBytes,
ebsReadTime: containerinsight.UnitSecond,
ebsWriteTime: containerinsight.UnitSecond,
ebsExceededIOPSTime: containerinsight.UnitSecond,
ebsExceededTPTime: containerinsight.UnitSecond,
ebsExceededEC2IOPSTime: containerinsight.UnitSecond,
ebsExceededEC2TPTime: containerinsight.UnitSecond,
ebsVolumeQueueLength: containerinsight.UnitCount,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package nvme // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/gpu"

import (
"os"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/model/relabel"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
)

const (
	// collectionInterval is both the scrape interval and the scrape
	// timeout for the NVMe exporter endpoint.
	collectionInterval = 60 * time.Second
	// jobName must start with "containerInsights" (asserted by
	// TestNvmeScraperJobName) so the metrics are routed through the
	// Container Insights pipeline.
	jobName = "containerInsightsNVMeExporterScraper"
	// scraperMetricsPath is the HTTP path serving Prometheus metrics.
	scraperMetricsPath = "/metrics"
	// scraperK8sServiceSelector matches the EBS CSI node service
	// discovered in the kube-system namespace.
	scraperK8sServiceSelector = "app=ebs-csi-node"
)

// hostInfoProvider supplies EC2 host metadata used to decorate scraped
// metrics with cluster and instance dimensions.
type hostInfoProvider interface {
	GetClusterName() string
	GetInstanceID() string
	GetInstanceType() string
}

// GetScraperConfig builds the Prometheus scrape configuration that
// collects EBS CSI NVMe metrics from the ebs-csi-node service in the
// kube-system namespace.
func GetScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
	// Discover the exporter through the Kubernetes service labeled
	// app=ebs-csi-node, restricted to kube-system.
	sdConfig := &kubernetes.SDConfig{
		Role: kubernetes.RoleService,
		NamespaceDiscovery: kubernetes.NamespaceDiscovery{
			Names: []string{"kube-system"},
		},
		Selectors: []kubernetes.SelectorConfig{
			{
				Role:  kubernetes.RoleService,
				Label: scraperK8sServiceSelector,
			},
		},
	}

	return &config.ScrapeConfig{
		JobName:                 jobName,
		Scheme:                  "http",
		MetricsPath:             scraperMetricsPath,
		ScrapeInterval:          model.Duration(collectionInterval),
		ScrapeTimeout:           model.Duration(collectionInterval),
		ScrapeProtocols:         config.DefaultScrapeProtocols,
		ServiceDiscoveryConfigs: discovery.Configs{sdConfig},
		MetricRelabelConfigs:    getMetricRelabelConfig(hostInfoProvider),
	}
}

func getMetricRelabelConfig(hostInfoProvider hostInfoProvider) []*relabel.Config {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need relabel configs here? Can't we just take all the metrics scraped from the endpoint?

Sorry if this was answered before -- I can't remember the previous PR

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for relabeling the dimensions to the values we actually want to see on CloudWatch. You could scrape them as it is and use another processor to relabel them, but just following DCGM pattern:

func getMetricRelabelConfig(hostInfoProvider hostInfoProvider) []*relabel.Config {

return []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("aws_ebs_csi_.*"),
Action: relabel.Keep,
},

// Below metrics are histogram type which are not supported for container insights yet
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure we have a TODO item in the backlog for this. Once we support prom histograms we should add these back

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @duhminick did investigations on this and it was proposed to not pursue this metric because the emitted metric isn't actually useful.

{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp(".*_bucket|.*_sum|.*_count.*"),
Action: relabel.Drop,
},
// Hacky way to inject static values (clusterName/instanceId/nodeName/volumeID)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind me -- do we do this anywhere else?

Where we use relabel configs to inject clustername, instanceid, etc?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we do similar stuff in DCGM scraper:

// hacky way to inject static values (clusterName/instanceId/instanceType)

{
SourceLabels: model.LabelNames{"instance_id"},
TargetLabel: ci.NodeNameKey,
Regex: relabel.MustNewRegexp(".*"),
Replacement: os.Getenv("HOST_NAME"),
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"instance_id"},
TargetLabel: ci.ClusterNameKey,
Regex: relabel.MustNewRegexp(".*"),
Replacement: hostInfoProvider.GetClusterName(),
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"instance_id"},
TargetLabel: ci.InstanceID,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"volume_id"},
TargetLabel: ci.VolumeID,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
}
}
220 changes: 220 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/nvme/nvmescraper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package nvme

import (
"context"
"strings"
"testing"

configutil "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/mocks"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)

// renameMetric is a sample scrape payload mimicking the EBS CSI NVMe
// exporter's output; it drives the end-to-end scraper test below.
const renameMetric = `
# HELP aws_ebs_csi_read_ops_total The total number of completed read operations.
# TYPE aws_ebs_csi_read_ops_total counter
aws_ebs_csi_read_ops_total{instance_id="i-0131bee5395cc4317",volume_id="vol-0281cf921f3dbb69b"} 55592
# HELP aws_ebs_csi_read_seconds_total The total time spent, in seconds, by all completed read operations.
# TYPE aws_ebs_csi_read_seconds_total counter
aws_ebs_csi_read_seconds_total{instance_id="i-0131bee5395cc4317",volume_id="vol-0281cf921f3dbb69b"} 34.52
`

const (
	// Placeholder host metadata returned by mockHostInfoProvider.
	dummyInstanceID   = "i-0000000000"
	dummyClusterName  = "cluster-name"
	dummyInstanceType = "instance-type"
	// dummyNodeName is injected via the HOST_NAME environment variable
	// in the test (see t.Setenv in TestNewNVMEScraperEndToEnd).
	dummyNodeName = "hostname"
)

// mockHostInfoProvider is a stub hostInfoProvider that returns the
// fixed dummy cluster and instance metadata defined above.
type mockHostInfoProvider struct{}

func (m mockHostInfoProvider) GetClusterName() string {
	return dummyClusterName
}

func (m mockHostInfoProvider) GetInstanceID() string {
	return dummyInstanceID
}

func (m mockHostInfoProvider) GetInstanceType() string {
	return dummyInstanceType
}

// mockConsumer is a metrics consumer that asserts each scraped metric
// against a table of expected values and labels.
type mockConsumer struct {
	t *testing.T
	// called is set to true once ConsumeMetrics has processed a scrape.
	called *bool
	// expected maps metric name to its expected sample value and the
	// label key/value pairs that must be present on the data point.
	expected map[string]struct {
		value  float64
		labels map[string]string
	}
}

// Capabilities reports that this consumer does not mutate incoming data.
func (m mockConsumer) Capabilities() consumer.Capabilities {
	// The zero value already has MutatesData == false.
	var caps consumer.Capabilities
	return caps
}

// ConsumeMetrics verifies every scraped aws_ebs_* metric against the
// expected table (value plus label set), counts the matches, and flags
// that a scrape reached the consumer.
//
// Fix: hoist the data point into a local instead of re-traversing
// metric.Sum().DataPoints().At(0) for the value and for every label
// lookup, and drop the leftover per-label debug Logf that re-fetched
// the data point solely to log it.
func (m mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
	assert.Equal(m.t, 1, md.ResourceMetrics().Len())

	scrapedMetricCnt := 0
	scopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
	for i := 0; i < scopeMetrics.Len(); i++ {
		metric := scopeMetrics.At(i)
		// Skip prometheus metadata metrics including "up".
		if !strings.HasPrefix(metric.Name(), "aws_ebs") {
			continue
		}
		metadata, ok := m.expected[metric.Name()]
		assert.True(m.t, ok)

		dp := metric.Sum().DataPoints().At(0)
		assert.Equal(m.t, metadata.value, dp.DoubleValue())
		for k, v := range metadata.labels {
			lv, found := dp.Attributes().Get(k)
			assert.True(m.t, found)
			assert.Equal(m.t, v, lv.AsString())
		}
		scrapedMetricCnt++
	}
	// Every expected metric must have been seen exactly once.
	assert.Equal(m.t, len(m.expected), scrapedMetricCnt)
	*m.called = true
	return nil
}

// TestNewNVMEScraperEndToEnd runs the NVMe scraper against a mock
// Prometheus endpoint serving the renameMetric fixture and verifies,
// via mockConsumer, that the scraped metrics carry the relabeled
// Container Insights dimensions (node name, cluster name, instance ID,
// volume ID) and the expected sample values.
func TestNewNVMEScraperEndToEnd(t *testing.T) {
	// HOST_NAME feeds the NodeName relabel rule in getMetricRelabelConfig.
	t.Setenv("HOST_NAME", dummyNodeName)
	expected := map[string]struct {
		value  float64
		labels map[string]string
	}{
		"aws_ebs_csi_read_seconds_total": {
			value: 34.52,
			labels: map[string]string{
				ci.NodeNameKey:    "hostname",
				ci.ClusterNameKey: dummyClusterName,
				ci.InstanceID:     "i-0131bee5395cc4317",
				ci.VolumeID:       "vol-0281cf921f3dbb69b",
			},
		},
		"aws_ebs_csi_read_ops_total": {
			value: 55592,
			labels: map[string]string{
				ci.NodeNameKey:    "hostname",
				ci.ClusterNameKey: dummyClusterName,
				ci.InstanceID:     "i-0131bee5395cc4317",
				ci.VolumeID:       "vol-0281cf921f3dbb69b",
			},
		},
	}

	consumerCalled := false
	mConsumer := mockConsumer{
		t:        t,
		called:   &consumerCalled,
		expected: expected,
	}

	settings := componenttest.NewNopTelemetrySettings()
	settings.Logger, _ = zap.NewDevelopment()

	// Build the scraper with this package's scrape config and mock host info.
	scraper, err := prometheusscraper.NewSimplePrometheusScraper(prometheusscraper.SimplePrometheusScraperOpts{
		Ctx:               context.TODO(),
		TelemetrySettings: settings,
		Consumer:          mConsumer,
		Host:              componenttest.NewNopHost(),
		HostInfoProvider:  mockHostInfoProvider{},
		ScraperConfigs:    GetScraperConfig(mockHostInfoProvider{}),
		Logger:            settings.Logger,
	})
	assert.NoError(t, err)
	assert.Equal(t, mockHostInfoProvider{}, scraper.HostInfoProvider)

	// build up a new PR
	promFactory := prometheusreceiver.NewFactory()

	// Mock endpoint returning the renameMetric payload once per scrape.
	targets := []*mocks.TestData{
		{
			Name: "nvme",
			Pages: []mocks.MockPrometheusResponse{
				{Code: 200, Data: renameMetric},
			},
		},
	}
	mp, cfg, err := mocks.SetupMockPrometheus(targets...)
	assert.NoError(t, err)

	// Rewrite the scrape config so it targets the mock server instead
	// of performing Kubernetes service discovery.
	scrapeConfig := scraper.ScraperConfigs
	scrapeConfig.ScrapeProtocols = cfg.ScrapeConfigs[0].ScrapeProtocols
	scrapeConfig.ScrapeInterval = cfg.ScrapeConfigs[0].ScrapeInterval
	scrapeConfig.ScrapeTimeout = cfg.ScrapeConfigs[0].ScrapeInterval
	scrapeConfig.Scheme = "http"
	scrapeConfig.MetricsPath = cfg.ScrapeConfigs[0].MetricsPath
	scrapeConfig.HTTPClientConfig = configutil.HTTPClientConfig{
		TLSConfig: configutil.TLSConfig{
			InsecureSkipVerify: true,
		},
	}
	scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
		// using dummy static config to avoid service discovery initialization
		&discovery.StaticConfig{
			{
				Targets: []model.LabelSet{
					{
						// Strip the scheme: the mock server URL is "http://host:port".
						model.AddressLabel: model.LabelValue(strings.Split(mp.Srv.URL, "http://")[1]),
					},
				},
			},
		},
	}

	promConfig := prometheusreceiver.Config{
		PrometheusConfig: &prometheusreceiver.PromConfig{
			ScrapeConfigs: []*config.ScrapeConfig{scrapeConfig},
		},
	}

	// replace the prom receiver
	params := receiver.Settings{
		TelemetrySettings: scraper.Settings,
	}
	scraper.PrometheusReceiver, err = promFactory.CreateMetrics(scraper.Ctx, params, &promConfig, mConsumer)

	assert.NoError(t, err)
	assert.NotNil(t, mp)
	defer mp.Close()

	// perform a single scrape, this will kick off the scraper process for additional scrapes
	scraper.GetMetrics()

	t.Cleanup(func() {
		scraper.Shutdown()
	})

	// wait for 2 scrapes, one initiated by us, another by the new scraper process
	mp.Wg.Wait()
	mp.Wg.Wait()
	// make sure the consumer is called at scraping interval
	assert.True(t, consumerCalled)
}

// TestNvmeScraperJobName verifies the scrape job name carries the
// "containerInsights" prefix required for Container Insights routing.
//
// Fix: the original assertion used the entire job name as the prefix,
// which is equivalent to an equality check and contradicts the stated
// intent ("needs to start with containerInsights"); assert the actual
// required prefix instead.
func TestNvmeScraperJobName(t *testing.T) {
	// needs to start with containerInsights
	assert.True(t, strings.HasPrefix(jobName, "containerInsights"))
}
Loading
Loading