Add scraper for NVME Container Insights prometheus metrics in EKS #296

Merged 7 commits on Mar 28, 2025
1 change: 1 addition & 0 deletions internal/aws/containerinsight/const.go
@@ -197,6 +197,7 @@ const (
TypePodEFA = "PodEFA"
TypeNodeEFA = "NodeEFA"
TypeHyperPodNode = "HyperPodNode"
TypeNodeNVME = "NodeNVME"

// unit
UnitBytes = "Bytes"
1 change: 1 addition & 0 deletions internal/aws/containerinsight/k8sconst.go
@@ -20,6 +20,7 @@ const (
GpuDevice = "GpuDevice"
EfaDevice = "EfaDevice"
EniID = "NetworkInterfaceId"
VolumeID = "VolumeId"

PodStatus = "pod_status"
ContainerStatus = "container_status"
36 changes: 36 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/nvme/metric_unit.go
@@ -0,0 +1,36 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package nvme // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/nvme"
import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
)

const (
// Original Metric Names
ebsReadOpsTotal = "aws_ebs_csi_read_ops_total"
ebsWriteOpsTotal = "aws_ebs_csi_write_ops_total"
ebsReadBytesTotal = "aws_ebs_csi_read_bytes_total"
ebsWriteBytesTotal = "aws_ebs_csi_write_bytes_total"
ebsReadTime = "aws_ebs_csi_read_seconds_total"
ebsWriteTime = "aws_ebs_csi_write_seconds_total"
ebsExceededIOPSTime = "aws_ebs_csi_exceeded_iops_seconds_total"
ebsExceededTPTime = "aws_ebs_csi_exceeded_tp_seconds_total"
ebsExceededEC2IOPSTime = "aws_ebs_csi_ec2_exceeded_iops_seconds_total"
ebsExceededEC2TPTime = "aws_ebs_csi_ec2_exceeded_tp_seconds_total"
ebsVolumeQueueLength = "aws_ebs_csi_volume_queue_length"
Reviewer:
There was some discussion about whether we can convert the histogram metrics:

node_diskio_ebs_read_io_latency
node_diskio_ebs_write_io_latency

to statistics (avg/min/max). I think we already do this for API server metrics?

It looks like we already do this for some histograms: https://github.com/amazon-contributing/opentelemetry-collector-contrib/blob/aws-cwa-dev/exporter/awsemfexporter/datapoint.go#L188
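
(Not part of this PR, but as a rough illustration of the conversion being discussed: a minimal sketch that derives avg/min/max statistics from a pdata histogram data point. The helper and type names are made up for the example; the linked awsemfexporter code is the existing reference.)

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// histogramStats holds the summary statistics that could replace a raw
// latency histogram for Container Insights.
type histogramStats struct {
	Avg, Min, Max float64
	Count         uint64
}

// toStats is a sketch: avg is derived from sum/count, and the optional
// min/max fields are used when the producer sets them.
func toStats(dp pmetric.HistogramDataPoint) (histogramStats, bool) {
	if dp.Count() == 0 || !dp.HasSum() {
		return histogramStats{}, false
	}
	s := histogramStats{
		Avg:   dp.Sum() / float64(dp.Count()),
		Count: dp.Count(),
	}
	if dp.HasMin() {
		s.Min = dp.Min()
	}
	if dp.HasMax() {
		s.Max = dp.Max()
	}
	return s, true
}

func main() {
	dp := pmetric.NewHistogramDataPoint()
	dp.SetCount(4)
	dp.SetSum(2.0)
	dp.SetMin(0.1)
	dp.SetMax(0.9)
	if s, ok := toStats(dp); ok {
		fmt.Printf("avg=%.3f min=%.3f max=%.3f count=%d\n", s.Avg, s.Min, s.Max, s.Count)
	}
}
```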

Author:
In the NVMe metrics one-pager, latency metrics are not included, so I'm not adding them here.

Reviewer:
Sure, but I know there was some discussion on that doc with the Container Insights team about whether we could include the latency metrics, since they would be valuable to customers and we don't want to have to add them later, after we have launched.

Reviewer:
We can address this in a follow-up -- but IMO we should investigate whether we can include the latency metrics as part of the launch.

Author:
Agreed. This PR can be the baseline, and we can implement the histogram metrics separately if needed.

)

var MetricToUnit = map[string]string{
ebsReadOpsTotal: containerinsight.UnitCount,
ebsWriteOpsTotal: containerinsight.UnitCount,
ebsReadBytesTotal: containerinsight.UnitBytes,
ebsWriteBytesTotal: containerinsight.UnitBytes,
ebsReadTime: containerinsight.UnitSecond,
ebsWriteTime: containerinsight.UnitSecond,
ebsExceededIOPSTime: containerinsight.UnitSecond,
ebsExceededTPTime: containerinsight.UnitSecond,
ebsExceededEC2IOPSTime: containerinsight.UnitSecond,
ebsExceededEC2TPTime: containerinsight.UnitSecond,
ebsVolumeQueueLength: containerinsight.UnitCount,
}
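
(The raw aws_ebs_csi_* counters are renamed to the node_diskio_ebs_* names that Container Insights emits; that rename table is not part of this diff. A minimal sketch of what such a mapping could look like, limited to the two renames asserted by the end-to-end test further down; the variable name is illustrative.)

```go
// Illustrative only -- the actual rename mapping lives outside this diff.
// The two entries below are the renames asserted by the end-to-end test.
var metricToRenamedMetric = map[string]string{
	ebsReadOpsTotal: "node_diskio_ebs_total_read_ops",
	ebsReadTime:     "node_diskio_ebs_total_read_time",
}
```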
102 changes: 102 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/nvme/nvmescraper.go
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package nvme // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/nvme"

import (
"os"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/kubernetes"
"github.com/prometheus/prometheus/model/relabel"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
)

const (
collectionInterval = 60 * time.Second
jobName = "containerInsightsNVMeExporterScraper"
scraperMetricsPath = "/metrics"
scraperK8sServiceSelector = "app=ebs-csi-node"
)

type hostInfoProvider interface {
GetClusterName() string
GetInstanceID() string
GetInstanceType() string
}

func GetScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
return &config.ScrapeConfig{
ScrapeInterval: model.Duration(collectionInterval),
ScrapeTimeout: model.Duration(collectionInterval),
ScrapeProtocols: config.DefaultScrapeProtocols,
JobName: jobName,
Scheme: "http",
MetricsPath: scraperMetricsPath,
ServiceDiscoveryConfigs: discovery.Configs{
&kubernetes.SDConfig{
Role: kubernetes.RoleService,
NamespaceDiscovery: kubernetes.NamespaceDiscovery{
Names: []string{"kube-system"},
},
Selectors: []kubernetes.SelectorConfig{
{
Role: kubernetes.RoleService,
Label: scraperK8sServiceSelector,
},
},
},
},
MetricRelabelConfigs: getMetricRelabelConfig(hostInfoProvider),
}
}

func getMetricRelabelConfig(hostInfoProvider hostInfoProvider) []*relabel.Config {
return []*relabel.Config{
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp("aws_ebs_csi_.*"),
Action: relabel.Keep,
},

// The metrics below are histogram metrics, which are not yet supported by Container Insights
{
SourceLabels: model.LabelNames{"__name__"},
Regex: relabel.MustNewRegexp(".*_bucket|.*_sum|.*_count.*"),
Action: relabel.Drop,
},
// Hacky way to inject static values (clusterName/instanceId/nodeName/volumeID)
{
SourceLabels: model.LabelNames{"instance_id"},
TargetLabel: ci.NodeNameKey,
Regex: relabel.MustNewRegexp(".*"),
Replacement: os.Getenv("HOST_NAME"),
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"instance_id"},
TargetLabel: ci.ClusterNameKey,
Regex: relabel.MustNewRegexp(".*"),
Replacement: hostInfoProvider.GetClusterName(),
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"instance_id"},
TargetLabel: ci.InstanceID,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
{
SourceLabels: model.LabelNames{"volume_id"},
TargetLabel: ci.VolumeID,
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "${1}",
Action: relabel.Replace,
},
}
}
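
(For context, a sketch of how this scrape config is expected to be consumed, mirroring the end-to-end test below. The function name is illustrative, and the ctx, settings, consumer, host, and host-info values are assumed to be supplied by the enclosing receiver.)

```go
package nvme

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper"
)

// startNVMeScraper wires the NVMe scrape config into the shared prometheus
// scraper, the same way the end-to-end test below does.
func startNVMeScraper(
	ctx context.Context,
	settings component.TelemetrySettings,
	nextConsumer consumer.Metrics,
	host component.Host,
	hostInfo hostInfoProvider,
) error {
	scraper, err := prometheusscraper.NewSimplePrometheusScraper(prometheusscraper.SimplePrometheusScraperOpts{
		Ctx:               ctx,
		TelemetrySettings: settings,
		Consumer:          nextConsumer,
		Host:              host,
		HostInfoProvider:  hostInfo, // assumed to accept any implementation of the three host-info methods, as the test's mock does
		ScraperConfigs:    GetScraperConfig(hostInfo),
		Logger:            settings.Logger,
	})
	if err != nil {
		return err
	}
	// The first call performs one scrape and starts the periodic scrape loop.
	scraper.GetMetrics()
	return nil
}
```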
220 changes: 220 additions & 0 deletions receiver/awscontainerinsightreceiver/internal/nvme/nvmescraper_test.go
@@ -0,0 +1,220 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package nvme

import (
"context"
"strings"
"testing"

configutil "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

ci "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/containerinsight"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/mocks"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/prometheusscraper"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
)

const renameMetric = `
# HELP aws_ebs_csi_read_ops_total The total number of completed read operations.
# TYPE aws_ebs_csi_read_ops_total counter
aws_ebs_csi_read_ops_total{instance_id="i-0131bee5395cc4317",volume_id="vol-0281cf921f3dbb69b"} 55592
# HELP aws_ebs_csi_read_seconds_total The total time spent, in seconds, by all completed read operations.
# TYPE aws_ebs_csi_read_seconds_total counter
aws_ebs_csi_read_seconds_total{instance_id="i-0131bee5395cc4317",volume_id="vol-0281cf921f3dbb69b"} 34.52
`

const (
dummyInstanceID = "i-0000000000"
dummyClusterName = "cluster-name"
dummyInstanceType = "instance-type"
dummyNodeName = "hostname"
)

type mockHostInfoProvider struct{}

func (m mockHostInfoProvider) GetClusterName() string {
return dummyClusterName
}

func (m mockHostInfoProvider) GetInstanceID() string {
return dummyInstanceID
}

func (m mockHostInfoProvider) GetInstanceType() string {
return dummyInstanceType
}

type mockConsumer struct {
t *testing.T
called *bool
expected map[string]struct {
value float64
labels map[string]string
}
}

func (m mockConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: false,
}
}

func (m mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
assert.Equal(m.t, 1, md.ResourceMetrics().Len())

scrapedMetricCnt := 0
scopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
for i := 0; i < scopeMetrics.Len(); i++ {
metric := scopeMetrics.At(i)
// skip prometheus metadata metrics including "up"
if !strings.HasPrefix(metric.Name(), "node_") {
continue
}
metadata, ok := m.expected[metric.Name()]
assert.True(m.t, ok)
assert.Equal(m.t, metadata.value, metric.Gauge().DataPoints().At(0).DoubleValue())
for k, v := range metadata.labels {
gauge := metric.Gauge().DataPoints().At(0)
m.t.Logf("%v", gauge)
lv, found := metric.Gauge().DataPoints().At(0).Attributes().Get(k)
assert.True(m.t, found)
assert.Equal(m.t, v, lv.AsString())
}
scrapedMetricCnt++
}
assert.Equal(m.t, len(m.expected), scrapedMetricCnt)
*m.called = true
return nil
}

func TestNewNVMEScraperEndToEnd(t *testing.T) {
t.Setenv("HOST_NAME", dummyNodeName)
expected := map[string]struct {
value float64
labels map[string]string
}{
"node_diskio_ebs_total_read_time": {
value: 34.52,
labels: map[string]string{
ci.NodeNameKey: "hostname",
ci.ClusterNameKey: dummyClusterName,
ci.InstanceID: "i-0131bee5395cc4317",
ci.VolumeID: "vol-0281cf921f3dbb69b",
},
},
"node_diskio_ebs_total_read_ops": {
value: 55592,
labels: map[string]string{
ci.NodeNameKey: "hostname",
ci.ClusterNameKey: dummyClusterName,
ci.InstanceID: "i-0131bee5395cc4317",
ci.VolumeID: "vol-0281cf921f3dbb69b",
},
},
}

consumerCalled := false
mConsumer := mockConsumer{
t: t,
called: &consumerCalled,
expected: expected,
}

settings := componenttest.NewNopTelemetrySettings()
settings.Logger, _ = zap.NewDevelopment()

scraper, err := prometheusscraper.NewSimplePrometheusScraper(prometheusscraper.SimplePrometheusScraperOpts{
Ctx: context.TODO(),
TelemetrySettings: settings,
Consumer: mConsumer,
Host: componenttest.NewNopHost(),
HostInfoProvider: mockHostInfoProvider{},
ScraperConfigs: GetScraperConfig(mockHostInfoProvider{}),
Logger: settings.Logger,
})
assert.NoError(t, err)
assert.Equal(t, mockHostInfoProvider{}, scraper.HostInfoProvider)

// build up a new prometheus receiver
promFactory := prometheusreceiver.NewFactory()

targets := []*mocks.TestData{
{
Name: "nvme",
Pages: []mocks.MockPrometheusResponse{
{Code: 200, Data: renameMetric},
},
},
}
mp, cfg, err := mocks.SetupMockPrometheus(targets...)
assert.NoError(t, err)

scrapeConfig := scraper.ScraperConfigs
scrapeConfig.ScrapeProtocols = cfg.ScrapeConfigs[0].ScrapeProtocols
scrapeConfig.ScrapeInterval = cfg.ScrapeConfigs[0].ScrapeInterval
scrapeConfig.ScrapeTimeout = cfg.ScrapeConfigs[0].ScrapeInterval
scrapeConfig.Scheme = "http"
scrapeConfig.MetricsPath = cfg.ScrapeConfigs[0].MetricsPath
scrapeConfig.HTTPClientConfig = configutil.HTTPClientConfig{
TLSConfig: configutil.TLSConfig{
InsecureSkipVerify: true,
},
}
scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
// using dummy static config to avoid service discovery initialization
&discovery.StaticConfig{
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(strings.Split(mp.Srv.URL, "http://")[1]),
},
},
},
},
}

promConfig := prometheusreceiver.Config{
PrometheusConfig: &prometheusreceiver.PromConfig{
ScrapeConfigs: []*config.ScrapeConfig{scrapeConfig},
},
}

// replace the prom receiver
params := receiver.Settings{
TelemetrySettings: scraper.Settings,
}
scraper.PrometheusReceiver, err = promFactory.CreateMetrics(scraper.Ctx, params, &promConfig, mConsumer)

assert.NoError(t, err)
assert.NotNil(t, mp)
defer mp.Close()

// perform a single scrape, this will kick off the scraper process for additional scrapes
scraper.GetMetrics()

t.Cleanup(func() {
scraper.Shutdown()
})

// wait for 2 scrapes, one initiated by us, another by the new scraper process
mp.Wg.Wait()
mp.Wg.Wait()
// make sure the consumer is called at scraping interval
assert.True(t, consumerCalled)
}

func TestNVMeScraperJobName(t *testing.T) {
// needs to start with containerInsights
assert.True(t, strings.HasPrefix(jobName, "containerInsightsNVMeExporterScraper"))
}