diff --git a/.chloggen/enhance-countconnector.yaml b/.chloggen/enhance-countconnector.yaml new file mode 100644 index 0000000000000..83cc50b2d8950 --- /dev/null +++ b/.chloggen/enhance-countconnector.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: connector/count + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Support for setting attributes from scope and resource levels. Precedence order: Span (or Log Record, etc.) > Scope attributes > Resource attributes." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41859] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/connector/countconnector/README.md b/connector/countconnector/README.md index 7dfb77847b58f..5e2073820f832 100644 --- a/connector/countconnector/README.md +++ b/connector/countconnector/README.md @@ -99,7 +99,8 @@ connectors: #### Attributes -`spans`, `spanevents`, `datapoints`, and `logs` may be counted according to attributes. +`spans`, `spanevents`, `datapoints`, and `logs` may be counted according to attributes. In such cases, attribute precedence follows this order: +span(logRecord, DataPoint, profile) attributes > scope attributes > resource attributes. If attributes are specified for custom metrics, a separate count will be generated for each unique set of attribute values. Each count will be emitted as a data point on the same metric. diff --git a/connector/countconnector/connector.go b/connector/countconnector/connector.go index 4e86d97c6709d..33cfe218215e2 100644 --- a/connector/countconnector/connector.go +++ b/connector/countconnector/connector.go @@ -50,24 +50,26 @@ func (c *count) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { countMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len()) for i := 0; i < td.ResourceSpans().Len(); i++ { resourceSpan := td.ResourceSpans().At(i) + resourceAttrs := resourceSpan.Resource().Attributes() spansCounter := newCounter[ottlspan.TransformContext](c.spansMetricDefs) spanEventsCounter := newCounter[ottlspanevent.TransformContext](c.spanEventsMetricDefs) for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { scopeSpan := resourceSpan.ScopeSpans().At(j) + scopeAttrs := scopeSpan.Scope().Attributes() for k := 0; k < scopeSpan.Spans().Len(); k++ { span := scopeSpan.Spans().At(k) spansCounter.updateTimestamp(span.StartTimestamp()) spansCounter.updateTimestamp(span.EndTimestamp()) sCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) - multiError = errors.Join(multiError, spansCounter.update(ctx, span.Attributes(), sCtx)) + multiError = errors.Join(multiError, spansCounter.update(ctx, span.Attributes(), scopeAttrs, resourceAttrs, sCtx)) for l := 0; l < span.Events().Len(); l++ { event := span.Events().At(l) spanEventsCounter.updateTimestamp(event.Timestamp()) eCtx := ottlspanevent.NewTransformContext(event, span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) - multiError = errors.Join(multiError, spanEventsCounter.update(ctx, event.Attributes(), eCtx)) + multiError = errors.Join(multiError, spanEventsCounter.update(ctx, event.Attributes(), scopeAttrs, resourceAttrs, eCtx)) } } } @@ -98,16 +100,18 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { countMetrics.ResourceMetrics().EnsureCapacity(md.ResourceMetrics().Len()) for i := 0; i < md.ResourceMetrics().Len(); i++ { resourceMetric := md.ResourceMetrics().At(i) + resourceAttrs := resourceMetric.Resource().Attributes() metricsCounter := newCounter[ottlmetric.TransformContext](c.metricsMetricDefs) dataPointsCounter := newCounter[ottldatapoint.TransformContext](c.dataPointsMetricDefs) for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { scopeMetrics := resourceMetric.ScopeMetrics().At(j) + scopeAttrs := scopeMetrics.Scope().Attributes() for k := 0; k < scopeMetrics.Metrics().Len(); k++ { metric := scopeMetrics.Metrics().At(k) mCtx := ottlmetric.NewTransformContext(metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) - multiError = errors.Join(multiError, metricsCounter.update(ctx, pcommon.NewMap(), mCtx)) + multiError = errors.Join(multiError, metricsCounter.update(ctx, pcommon.NewMap(), scopeAttrs, resourceAttrs, mCtx)) //exhaustive:enforce switch metric.Type() { @@ -117,7 +121,7 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { dp := dps.At(i) dataPointsCounter.updateTimestamp(dp.Timestamp()) dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) - multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx)) + multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx)) } case pmetric.MetricTypeSum: dps := metric.Sum().DataPoints() @@ -125,7 +129,7 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { dp := dps.At(i) dataPointsCounter.updateTimestamp(dp.Timestamp()) dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) - multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx)) + multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx)) } case pmetric.MetricTypeSummary: dps := metric.Summary().DataPoints() @@ -133,7 +137,7 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { dp := dps.At(i) dataPointsCounter.updateTimestamp(dp.Timestamp()) dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) - multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx)) + multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx)) } case pmetric.MetricTypeHistogram: dps := metric.Histogram().DataPoints() @@ -141,7 +145,7 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { dp := dps.At(i) dataPointsCounter.updateTimestamp(dp.Timestamp()) dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) - multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx)) + multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx)) } case pmetric.MetricTypeExponentialHistogram: dps := metric.ExponentialHistogram().DataPoints() @@ -149,7 +153,7 @@ func (c *count) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { dp := dps.At(i) dataPointsCounter.updateTimestamp(dp.Timestamp()) dCtx := ottldatapoint.NewTransformContext(dp, metric, scopeMetrics.Metrics(), scopeMetrics.Scope(), resourceMetric.Resource(), scopeMetrics, resourceMetric) - multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), dCtx)) + multiError = errors.Join(multiError, dataPointsCounter.update(ctx, dp.Attributes(), scopeAttrs, resourceAttrs, dCtx)) } case pmetric.MetricTypeEmpty: multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type())) @@ -183,16 +187,18 @@ func (c *count) ConsumeLogs(ctx context.Context, ld plog.Logs) error { countMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceLogs().Len()) for i := 0; i < ld.ResourceLogs().Len(); i++ { resourceLog := ld.ResourceLogs().At(i) + resourceAttrs := resourceLog.Resource().Attributes() counter := newCounter[ottllog.TransformContext](c.logsMetricDefs) for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { scopeLogs := resourceLog.ScopeLogs().At(j) + scopeAttrs := scopeLogs.Scope().Attributes() for k := 0; k < scopeLogs.LogRecords().Len(); k++ { logRecord := scopeLogs.LogRecords().At(k) counter.updateTimestamp(logRecord.Timestamp()) lCtx := ottllog.NewTransformContext(logRecord, scopeLogs.Scope(), resourceLog.Resource(), scopeLogs, resourceLog) - multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), lCtx)) + multiError = errors.Join(multiError, counter.update(ctx, logRecord.Attributes(), scopeAttrs, resourceAttrs, lCtx)) } } @@ -221,17 +227,19 @@ func (c *count) ConsumeProfiles(ctx context.Context, ld pprofile.Profiles) error countMetrics.ResourceMetrics().EnsureCapacity(ld.ResourceProfiles().Len()) for i := 0; i < ld.ResourceProfiles().Len(); i++ { resourceProfile := ld.ResourceProfiles().At(i) + resourceAttrs := resourceProfile.Resource().Attributes() counter := newCounter[ottlprofile.TransformContext](c.profilesMetricDefs) for j := 0; j < resourceProfile.ScopeProfiles().Len(); j++ { scopeProfile := resourceProfile.ScopeProfiles().At(j) + scopeAttrs := scopeProfile.Scope().Attributes() for k := 0; k < scopeProfile.Profiles().Len(); k++ { profile := scopeProfile.Profiles().At(k) counter.updateTimestamp(profile.Time()) pCtx := ottlprofile.NewTransformContext(profile, ld.Dictionary(), scopeProfile.Scope(), resourceProfile.Resource(), scopeProfile, resourceProfile) attributes := pprofile.FromAttributeIndices(ld.Dictionary().AttributeTable(), profile, ld.Dictionary()) - multiError = errors.Join(multiError, counter.update(ctx, attributes, pCtx)) + multiError = errors.Join(multiError, counter.update(ctx, attributes, scopeAttrs, resourceAttrs, pCtx)) } } diff --git a/connector/countconnector/counter.go b/connector/countconnector/counter.go index 9346350ddde43..2098df40e3bf0 100644 --- a/connector/countconnector/counter.go +++ b/connector/countconnector/counter.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + utilattri "github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) @@ -35,34 +36,46 @@ type attrCounter struct { count uint64 } -func (c *counter[K]) update(ctx context.Context, attrs pcommon.Map, tCtx K) error { +func (c *counter[K]) update(ctx context.Context, attrs, scopeAttrs, resourceAttrs pcommon.Map, tCtx K) error { var multiError error for name, md := range c.metricDefs { countAttrs := pcommon.NewMap() for _, attr := range md.attrs { - if attrVal, ok := attrs.Get(attr.Key); ok { - switch typeAttr := attrVal.Type(); typeAttr { + dimension := utilattri.Dimension{ + Name: attr.Key, + Value: func() *pcommon.Value { + if attr.DefaultValue != nil { + switch v := attr.DefaultValue.(type) { + case string: + if v != "" { + strV := pcommon.NewValueStr(v) + return &strV + } + case int: + if v != 0 { + intV := pcommon.NewValueInt(int64(v)) + return &intV + } + case float64: + if v != 0 { + floatV := pcommon.NewValueDouble(v) + return &floatV + } + } + } + + return nil + }(), + } + value, ok := utilattri.GetDimensionValue(dimension, attrs, scopeAttrs, resourceAttrs) + if ok { + switch value.Type() { case pcommon.ValueTypeInt: - countAttrs.PutInt(attr.Key, attrVal.Int()) + countAttrs.PutInt(attr.Key, value.Int()) case pcommon.ValueTypeDouble: - countAttrs.PutDouble(attr.Key, attrVal.Double()) + countAttrs.PutDouble(attr.Key, value.Double()) default: - countAttrs.PutStr(attr.Key, attrVal.Str()) - } - } else if attr.DefaultValue != nil { - switch v := attr.DefaultValue.(type) { - case string: - if v != "" { - countAttrs.PutStr(attr.Key, v) - } - case int: - if v != 0 { - countAttrs.PutInt(attr.Key, int64(v)) - } - case float64: - if v != 0 { - countAttrs.PutDouble(attr.Key, float64(v)) - } + countAttrs.PutStr(attr.Key, value.Str()) } } } diff --git a/connector/countconnector/counter_test.go b/connector/countconnector/counter_test.go new file mode 100644 index 0000000000000..9d255d0d1a27f --- /dev/null +++ b/connector/countconnector/counter_test.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package countconnector + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +func Test_update_attributes(t *testing.T) { + spanMetricDefs := make(map[string]metricDef[ottlspan.TransformContext]) + spanMetricDefs[defaultMetricNameSpans] = metricDef[ottlspan.TransformContext]{ + desc: defaultMetricDescSpans, + attrs: []AttributeConfig{ + { + Key: "component", + DefaultValue: "default", + }, + { + Key: "version", + DefaultValue: "default", + }, + }, + } + + tests := []struct { + name string + resourceAttr pcommon.Map + scopeAttr pcommon.Map + spanAttr pcommon.Map + expectedAttr pcommon.Map + }{ + { + name: "attributes from DefaultValue", + resourceAttr: pcommon.NewMap(), + scopeAttr: pcommon.NewMap(), + spanAttr: pcommon.NewMap(), + expectedAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "default") + res.PutStr("version", "default") + return res + }(), + }, + { + name: "attributes from resourceAttr", + resourceAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "otelcol") + res.PutStr("version", "v1.31.0") + return res + }(), + scopeAttr: pcommon.NewMap(), + spanAttr: pcommon.NewMap(), + expectedAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "otelcol") + res.PutStr("version", "v1.31.0") + return res + }(), + }, + { + name: "attributes from scopeAttr", + resourceAttr: pcommon.NewMap(), + scopeAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "otelcol") + res.PutStr("version", "v1.31.0") + return res + }(), + spanAttr: pcommon.NewMap(), + expectedAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "otelcol") + res.PutStr("version", "v1.31.0") + return res + }(), + }, + { + name: "attributes from spanAttr", + resourceAttr: pcommon.NewMap(), + scopeAttr: pcommon.NewMap(), + spanAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "otelcol") + res.PutStr("version", "v1.31.0") + return res + }(), + expectedAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "otelcol") + res.PutStr("version", "v1.31.0") + return res + }(), + }, + { + name: "attributes with order: spanAttr > scopeAttr > resourceAttr", + resourceAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "value-from-resourceAttr") + res.PutStr("version", "v1.31.0") + return res + }(), + scopeAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "value-from-scopeAttr") + res.PutStr("version", "value-from-scopeAttr") + return res + }(), + spanAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "value-from-spanAttr") + return res + }(), + expectedAttr: func() pcommon.Map { + res := pcommon.NewMap() + res.PutStr("component", "value-from-spanAttr") + res.PutStr("version", "value-from-scopeAttr") + return res + }(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spansCounter := newCounter[ottlspan.TransformContext](spanMetricDefs) + err := spansCounter.update(t.Context(), tt.spanAttr, tt.scopeAttr, tt.resourceAttr, ottlspan.TransformContext{}) + require.NoError(t, err) + require.NotNil(t, spansCounter) + m := spansCounter.counts[defaultMetricNameSpans] + expectKey := pdatautil.MapHash(tt.expectedAttr) + attrCount, ok := m[expectKey] + require.True(t, ok) + require.NotNil(t, attrCount) + }) + } +} diff --git a/connector/countconnector/go.mod b/connector/countconnector/go.mod index b3dfff22bce64..697fde16846c4 100644 --- a/connector/countconnector/go.mod +++ b/connector/countconnector/go.mod @@ -4,6 +4,7 @@ go 1.24.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.138.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.138.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.138.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.138.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.138.0 @@ -49,6 +50,7 @@ require ( github.com/knadh/koanf/maps v0.1.2 // indirect github.com/knadh/koanf/providers/confmap v1.0.0 // indirect github.com/knadh/koanf/v2 v2.3.0 // indirect + github.com/lightstep/go-expohisto v1.0.0 // indirect github.com/magefile/mage v1.15.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect @@ -101,3 +103,5 @@ retract ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil diff --git a/connector/countconnector/go.sum b/connector/countconnector/go.sum index 2bfcfb7c6fa63..0ccbb7d5c78f8 100644 --- a/connector/countconnector/go.sum +++ b/connector/countconnector/go.sum @@ -66,6 +66,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lightstep/go-expohisto v1.0.0 h1:UPtTS1rGdtehbbAF7o/dhkWLTDI73UifG8LbfQI7cA4= +github.com/lightstep/go-expohisto v1.0.0/go.mod h1:xDXD0++Mu2FOaItXtdDfksfgxfV0z1TMPa+e/EUd0cs= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=