Skip to content

[exporter/awsemfexporter] Add user agent string when EBS metrics exist #312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/useragent"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
)
Expand Down Expand Up @@ -52,6 +52,7 @@ type emfExporter struct {
collectorID string

processResourceLabels func(map[string]string)
processMetrics func(pmetric.Metrics)
}

// newEmfExporter creates a new exporter using exporterhelper
Expand All @@ -75,11 +76,7 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
collectorID: collectorIdentifier.String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
}

if config.IsAppSignalsEnabled() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in upstream, so it will cause merge conflicts. Is there a way we can keep this in? Or at least make note that this diverges from upstream

Copy link
Author

@zhihonl zhihonl May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has to be moved because of this line

emf.svcStructuredLog.Handlers().Build.PushFrontNamed(userAgent.Handler())

In a previous PR We moved svcStructuredLog variable around which is what we use to access the useragent. It is not available in newEmfExporter function anymore. We never added back PushFrontNamed function anywhere else. This basically means we stopped emitting appsignal useragent since then.

userAgent := appsignals.NewUserAgent()
emfExporter.processResourceLabels = userAgent.Process
processMetrics: func(pmetric.Metrics) {},
}

config.logger.Warn("the default value for DimensionRollupOption will be changing to NoDimensionRollup" +
Expand All @@ -104,6 +101,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
}
emf.config.logger.Debug("Start processing resource metrics", zap.Any("labels", labels))
emf.processResourceLabels(labels)
emf.processMetrics(md)

groupedMetrics := make(map[any]*groupedMetric)
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
Expand Down Expand Up @@ -232,6 +230,21 @@ func (emf *emfExporter) start(_ context.Context, host component.Host) error {
awsmiddleware.TryConfigure(emf.config.logger, host, *emf.config.MiddlewareID, awsmiddleware.SDKv1(svcStructuredLog.Handlers()))
}

// Below are optimizatons to minimize amoount of
// metrics processing. We have two scearios
// 1. AppSignal - Only run Process function for AppSignal related useragent
// 2. Enhanced Container Insights - Only run ProcessMetrics function for CI EBS related useragent
if emf.config.IsAppSignalsEnabled() || emf.config.IsEnhancedContainerInsights() {
userAgent := useragent.NewUserAgent()
emf.svcStructuredLog.Handlers().Build.PushFrontNamed(userAgent.Handler())
if emf.config.IsAppSignalsEnabled() {
emf.processResourceLabels = userAgent.Process
}
if emf.config.IsEnhancedContainerInsights() {
emf.processMetrics = userAgent.ProcessMetrics
}
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package appsignals // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals"
package useragent // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/appsignals"

import (
"context"
Expand All @@ -13,6 +13,7 @@ import (

"github.com/aws/aws-sdk-go/aws/request"
"github.com/jellydator/ttlcache/v3"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
)

Expand All @@ -27,12 +28,16 @@ const (

// TODO: Available in semconv/v1.21.0+. Replace after collector dependency is v0.91.0+.
attributeTelemetryDistroVersion = "telemetry.distro.version"

attributeEBS = "ci_ebs"
ebsMetricPrefix = "node_diskio_ebs"
)

type UserAgent struct {
mu sync.RWMutex
prebuiltStr string
cache *ttlcache.Cache[string, string]
featureList map[string]struct{}
}

func NewUserAgent() *UserAgent {
Expand All @@ -45,6 +50,7 @@ func newUserAgent(ttl time.Duration) *UserAgent {
ttlcache.WithTTL[string, string](ttl),
ttlcache.WithCapacity[string, string](cacheSize),
),
featureList: make(map[string]struct{}),
}
ua.cache.OnEviction(func(context.Context, ttlcache.EvictionReason, *ttlcache.Item[string, string]) {
ua.build()
Expand Down Expand Up @@ -87,6 +93,30 @@ func (ua *UserAgent) Process(labels map[string]string) {
}
}

// ProcessMetrics checks metric names for specific patterns and updates user agent accordingly
func (ua *UserAgent) ProcessMetrics(metrics pmetric.Metrics) {
// Check if we've already detected NVME
if _, exists := ua.featureList[attributeEBS]; exists {
return
}

rms := metrics.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
ilms := rms.At(i).ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
ms := ilms.At(j).Metrics()
for k := 0; k < ms.Len(); k++ {
metric := ms.At(k)
if strings.HasPrefix(metric.Name(), ebsMetricPrefix) {
ua.featureList[attributeEBS] = struct{}{}
ua.build()
return
}
}
}
}
}

// build the user agent string from the items in the cache. Format is telemetry-sdk (<lang1>/<ver1>;<lang2>/<ver2>).
func (ua *UserAgent) build() {
ua.mu.Lock()
Expand All @@ -100,6 +130,18 @@ func (ua *UserAgent) build() {
sort.Strings(items)
ua.prebuiltStr = fmt.Sprintf("telemetry-sdk (%s)", strings.Join(items, ";"))
}

if len(ua.featureList) > 0 {
if ua.prebuiltStr != "" {
ua.prebuiltStr += " "
}
var metricTypes []string
for metricType := range ua.featureList {
metricTypes = append(metricTypes, metricType)
}
sort.Strings(metricTypes)
ua.prebuiltStr += fmt.Sprintf("feature:(%s)", strings.Join(metricTypes, " "))
}
}

// formatStr formats the telemetry SDK language and version into the user agent format.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package appsignals
package useragent

import (
"net/http"
Expand All @@ -10,12 +10,14 @@ import (

"github.com/aws/aws-sdk-go/aws/request"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
)

func TestUserAgent(t *testing.T) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a test just for the build() function to ensure it can build multiple feature flags in 1 section?

testCases := map[string]struct {
labelSets []map[string]string
metrics []string // Add metric names to test
want string
}{
"WithEmpty": {},
Expand Down Expand Up @@ -69,13 +71,41 @@ func TestUserAgent(t *testing.T) {
},
want: "telemetry-sdk (incrediblyverboselan/notsemanticversionin)",
},
"WithEBSMetrics": {
metrics: []string{"node_diskio_ebs_something"},
want: "feature:(ci_ebs)",
},
"WithBothTelemetryAndEBS": {
labelSets: []map[string]string{
{
semconv.AttributeTelemetrySDKLanguage: "test",
attributeTelemetryDistroVersion: "1.0",
},
},
metrics: []string{"node_diskio_ebs_something"},
want: "telemetry-sdk (test/1.0) feature:(ci_ebs)",
},
"WithNonEBSMetrics": {
metrics: []string{"some_other_metric"},
want: "",
},
"WithMultipleFeatures": {
metrics: []string{"node_diskio_ebs_something", "node_diskio_ebs_something_else"},
want: "feature:(ci_ebs)",
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
userAgent := NewUserAgent()
for _, labelSet := range testCase.labelSets {
userAgent.Process(labelSet)
}

if len(testCase.metrics) > 0 {
metrics := createTestMetrics(testCase.metrics)
userAgent.ProcessMetrics(metrics)
}

req := &request.Request{
HTTPRequest: &http.Request{
Header: http.Header{},
Expand Down Expand Up @@ -109,3 +139,16 @@ func TestUserAgentExpiration(t *testing.T) {
userAgent.handle(req)
assert.Empty(t, req.HTTPRequest.Header.Get("User-Agent"))
}

func createTestMetrics(metricNames []string) pmetric.Metrics {
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
ilm := rm.ScopeMetrics().AppendEmpty()

for _, name := range metricNames {
metric := ilm.Metrics().AppendEmpty()
metric.SetName(name)
}

return metrics
}
Loading