Skip to content

Add service entity support for EMF exporter #291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type Config struct {
// retaining all the fields / labels in the EMF log except for the section responsible for extraction of metrics.
DisableMetricExtraction bool `mapstructure:"disable_metric_extraction"`

// AddEntity is an option to add entity to the EMF log to correlate related telemetry.
// Setting this to true adds fields such as Service, Environment, etc.
AddEntity bool `mapstructure:"add_entity"`

// ResourceToTelemetrySettings is an option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
// If enabled, all the resource attributes will be converted to metric labels by default.
Expand Down
8 changes: 0 additions & 8 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -19,8 +18,6 @@ import (
)

const (
AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal."

summaryCountSuffix = "_count"
summarySumSuffix = "_sum"
)
Expand Down Expand Up @@ -533,11 +530,6 @@ func (dps summaryDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
func createLabels(attributes pcommon.Map) map[string]string {
labels := make(map[string]string, attributes.Len()+1)
attributes.Range(func(k string, v pcommon.Value) bool {
// we don't want to export entity related attributes as dimensions, so we skip these
if strings.HasPrefix(k, AWSEntityPrefix) {
return true
}

labels[k] = v.AsString()
return true
})
Expand Down
1 change: 0 additions & 1 deletion exporter/awsemfexporter/datapoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,6 @@ func TestCreateLabels(t *testing.T) {
"a": "A",
"b": "B",
"c": "C",
"com.amazonaws.cloudwatch.entity.internal.A": "A",
}))

labels := createLabels(labelsMap)
Expand Down
64 changes: 64 additions & 0 deletions exporter/awsemfexporter/internal/entity/entityattributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package entity // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/entity"

const (
// Entity resource attributes in OTLP payload
AWSEntityPrefix = "com.amazonaws.cloudwatch.entity.internal."
AttributeEntityServiceName = AWSEntityPrefix + "service.name"
AttributeEntityDeploymentEnvironment = AWSEntityPrefix + "deployment.environment"
AttributeEntityK8sClusterName = AWSEntityPrefix + "k8s.cluster.name"
AttributeEntityK8sNamespaceName = AWSEntityPrefix + "k8s.namespace.name"
AttributeEntityK8sWorkloadName = AWSEntityPrefix + "k8s.workload.name"
AttributeEntityK8sNodeName = AWSEntityPrefix + "k8s.node.name"
AttributeEntityServiceNameSource = AWSEntityPrefix + "service.name.source"
AttributeEntityPlatformType = AWSEntityPrefix + "platform.type"
AttributeEntityInstanceID = AWSEntityPrefix + "instance.id"

// Entity fields in EMF log
Service = "Service"
Environment = "Environment"
EksCluster = "EKS.Cluster"
K8sCluster = "K8s.Cluster"
K8sNamespace = "K8s.Namespace"
K8sWorkload = "K8s.Workload"
K8sNode = "K8s.Node"
AWSServiceNameSource = "AWS.ServiceNameSource"
PlatformType = "PlatformType"
InstanceID = "EC2.InstanceId"

// Possible values for PlatformType
AttributeEntityEKSPlatform = "AWS::EKS"
AttributeEntityK8sPlatform = "K8s"
)

// attributeEntityToFieldMap maps attribute entity resource attributes to entity fields
var attributeEntityToFieldMap = map[string]string{
AttributeEntityServiceName: Service,
AttributeEntityDeploymentEnvironment: Environment,
AttributeEntityK8sNamespaceName: K8sNamespace,
AttributeEntityK8sWorkloadName: K8sWorkload,
AttributeEntityK8sNodeName: K8sNode,
AttributeEntityPlatformType: PlatformType,
AttributeEntityInstanceID: InstanceID,
AttributeEntityServiceNameSource: AWSServiceNameSource,
}

// GetEntityField returns entity field for provided attribute
func GetEntityField(attribute string, platform string) string {
if attribute == AttributeEntityK8sClusterName {
switch platform {
case AttributeEntityEKSPlatform:
return EksCluster
case AttributeEntityK8sPlatform:
return K8sCluster
}
}

if field, ok := attributeEntityToFieldMap[attribute]; ok {
return field
}

return ""
}
107 changes: 107 additions & 0 deletions exporter/awsemfexporter/internal/entity/entityattributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package entity

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetEntityField(t *testing.T) {
tests := []struct {
name string
attribute string
value string
want string
}{
{
name: "AttributeEntityServiceName from map",
attribute: AttributeEntityServiceName,
value: "",
want: Service,
},
{
name: "AttributeEntityDeploymentEnvironment from map",
attribute: AttributeEntityDeploymentEnvironment,
value: "",
want: Environment,
},
{
name: "AttributeEntityK8sNamespaceName from map",
attribute: AttributeEntityK8sNamespaceName,
value: "",
want: K8sNamespace,
},
{
name: "AttributeEntityK8sWorkloadName from map",
attribute: AttributeEntityK8sWorkloadName,
value: "",
want: K8sWorkload,
},
{
name: "AttributeEntityK8sNodeName from map",
attribute: AttributeEntityK8sNodeName,
value: "",
want: K8sNode,
},
{
name: "AttributeEntityPlatformType from map",
attribute: AttributeEntityPlatformType,
value: "",
want: PlatformType,
},
{
name: "AttributeEntityInstanceID from map",
attribute: AttributeEntityInstanceID,
value: "",
want: InstanceID,
},
{
name: "AttributeEntityServiceNameSource from map",
attribute: AttributeEntityServiceNameSource,
value: "",
want: AWSServiceNameSource,
},
{
name: "K8sClusterName with EKSPlatform",
attribute: AttributeEntityK8sClusterName,
value: AttributeEntityEKSPlatform,
want: EksCluster,
},
{
name: "K8sClusterName with K8sPlatform",
attribute: AttributeEntityK8sClusterName,
value: AttributeEntityK8sPlatform,
want: K8sCluster,
},
{
name: "K8sClusterName with unknown platform",
attribute: AttributeEntityK8sClusterName,
value: "unknown",
want: "",
},
{
name: "Unknown attribute",
attribute: "unknown",
value: "",
want: "",
},
{
name: "K8sClusterName with no values provided",
attribute: AttributeEntityK8sClusterName,
value: "",
want: "",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := GetEntityField(tc.attribute, tc.value)
assert.Equalf(t, tc.want, got,
"GetEntityField(%q, %v) = %q; want %q",
tc.attribute, tc.value, got, tc.want)
})
}
}
32 changes: 26 additions & 6 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter/internal/entity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
)
Expand Down Expand Up @@ -184,7 +185,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri

// translateGroupedMetricToCWMetric converts Grouped Metric format to CloudWatch Metric format.
func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Config) *cWMetrics {
labels := filterAWSEMFAttributes(groupedMetric.labels)
labels := filterAWSEMFAttributes(groupedMetric.labels, false)
fieldsLength := len(labels) + len(groupedMetric.metrics)

isPrometheusMetric := groupedMetric.metadata.receiver == prometheusReceiver
Expand All @@ -195,7 +196,24 @@ func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Conf

// Add labels to fields
for k, v := range labels {
fields[k] = v
if !strings.HasPrefix(k, entity.AWSEntityPrefix) {
fields[k] = v
continue
}

if config.AddEntity {
// This check is needed to determine whether to use EKS.Cluster or K8s.Cluster
if k == entity.AttributeEntityK8sClusterName {
if entityField := entity.GetEntityField(k, labels[entity.AttributeEntityPlatformType]); entityField != "" {
fields[entityField] = v
}
continue
}

if entityField := entity.GetEntityField(k, ""); entityField != "" {
fields[entityField] = v
}
}
}
// Add metrics to fields
for metricName, metricInfo := range groupedMetric.metrics {
Expand Down Expand Up @@ -228,7 +246,8 @@ func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Conf

// groupedMetricToCWMeasurement creates a single CW Measurement from a grouped metric.
func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config) cWMeasurement {
labels := filterAWSEMFAttributes(groupedMetric.labels)
labels := filterAWSEMFAttributes(groupedMetric.labels, true)

dimensionRollupOption := config.DimensionRollupOption

// Create a dimension set containing list of label names
Expand Down Expand Up @@ -287,7 +306,7 @@ func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config)
// groupedMetricToCWMeasurementsWithFilters filters the grouped metric using the given list of metric
// declarations and returns the corresponding list of CW Measurements.
func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, config *Config) (cWMeasurements []cWMeasurement) {
labels := filterAWSEMFAttributes(groupedMetric.labels)
labels := filterAWSEMFAttributes(groupedMetric.labels, true)

// Filter metric declarations by labels
metricDeclarations := make([]*MetricDeclaration, 0, len(config.MetricDeclarations))
Expand Down Expand Up @@ -539,11 +558,12 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d
return event, nil
}

func filterAWSEMFAttributes(labels map[string]string) map[string]string {
func filterAWSEMFAttributes(labels map[string]string, removeEntity bool) map[string]string {
// remove any labels that are attributes specific to AWS EMF Exporter
filteredLabels := make(map[string]string)
for labelName := range labels {
if labelName != emfStorageResolutionAttribute {
if labelName != emfStorageResolutionAttribute &&
(!removeEntity || !strings.HasPrefix(labelName, entity.AWSEntityPrefix)) {
filteredLabels[labelName] = labels[labelName]
}
}
Expand Down
Loading
Loading