From 36fb2181d6eabdb41be8e3cb1e11f189268dab3d Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Tue, 15 Jul 2025 14:18:39 +0200 Subject: [PATCH 01/14] Parsing and emitting azureeventhub app metrics --- .../azureresourcemetrics_unmarshaler.go | 299 +++++++++++++----- .../azureresourcemetrics_unmarshaler_test.go | 68 ++++ 2 files changed, 294 insertions(+), 73 deletions(-) create mode 100644 receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 6ebb5c589cce1..11df579f9c0ce 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -28,15 +28,60 @@ type azureResourceMetricsUnmarshaler struct { TimeFormat []string } +type azureResourceMetricsConfiger interface { + GetLogger() *zap.Logger + GetBuildVersion() string + GetTimeFormat() []string +} + // azureMetricRecords represents an array of Azure metric records // as exported via an Azure Event Hub type azureMetricRecords struct { - Records []azureMetricRecord `json:"records"` + Records []azureGenericMetricRecord `json:"records"` +} + +type azureMetricAppender interface { + AppendMetric(azureResourceMetricsConfiger, *pmetric.Metrics) error +} + +type azureGenericMetricRecord struct { + Type string + Record azureMetricAppender +} + +func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { + var recordWithType struct { + Type string `json:"Type"` + } + typeDecoder := jsoniter.NewDecoder(bytes.NewReader(data)) + err := typeDecoder.Decode(&recordWithType) + + if err != nil { + return err + } + + r.Type = recordWithType.Type + + switch r.Type { + case "AppMetrics": + r.Record = &azureAppMetricRecord{} + default: + r.Record = &azureResourceMetricRecord{} + } + + recordDecoder := jsoniter.NewDecoder(bytes.NewReader(data)) + err = recordDecoder.Decode(r.Record) + + if err != nil { + return err + } + + return nil } // azureMetricRecord represents a single Azure Metric following // the common schema does not exist (yet): -type azureMetricRecord struct { +type azureResourceMetricRecord struct { Time string `json:"time"` ResourceID string `json:"resourceId"` MetricName string `json:"metricName"` @@ -48,8 +93,168 @@ type azureMetricRecord struct { Average float64 `json:"average"` } +func (r *azureResourceMetricRecord) AppendMetric(c azureResourceMetricsConfiger, md *pmetric.Metrics) error { + resourceMetrics := md.ResourceMetrics().AppendEmpty() + + resource := resourceMetrics.Resource() + resource.Attributes().PutStr(string(conventions.TelemetrySDKNameKey), metadata.ScopeName) + resource.Attributes().PutStr(string(conventions.TelemetrySDKLanguageKey), conventions.TelemetrySDKLanguageGo.Value.AsString()) + resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), c.GetBuildVersion()) + resource.Attributes().PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString()) + + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + + metrics := scopeMetrics.Metrics() + metrics.EnsureCapacity(5) + + if r.ResourceID != "" { + resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID) + } else { + c.GetLogger().Warn("No ResourceID Set on Metrics!") + } + + nanos, err := asTimestamp(r.Time, c.GetTimeFormat()) + if err != nil { + c.GetLogger().Warn("Invalid Timestamp", zap.String("time", r.Time)) + return err + } + + var startTimestamp pcommon.Timestamp + if r.TimeGrain != "PT1M" { + c.GetLogger().Warn("Unhandled Time Grain", zap.String("timegrain", r.TimeGrain)) + return err + } + + startTimestamp = pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) + + metricTotal := metrics.AppendEmpty() + metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total"))) + dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() + dpTotal.SetStartTimestamp(startTimestamp) + dpTotal.SetTimestamp(nanos) + dpTotal.SetDoubleValue(r.Total) + + metricCount := metrics.AppendEmpty() + metricCount.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Count"))) + dpCount := metricCount.SetEmptyGauge().DataPoints().AppendEmpty() + dpCount.SetStartTimestamp(startTimestamp) + dpCount.SetTimestamp(nanos) + dpCount.SetDoubleValue(r.Count) + + metricMin := metrics.AppendEmpty() + metricMin.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Minimum"))) + dpMin := metricMin.SetEmptyGauge().DataPoints().AppendEmpty() + dpMin.SetStartTimestamp(startTimestamp) + dpMin.SetTimestamp(nanos) + dpMin.SetDoubleValue(r.Minimum) + + metricMax := metrics.AppendEmpty() + metricMax.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Maximum"))) + dpMax := metricMax.SetEmptyGauge().DataPoints().AppendEmpty() + dpMax.SetStartTimestamp(startTimestamp) + dpMax.SetTimestamp(nanos) + dpMax.SetDoubleValue(r.Maximum) + + metricAverage := metrics.AppendEmpty() + metricAverage.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Average"))) + dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty() + dpAverage.SetStartTimestamp(startTimestamp) + dpAverage.SetTimestamp(nanos) + dpAverage.SetDoubleValue(r.Average) + + return nil +} + +// azureMetricRecord represents a single Azure Metric following +// the common schema does not exist (yet): +type azureAppMetricRecord struct { + Time string `json:"time"` + ResourceID string `json:"resourceId"` + Type string `json:"Type"` + + AppRoleInstance string `json:"AppRoleInstance"` + AppRoleName string `json:"AppRoleName"` + AppVersion string `json:"AppVersion"` + SDKVersion string `json:"SDKVersion"` + + ClientBrowser string `json:"ClientBrowser"` + ClientCity string `json:"ClientCity"` + ClientCountryOrRegion string `json:"ClientCountryOrRegion"` + ClientIP string `json:"ClientIP"` + ClientModel string `json:"ClientModel"` + ClientOS string `json:"ClientOS"` + ClientStateOrProvince string `json:"ClientStateOrProvince"` + ClientType string `json:"ClientType"` + + MetricName string `json:"Name"` + Total float64 `json:"Sum"` + Minimum float64 `json:"Min"` + Maximum float64 `json:"Max"` + Count float64 `json:"ItemCount"` +} + +func (r *azureAppMetricRecord) AppendMetric(c azureResourceMetricsConfiger, md *pmetric.Metrics) error { + resourceMetrics := md.ResourceMetrics().AppendEmpty() + + resource := resourceMetrics.Resource() + resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), r.SDKVersion) + resource.Attributes().PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString()) + resource.Attributes().PutStr(string(conventions.ServiceInstanceIDKey), r.AppRoleInstance) + resource.Attributes().PutStr(string(conventions.ServiceNameKey), r.AppRoleName) + resource.Attributes().PutStr(string(conventions.ServiceVersionKey), r.AppVersion) + + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + + metrics := scopeMetrics.Metrics() + metrics.EnsureCapacity(4) + + if r.ResourceID != "" { + resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID) + } else { + c.GetLogger().Warn("No ResourceID Set on Metrics!") + } + + nanos, err := asTimestamp(r.Time, c.GetTimeFormat()) + if err != nil { + c.GetLogger().Warn("Invalid Timestamp", zap.String("time", r.Time)) + return err + } + + startTimestamp := pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) + + metricTotal := metrics.AppendEmpty() + metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total"))) + dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() + dpTotal.SetStartTimestamp(startTimestamp) + dpTotal.SetTimestamp(nanos) + dpTotal.SetDoubleValue(r.Total) + + metricCount := metrics.AppendEmpty() + metricCount.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Count"))) + dpCount := metricCount.SetEmptyGauge().DataPoints().AppendEmpty() + dpCount.SetStartTimestamp(startTimestamp) + dpCount.SetTimestamp(nanos) + dpCount.SetDoubleValue(r.Count) + + metricMin := metrics.AppendEmpty() + metricMin.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Minimum"))) + dpMin := metricMin.SetEmptyGauge().DataPoints().AppendEmpty() + dpMin.SetStartTimestamp(startTimestamp) + dpMin.SetTimestamp(nanos) + dpMin.SetDoubleValue(r.Minimum) + + metricMax := metrics.AppendEmpty() + metricMax.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Maximum"))) + dpMax := metricMax.SetEmptyGauge().DataPoints().AppendEmpty() + dpMax.SetStartTimestamp(startTimestamp) + dpMax.SetTimestamp(nanos) + dpMax.SetDoubleValue(r.Maximum) + + return nil +} + func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventMetricsUnmarshaler { - return azureResourceMetricsUnmarshaler{ + return &azureResourceMetricsUnmarshaler{ buildInfo: buildInfo, logger: logger, TimeFormat: timeFormat, @@ -61,92 +266,40 @@ func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *z // an OpenTelemetry pmetric.Metrics object. The data in the Azure // metric record appears as fields and attributes in the // OpenTelemetry representation; -func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (pmetric.Metrics, error) { +func (r *azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (pmetric.Metrics, error) { md := pmetric.NewMetrics() var azureMetrics azureMetricRecords decoder := jsoniter.NewDecoder(bytes.NewReader(event.Data())) err := decoder.Decode(&azureMetrics) + //err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(event.Data, &azureMetrics) + if err != nil { return md, err } - resourceMetrics := md.ResourceMetrics().AppendEmpty() - resource := resourceMetrics.Resource() - resource.Attributes().PutStr(string(conventions.TelemetrySDKNameKey), metadata.ScopeName) - resource.Attributes().PutStr(string(conventions.TelemetrySDKLanguageKey), conventions.TelemetrySDKLanguageGo.Value.AsString()) - resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), r.buildInfo.Version) - resource.Attributes().PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString()) - - scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() - - metrics := scopeMetrics.Metrics() - metrics.EnsureCapacity(len(azureMetrics.Records) * 5) - - resourceID := "" - for _, azureMetric := range azureMetrics.Records { - if resourceID == "" && azureMetric.ResourceID != "" { - resourceID = azureMetric.ResourceID - } - - nanos, err := asTimestamp(azureMetric.Time, r.TimeFormat) + for _, mr := range azureMetrics.Records { + err := mr.Record.AppendMetric(r, &md) if err != nil { - r.logger.Warn("Invalid Timestamp", zap.String("time", azureMetric.Time)) - continue - } - - var startTimestamp pcommon.Timestamp - if azureMetric.TimeGrain != "PT1M" { - r.logger.Warn("Unhandled Time Grain", zap.String("timegrain", azureMetric.TimeGrain)) - continue + r.logger.Warn("Failed to append metric", zap.Error(err)) } - startTimestamp = pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) - - metricTotal := metrics.AppendEmpty() - metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Total"))) - dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() - dpTotal.SetStartTimestamp(startTimestamp) - dpTotal.SetTimestamp(nanos) - dpTotal.SetDoubleValue(azureMetric.Total) - - metricCount := metrics.AppendEmpty() - metricCount.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Count"))) - dpCount := metricCount.SetEmptyGauge().DataPoints().AppendEmpty() - dpCount.SetStartTimestamp(startTimestamp) - dpCount.SetTimestamp(nanos) - dpCount.SetDoubleValue(azureMetric.Count) - - metricMin := metrics.AppendEmpty() - metricMin.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Minimum"))) - dpMin := metricMin.SetEmptyGauge().DataPoints().AppendEmpty() - dpMin.SetStartTimestamp(startTimestamp) - dpMin.SetTimestamp(nanos) - dpMin.SetDoubleValue(azureMetric.Minimum) - - metricMax := metrics.AppendEmpty() - metricMax.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Maximum"))) - dpMax := metricMax.SetEmptyGauge().DataPoints().AppendEmpty() - dpMax.SetStartTimestamp(startTimestamp) - dpMax.SetTimestamp(nanos) - dpMax.SetDoubleValue(azureMetric.Maximum) - - metricAverage := metrics.AppendEmpty() - metricAverage.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Average"))) - dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty() - dpAverage.SetStartTimestamp(startTimestamp) - dpAverage.SetTimestamp(nanos) - dpAverage.SetDoubleValue(azureMetric.Average) - } - - if resourceID != "" { - resourceMetrics.Resource().Attributes().PutStr(azureResourceID, resourceID) - } else { - r.logger.Warn("No ResourceID Set on Metrics!") } return md, nil } +func (r *azureResourceMetricsUnmarshaler) GetLogger() *zap.Logger { + return r.logger +} + +func (r *azureResourceMetricsUnmarshaler) GetBuildVersion() string { + return r.buildInfo.Version +} + +func (r *azureResourceMetricsUnmarshaler) GetTimeFormat() []string { + return r.TimeFormat +} + // asTimestamp will parse an ISO8601 string into an OpenTelemetry // nanosecond timestamp. If the string cannot be parsed, it will // return zero and the error. diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go new file mode 100644 index 0000000000000..32fd89792d4f0 --- /dev/null +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go @@ -0,0 +1,68 @@ +package azureeventhubreceiver + +import ( + "testing" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" +) + +func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { + encodedMetrics := `{"records":[ +{ + "count":23, + "total":12292.1382, + "minimum":27.4786, + "maximum":6695.419, + "average":534.440791304348, + "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "time":"2025-07-14T12:45:00.0000000Z", + "metricName":"dependencies/duration", + "timeGrain":"PT1M" +}, +{ + "time":"2025-07-14T12:35:36.3259399Z", + "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "ResourceGUID":"00000000-0000-0000-0000-000000000000", + "Type":"AppMetrics", + "AppRoleInstance":"00000000-0000-0000-0000-000000000000", + "AppRoleName":"service", + "AppVersion":"1.0.0.0", + "ClientBrowser":"Other", + "ClientCity":"City", + "ClientCountryOrRegion":"Country", + "ClientIP":"0.0.0.0", + "ClientModel":"Other", + "ClientOS":"Linux", + "ClientStateOrProvince":"Province", + "ClientType":"PC", + "IKey":"00000000-0000-0000-0000-000000000000", + "_BilledSize":444, + "SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", + "Name":"metric.name", + "Sum":8, + "Min":8, + "Max":8, + "ItemCount":1 +} +]}` + event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}} + logger := zap.NewNop() + unmarshaler := newAzureResourceMetricsUnmarshaler( + component.BuildInfo{ + Command: "Test", + Description: "Test", + Version: "Test", + }, + logger, + []string{ + "", + }, + ) + metrics, err := unmarshaler.UnmarshalMetrics(&event) + + assert.NoError(t, err) + assert.Equal(t, 9, metrics.MetricCount()) +} From f0434ab510634505de1961353ae5ba2b0fdadd67 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Wed, 16 Jul 2025 14:44:25 +0200 Subject: [PATCH 02/14] Static temporary monitor scope --- extension/azureauthextension/extension.go | 2 +- .../azureeventhubreceiver/azureresourcemetrics_unmarshaler.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/extension/azureauthextension/extension.go b/extension/azureauthextension/extension.go index cf03aa823a08e..4a5a241904328 100644 --- a/extension/azureauthextension/extension.go +++ b/extension/azureauthextension/extension.go @@ -167,7 +167,7 @@ func (a *authenticator) getTokenForHost(ctx context.Context, host string) (strin // Example: if host is "management.azure.com", then the scope to get the // token will be "https://management.azure.com/.default". // See default scope: https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope. - fmt.Sprintf("https://%s/.default", host), + "https://monitor.azure.com/.default", }, } diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 11df579f9c0ce..432cefb65d95d 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -272,7 +272,6 @@ func (r *azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (p var azureMetrics azureMetricRecords decoder := jsoniter.NewDecoder(bytes.NewReader(event.Data())) err := decoder.Decode(&azureMetrics) - //err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(event.Data, &azureMetrics) if err != nil { return md, err From 008c96b4e3066bf1bb8a280f620329d7b07a08f9 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Thu, 17 Jul 2025 11:44:36 +0200 Subject: [PATCH 03/14] Adds extra attributes from app metrics --- .../azureresourcemetrics_unmarshaler.go | 64 ++++++++------- .../azureresourcemetrics_unmarshaler_test.go | 78 +++++++++++++++++++ 2 files changed, 113 insertions(+), 29 deletions(-) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 432cefb65d95d..bb5d3a355f5d6 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -6,6 +6,7 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry import ( "bytes" "fmt" + "slices" "strings" "time" @@ -41,28 +42,25 @@ type azureMetricRecords struct { } type azureMetricAppender interface { - AppendMetric(azureResourceMetricsConfiger, *pmetric.Metrics) error + AppendMetrics(azureResourceMetricsConfiger, map[string]interface{}, *pmetric.Metrics) error } type azureGenericMetricRecord struct { - Type string Record azureMetricAppender + Fields map[string]interface{} } func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { - var recordWithType struct { - Type string `json:"Type"` - } typeDecoder := jsoniter.NewDecoder(bytes.NewReader(data)) - err := typeDecoder.Decode(&recordWithType) + err := typeDecoder.Decode(&r.Fields) if err != nil { return err } - r.Type = recordWithType.Type + recordType, _ := r.Fields["Type"].(string) - switch r.Type { + switch recordType { case "AppMetrics": r.Record = &azureAppMetricRecord{} default: @@ -93,7 +91,7 @@ type azureResourceMetricRecord struct { Average float64 `json:"average"` } -func (r *azureResourceMetricRecord) AppendMetric(c azureResourceMetricsConfiger, md *pmetric.Metrics) error { +func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, _ map[string]interface{}, md *pmetric.Metrics) error { resourceMetrics := md.ResourceMetrics().AppendEmpty() resource := resourceMetrics.Resource() @@ -168,24 +166,13 @@ func (r *azureResourceMetricRecord) AppendMetric(c azureResourceMetricsConfiger, // azureMetricRecord represents a single Azure Metric following // the common schema does not exist (yet): type azureAppMetricRecord struct { - Time string `json:"time"` - ResourceID string `json:"resourceId"` - Type string `json:"Type"` + Time string `json:"time"` AppRoleInstance string `json:"AppRoleInstance"` AppRoleName string `json:"AppRoleName"` AppVersion string `json:"AppVersion"` SDKVersion string `json:"SDKVersion"` - ClientBrowser string `json:"ClientBrowser"` - ClientCity string `json:"ClientCity"` - ClientCountryOrRegion string `json:"ClientCountryOrRegion"` - ClientIP string `json:"ClientIP"` - ClientModel string `json:"ClientModel"` - ClientOS string `json:"ClientOS"` - ClientStateOrProvince string `json:"ClientStateOrProvince"` - ClientType string `json:"ClientType"` - MetricName string `json:"Name"` Total float64 `json:"Sum"` Minimum float64 `json:"Min"` @@ -193,7 +180,7 @@ type azureAppMetricRecord struct { Count float64 `json:"ItemCount"` } -func (r *azureAppMetricRecord) AppendMetric(c azureResourceMetricsConfiger, md *pmetric.Metrics) error { +func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, fields map[string]interface{}, md *pmetric.Metrics) error { resourceMetrics := md.ResourceMetrics().AppendEmpty() resource := resourceMetrics.Resource() @@ -203,17 +190,36 @@ func (r *azureAppMetricRecord) AppendMetric(c azureResourceMetricsConfiger, md * resource.Attributes().PutStr(string(conventions.ServiceNameKey), r.AppRoleName) resource.Attributes().PutStr(string(conventions.ServiceVersionKey), r.AppVersion) + ignoredAttributes := []string{ + "time", + "Type", + "ResourceGUID", + "AppRoleInstance", + "AppRoleName", + "AppVersion", + "SDKVersion", + "IKey", + "_BilledSize", + "Name", + "Sum", + "Min", + "Max", + "ItemCount", + } + + for k, v := range fields { + if slices.Contains(ignoredAttributes, k) { + continue + } + + resource.Attributes().PutStr(k, fmt.Sprintf("%v", v)) + } + scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() metrics := scopeMetrics.Metrics() metrics.EnsureCapacity(4) - if r.ResourceID != "" { - resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID) - } else { - c.GetLogger().Warn("No ResourceID Set on Metrics!") - } - nanos, err := asTimestamp(r.Time, c.GetTimeFormat()) if err != nil { c.GetLogger().Warn("Invalid Timestamp", zap.String("time", r.Time)) @@ -278,7 +284,7 @@ func (r *azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (p } for _, mr := range azureMetrics.Records { - err := mr.Record.AppendMetric(r, &md) + err := mr.Record.AppendMetrics(r, mr.Fields, &md) if err != nil { r.logger.Warn("Failed to append metric", zap.Error(err)) } diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go index 32fd89792d4f0..f1eabd3e688d5 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go @@ -66,3 +66,81 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 9, metrics.MetricCount()) } + +func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *testing.T) { + encodedMetrics := `{"records":[ +{ + "time":"2025-07-14T12:35:36.3259399Z", + "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "ResourceGUID":"00000000-0000-0000-0000-000000000000", + "Type":"AppMetrics", + "AppRoleInstance":"00000000-0000-0000-0000-000000000000", + "AppRoleName":"service", + "AppVersion":"1.0.0.0", + "ClientBrowser":"Other", + "ClientCity":"City", + "ClientCountryOrRegion":"Country", + "ClientIP":"0.0.0.0", + "ClientModel":"Other", + "ClientOS":"Linux", + "ClientStateOrProvince":"Province", + "ClientType":"PC", + "IKey":"00000000-0000-0000-0000-000000000000", + "_BilledSize":444, + "SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", + "Name":"metric.name", + "Sum":8, + "Min":8, + "Max":8, + "ItemCount":1 +} +]}` + event := eventhub.Event{Data: []byte(encodedMetrics)} + logger := zap.NewNop() + unmarshaler := newAzureResourceMetricsUnmarshaler( + component.BuildInfo{ + Command: "Test", + Description: "Test", + Version: "Test", + }, + logger, + []string{ + "", + }, + ) + metrics, err := unmarshaler.UnmarshalMetrics(&event) + + assert.NoError(t, err) + assert.Equal(t, 4, metrics.MetricCount()) + + expectedAttributes := map[string]string{ + "service.instance.id": "00000000-0000-0000-0000-000000000000", + "service.name": "service", + "service.version": "1.0.0.0", + "telemetry.sdk.version": "dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", + "cloud.provider": "azure", + "resourceId": "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "ClientBrowser": "Other", + "ClientCity": "City", + "ClientCountryOrRegion": "Country", + "ClientIP": "0.0.0.0", + "ClientModel": "Other", + "ClientOS": "Linux", + "ClientStateOrProvince": "Province", + "ClientType": "PC", + } + metric := metrics.ResourceMetrics().At(0).Resource() + + assert.Equal(t, len(expectedAttributes), metric.Attributes().Len()) + + for k, expected := range expectedAttributes { + actual, ok := metric.Attributes().Get(k) + + if !ok { + t.Errorf("Attribute %s not found", k) + continue + } + + assert.Equal(t, expected, actual.AsString()) + } +} From b15fec66c2ee4d12ce94e650a2b4c103525882a4 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Thu, 17 Jul 2025 12:59:37 +0200 Subject: [PATCH 04/14] Metric aggregation on app metrics, average supported --- .../azureresourcemetrics_unmarshaler.go | 51 +++++++++----- .../azureresourcemetrics_unmarshaler_test.go | 70 +++++++++---------- receiver/azureeventhubreceiver/config.go | 1 + receiver/azureeventhubreceiver/factory.go | 2 +- 4 files changed, 69 insertions(+), 55 deletions(-) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index bb5d3a355f5d6..5659d9ff4a41c 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -24,15 +24,17 @@ import ( const azureResourceID = "azure.resource.id" type azureResourceMetricsUnmarshaler struct { - buildInfo component.BuildInfo - logger *zap.Logger - TimeFormat []string + buildInfo component.BuildInfo + logger *zap.Logger + TimeFormat []string + Aggregation string } type azureResourceMetricsConfiger interface { GetLogger() *zap.Logger GetBuildVersion() string GetTimeFormat() []string + GetAggregation() string } // azureMetricRecords represents an array of Azure metric records @@ -80,15 +82,16 @@ func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { // azureMetricRecord represents a single Azure Metric following // the common schema does not exist (yet): type azureResourceMetricRecord struct { - Time string `json:"time"` - ResourceID string `json:"resourceId"` - MetricName string `json:"metricName"` - TimeGrain string `json:"timeGrain"` - Total float64 `json:"total"` - Count float64 `json:"count"` - Minimum float64 `json:"minimum"` - Maximum float64 `json:"maximum"` - Average float64 `json:"average"` + Time string `json:"time"` + ResourceID string `json:"resourceId"` + MetricName string `json:"metricName"` + TimeGrain string `json:"timeGrain"` + + Total float64 `json:"total"` + Count float64 `json:"count"` + Minimum float64 `json:"minimum"` + Maximum float64 `json:"maximum"` + Average float64 `json:"average"` } func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, _ map[string]interface{}, md *pmetric.Metrics) error { @@ -228,6 +231,17 @@ func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, fie startTimestamp := pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) + if c.GetAggregation() == "average" { + metricAverage := metrics.AppendEmpty() + metricAverage.SetName(strings.ToLower(strings.ReplaceAll(r.MetricName, " ", "_"))) + dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty() + dpAverage.SetStartTimestamp(startTimestamp) + dpAverage.SetTimestamp(nanos) + dpAverage.SetDoubleValue(r.Total / r.Count) + + return nil + } + metricTotal := metrics.AppendEmpty() metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total"))) dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() @@ -259,11 +273,12 @@ func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, fie return nil } -func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventMetricsUnmarshaler { +func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, cfg *Config) eventMetricsUnmarshaler { return &azureResourceMetricsUnmarshaler{ - buildInfo: buildInfo, - logger: logger, - TimeFormat: timeFormat, + buildInfo: buildInfo, + logger: logger, + TimeFormat: cfg.TimeFormats.Metrics, + Aggregation: cfg.MetricAggregation, } } @@ -305,6 +320,10 @@ func (r *azureResourceMetricsUnmarshaler) GetTimeFormat() []string { return r.TimeFormat } +func (r *azureResourceMetricsUnmarshaler) GetAggregation() string { + return r.Aggregation +} + // asTimestamp will parse an ISO8601 string into an OpenTelemetry // nanosecond timestamp. If the string cannot be parsed, it will // return zero and the error. diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go index f1eabd3e688d5..7cdd1ecf93c70 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go @@ -9,8 +9,7 @@ import ( "go.uber.org/zap" ) -func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { - encodedMetrics := `{"records":[ +var encodedMetrics = `{"records":[ { "count":23, "total":12292.1382, @@ -48,6 +47,8 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { "ItemCount":1 } ]}` + +func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}} logger := zap.NewNop() unmarshaler := newAzureResourceMetricsUnmarshaler( @@ -57,9 +58,7 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { Version: "Test", }, logger, - []string{ - "", - }, + &Config{}, ) metrics, err := unmarshaler.UnmarshalMetrics(&event) @@ -68,34 +67,7 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) { } func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *testing.T) { - encodedMetrics := `{"records":[ -{ - "time":"2025-07-14T12:35:36.3259399Z", - "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", - "ResourceGUID":"00000000-0000-0000-0000-000000000000", - "Type":"AppMetrics", - "AppRoleInstance":"00000000-0000-0000-0000-000000000000", - "AppRoleName":"service", - "AppVersion":"1.0.0.0", - "ClientBrowser":"Other", - "ClientCity":"City", - "ClientCountryOrRegion":"Country", - "ClientIP":"0.0.0.0", - "ClientModel":"Other", - "ClientOS":"Linux", - "ClientStateOrProvince":"Province", - "ClientType":"PC", - "IKey":"00000000-0000-0000-0000-000000000000", - "_BilledSize":444, - "SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", - "Name":"metric.name", - "Sum":8, - "Min":8, - "Max":8, - "ItemCount":1 -} -]}` - event := eventhub.Event{Data: []byte(encodedMetrics)} + event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}} logger := zap.NewNop() unmarshaler := newAzureResourceMetricsUnmarshaler( component.BuildInfo{ @@ -104,14 +76,11 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *te Version: "Test", }, logger, - []string{ - "", - }, + &Config{}, ) metrics, err := unmarshaler.UnmarshalMetrics(&event) assert.NoError(t, err) - assert.Equal(t, 4, metrics.MetricCount()) expectedAttributes := map[string]string{ "service.instance.id": "00000000-0000-0000-0000-000000000000", @@ -129,7 +98,7 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *te "ClientStateOrProvince": "Province", "ClientType": "PC", } - metric := metrics.ResourceMetrics().At(0).Resource() + metric := metrics.ResourceMetrics().At(1).Resource() assert.Equal(t, len(expectedAttributes), metric.Attributes().Len()) @@ -144,3 +113,28 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *te assert.Equal(t, expected, actual.AsString()) } } + +func TestAzureResourceMetricsUnmarshaler_UnmarshalAggregatedAppMetrics(t *testing.T) { + event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}} + logger := zap.NewNop() + unmarshaler := newAzureResourceMetricsUnmarshaler( + component.BuildInfo{ + Command: "Test", + Description: "Test", + Version: "Test", + }, + logger, + &Config{ + MetricAggregation: "average", + }, + ) + metrics, err := unmarshaler.UnmarshalMetrics(&event) + + assert.NoError(t, err) + assert.Equal(t, 6, metrics.MetricCount()) + + metric := metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0) + + assert.Equal(t, "metric.name", metric.Name()) + assert.Equal(t, 8.0, metric.Gauge().DataPoints().At(0).DoubleValue()) +} diff --git a/receiver/azureeventhubreceiver/config.go b/receiver/azureeventhubreceiver/config.go index 81345caf8c5a1..49d17e6c68dfd 100644 --- a/receiver/azureeventhubreceiver/config.go +++ b/receiver/azureeventhubreceiver/config.go @@ -35,6 +35,7 @@ type Config struct { ConsumerGroup string `mapstructure:"group"` ApplySemanticConventions bool `mapstructure:"apply_semantic_conventions"` TimeFormats TimeFormat `mapstructure:"time_formats"` + MetricAggregation string `mapstructure:"metric_aggregation"` // azeventhub lib specific PollRate int `mapstructure:"poll_rate"` diff --git a/receiver/azureeventhubreceiver/factory.go b/receiver/azureeventhubreceiver/factory.go index cc0ee1098f1fb..9ab1b692f5017 100644 --- a/receiver/azureeventhubreceiver/factory.go +++ b/receiver/azureeventhubreceiver/factory.go @@ -117,7 +117,7 @@ func (f *eventhubReceiverFactory) getReceiver( metricsUnmarshaler = nil err = errors.New("raw format not supported for Metrics") } else { - metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig.TimeFormats.Metrics) + metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig) } case pipeline.SignalTraces: if logFormat(receiverConfig.Format) == rawLogFormat { From afdd8c966f65615b71d9472c0cf3ba84cf344ca7 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Thu, 17 Jul 2025 13:44:12 +0200 Subject: [PATCH 05/14] Using additional attributes from properties --- .../azureresourcemetrics_unmarshaler.go | 54 ++++++++----------- .../azureresourcemetrics_unmarshaler_test.go | 18 +++---- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 5659d9ff4a41c..77c2fa7955073 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -6,7 +6,6 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry import ( "bytes" "fmt" - "slices" "strings" "time" @@ -44,25 +43,25 @@ type azureMetricRecords struct { } type azureMetricAppender interface { - AppendMetrics(azureResourceMetricsConfiger, map[string]interface{}, *pmetric.Metrics) error + AppendMetrics(azureResourceMetricsConfiger, *pmetric.Metrics) error } type azureGenericMetricRecord struct { Record azureMetricAppender - Fields map[string]interface{} } func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { + var recordWithType struct { + Type string `json:"Type"` + } typeDecoder := jsoniter.NewDecoder(bytes.NewReader(data)) - err := typeDecoder.Decode(&r.Fields) + err := typeDecoder.Decode(&recordWithType) if err != nil { return err } - recordType, _ := r.Fields["Type"].(string) - - switch recordType { + switch recordWithType.Type { case "AppMetrics": r.Record = &azureAppMetricRecord{} default: @@ -94,7 +93,7 @@ type azureResourceMetricRecord struct { Average float64 `json:"average"` } -func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, _ map[string]interface{}, md *pmetric.Metrics) error { +func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md *pmetric.Metrics) error { resourceMetrics := md.ResourceMetrics().AppendEmpty() resource := resourceMetrics.Resource() @@ -171,11 +170,17 @@ func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger type azureAppMetricRecord struct { Time string `json:"time"` + ResourceID string `json:"resourceId"` AppRoleInstance string `json:"AppRoleInstance"` AppRoleName string `json:"AppRoleName"` AppVersion string `json:"AppVersion"` SDKVersion string `json:"SDKVersion"` + ClientCountryOrRegion string `json:"ClientCountryOrRegion"` + ClientOS string `json:"ClientOS"` + + Properties map[string]string `json:"Properties"` + MetricName string `json:"Name"` Total float64 `json:"Sum"` Minimum float64 `json:"Min"` @@ -183,39 +188,26 @@ type azureAppMetricRecord struct { Count float64 `json:"ItemCount"` } -func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, fields map[string]interface{}, md *pmetric.Metrics) error { +func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md *pmetric.Metrics) error { resourceMetrics := md.ResourceMetrics().AppendEmpty() resource := resourceMetrics.Resource() resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), r.SDKVersion) resource.Attributes().PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString()) + resource.Attributes().PutStr(string(conventions.CloudRegionKey), r.ClientCountryOrRegion) resource.Attributes().PutStr(string(conventions.ServiceInstanceIDKey), r.AppRoleInstance) resource.Attributes().PutStr(string(conventions.ServiceNameKey), r.AppRoleName) resource.Attributes().PutStr(string(conventions.ServiceVersionKey), r.AppVersion) + resource.Attributes().PutStr(string(conventions.OSNameKey), r.ClientOS) - ignoredAttributes := []string{ - "time", - "Type", - "ResourceGUID", - "AppRoleInstance", - "AppRoleName", - "AppVersion", - "SDKVersion", - "IKey", - "_BilledSize", - "Name", - "Sum", - "Min", - "Max", - "ItemCount", + if r.ResourceID != "" { + resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID) + } else { + c.GetLogger().Warn("No ResourceID Set on Metrics!") } - for k, v := range fields { - if slices.Contains(ignoredAttributes, k) { - continue - } - - resource.Attributes().PutStr(k, fmt.Sprintf("%v", v)) + for k, v := range r.Properties { + resource.Attributes().PutStr(k, v) } scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() @@ -299,7 +291,7 @@ func (r *azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (p } for _, mr := range azureMetrics.Records { - err := mr.Record.AppendMetrics(r, mr.Fields, &md) + err := mr.Record.AppendMetrics(r, &md) if err != nil { r.logger.Warn("Failed to append metric", zap.Error(err)) } diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go index 7cdd1ecf93c70..7bdd379b2f41b 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go @@ -40,6 +40,10 @@ var encodedMetrics = `{"records":[ "IKey":"00000000-0000-0000-0000-000000000000", "_BilledSize":444, "SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", + "Properties": { + "an_attribute": "a_value", + "another_attribute": "another_value" + }, "Name":"metric.name", "Sum":8, "Min":8, @@ -88,15 +92,11 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *te "service.version": "1.0.0.0", "telemetry.sdk.version": "dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", "cloud.provider": "azure", - "resourceId": "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", - "ClientBrowser": "Other", - "ClientCity": "City", - "ClientCountryOrRegion": "Country", - "ClientIP": "0.0.0.0", - "ClientModel": "Other", - "ClientOS": "Linux", - "ClientStateOrProvince": "Province", - "ClientType": "PC", + "cloud.region": "Country", + "azure.resource.id": "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "os.name": "Linux", + "an_attribute": "a_value", + "another_attribute": "another_value", } metric := metrics.ResourceMetrics().At(1).Resource() From be5605e146372facd6fa1a15dc7b356c31c25190 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Thu, 17 Jul 2025 14:02:36 +0200 Subject: [PATCH 06/14] Support resource metric aggregation --- .../azureresourcemetrics_unmarshaler.go | 19 +++++++++++++++++-- .../azureresourcemetrics_unmarshaler_test.go | 11 +++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 77c2fa7955073..487f5d3002e66 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -105,7 +105,6 @@ func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() metrics := scopeMetrics.Metrics() - metrics.EnsureCapacity(5) if r.ResourceID != "" { resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID) @@ -127,6 +126,20 @@ func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger startTimestamp = pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) + if c.GetAggregation() == "average" { + metrics.EnsureCapacity(1) + metricAverage := metrics.AppendEmpty() + metricAverage.SetName(strings.ToLower(strings.ReplaceAll(r.MetricName, " ", "_"))) + dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty() + dpAverage.SetStartTimestamp(startTimestamp) + dpAverage.SetTimestamp(nanos) + dpAverage.SetDoubleValue(r.Total / r.Count) + + return nil + } + + metrics.EnsureCapacity(5) + metricTotal := metrics.AppendEmpty() metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total"))) dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() @@ -213,7 +226,6 @@ func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() metrics := scopeMetrics.Metrics() - metrics.EnsureCapacity(4) nanos, err := asTimestamp(r.Time, c.GetTimeFormat()) if err != nil { @@ -224,6 +236,7 @@ func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md startTimestamp := pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) if c.GetAggregation() == "average" { + metrics.EnsureCapacity(1) metricAverage := metrics.AppendEmpty() metricAverage.SetName(strings.ToLower(strings.ReplaceAll(r.MetricName, " ", "_"))) dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty() @@ -234,6 +247,8 @@ func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md return nil } + metrics.EnsureCapacity(4) + metricTotal := metrics.AppendEmpty() metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total"))) dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go index 7bdd379b2f41b..cab62dee4b82e 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go @@ -131,10 +131,13 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalAggregatedAppMetrics(t *testin metrics, err := unmarshaler.UnmarshalMetrics(&event) assert.NoError(t, err) - assert.Equal(t, 6, metrics.MetricCount()) + assert.Equal(t, 2, metrics.MetricCount()) - metric := metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0) + resMetric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, "dependencies/duration", resMetric.Name()) + assert.Equal(t, 534.4407913043478, resMetric.Gauge().DataPoints().At(0).DoubleValue()) - assert.Equal(t, "metric.name", metric.Name()) - assert.Equal(t, 8.0, metric.Gauge().DataPoints().At(0).DoubleValue()) + appMetric := metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, "metric.name", appMetric.Name()) + assert.Equal(t, 8.0, appMetric.Gauge().DataPoints().At(0).DoubleValue()) } From 16298a56fac6719ced654e9679d3d3217c9c311b Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Mon, 4 Aug 2025 14:33:15 +0200 Subject: [PATCH 07/14] Extends readme with app metrics and links --- receiver/azureeventhubreceiver/README.md | 66 +++++++++++++++++++++--- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/receiver/azureeventhubreceiver/README.md b/receiver/azureeventhubreceiver/README.md index 069abb45755b4..18223c8805829 100644 --- a/receiver/azureeventhubreceiver/README.md +++ b/receiver/azureeventhubreceiver/README.md @@ -15,11 +15,17 @@ ## Overview + Azure resources and services can be [configured](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/diagnostic-settings) -to send their logs to an Azure Event Hub. The Azure Event Hub receiver pulls logs from an Azure +to send their telemetry to an Azure Event Hub. The Azure Event Hub receiver pulls telemetry from an Azure Event Hub, transforms them, and pushes them through the collector pipeline. +### Read further + +* [Diagnostic settings in Azure Monitor](https://learn.microsoft.com/en-us/azure/azure-monitor/platform/diagnostic-settings?tabs=portal) +* [Stream Azure monitoring data to an event hub and external partner](https://learn.microsoft.com/en-us/azure/azure-monitor/platform/stream-monitoring-data-event-hubs) + ## Configuration ### connection (Required) @@ -58,6 +64,21 @@ All supported time format for logs, metrics and traces. Default is `nil` (unset) Default: `nil` +### metric_aggregation (optional) + +Metric records received from an Azure Event Hub will contain multiple aggregated datapoints. Depending on the +[type of metric](https://learn.microsoft.com/en-us/azure/azure-monitor/metrics/data-platform-metrics#types-of-metrics), +these datapoints will be slightly different, but all contain a total/sum, min, max and count. This setting will manage +these datapoints. + +#### Possible values: + +* By default, these datapoints will be mapped to metrics named with a corresponding suffix, like `_total`, `_min`, etc. + See [azure format](#azure). +* With `average`, datapoints will be aggregated into an average value (`sum/count`), and keep the original metric name. + +Default: `nil` + > [!NOTE] > You can opt-in to use the [`azeventhubs`](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs) sdk by enabling the feature gate > `receiver.azureeventhubreceiver.UseAzeventhubs` when you run the OpenTelemetry Collector. See the following page @@ -102,10 +123,13 @@ The "raw" format maps the AMQP properties and data into the attributes and body of an OpenTelemetry LogRecord, respectively. The body is represented as a raw byte array. -This format is not supported for Metrics. +> [!WARN] +> This format is not supported for Metrics. ### azure +#### Logs + The "azure" format extracts the Azure log records from the AMQP message data, parses them, and maps the fields to OpenTelemetry attributes. The table below summarizes the mapping between the @@ -137,8 +161,18 @@ Notes: * JSON does not distinguish between fixed and floating point numbers. All JSON numbers are encoded as doubles. +#### Metrics + For Metrics the Azure Metric Records are an array -of "records" with the following fields. +of "records" with the following fields, by +[type of metric](https://learn.microsoft.com/en-us/azure/azure-monitor/metrics/data-platform-metrics#types-of-metrics). + +From this data a Metric of type Gauge is created +with a Data Points that represents the values +for the Metric including: Total, Minimum, Maximum, +Average and Count. + +##### Platform metric (from Azure resources) | Azure | Open Telemetry | |------------|---------------------------------------------| @@ -152,10 +186,28 @@ of "records" with the following fields. | maximum | mapped to datapoint metricName + "_MAXIMUM" | | average | mapped to datapoint metricName + "_AVERAGE" | -From this data a Metric of type Gauge is created -with a Data Points that represents the values -for the Metric including: Total, Minimum, Maximum, -Average and Count. +##### Application metrics (from Application Insights) + +See: https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/appmetrics + +| Azure | Open Telemetry | +|----------------------------|---------------------------------------------| +| time | time_unix_nano (field) | +| resourceId | azure.resource.id (resource attribute) | +| Name | (metric name) | +| AppRoleInstance | service.instance.id (resource attribute) | +| AppRoleName | service.name (resource attribute) | +| AppVersion | service.version (resource attribute) | +| SDKVersion | telemetry.sdk.version (resource attribute) | +| ClientCountryOrRegion | cloud.region (resource attribute) | +| ClientOS | os.name (resource attribute) | +| Properties (key/value map) | mapped to resource attributes | +| Sum | mapped to datapoint metricName + "_TOTAL" | +| ItemCount | mapped to datapoint metricName + "_COUNT" | +| Min | mapped to datapoint metricName + "_MINIMUM" | +| Max | mapped to datapoint metricName + "_MAXIMUM" | + +#### Traces Traces based on Azure Application Insights array of records from `AppRequests` & `AppDependencies` with the following fields. From 9db0ad61d1bb63da597ff6e762940e5501cd6f22 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Mon, 4 Aug 2025 14:38:36 +0200 Subject: [PATCH 08/14] Some comments on different azure metrics in eventhub --- .../azureresourcemetrics_unmarshaler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 487f5d3002e66..226275436ca7e 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -78,8 +78,9 @@ func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { return nil } -// azureMetricRecord represents a single Azure Metric following +// azureMetricRecord represents a single Azure Platform Metric record following // the common schema does not exist (yet): +// See: https://learn.microsoft.com/en-us/azure/azure-monitor/platform/stream-monitoring-data-event-hubs#data-formats type azureResourceMetricRecord struct { Time string `json:"time"` ResourceID string `json:"resourceId"` @@ -178,8 +179,8 @@ func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger return nil } -// azureMetricRecord represents a single Azure Metric following -// the common schema does not exist (yet): +// azureMetricRecord represents a single Azure Application Metric record +// See: https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/appmetrics type azureAppMetricRecord struct { Time string `json:"time"` From 90987053be11dddbb3e93d39981f2b2fa4d2b9ad Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Thu, 7 Aug 2025 09:54:18 +0200 Subject: [PATCH 09/14] Adjust azureauth scopes to upstream --- extension/azureauthextension/extension.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/azureauthextension/extension.go b/extension/azureauthextension/extension.go index 4a5a241904328..cf03aa823a08e 100644 --- a/extension/azureauthextension/extension.go +++ b/extension/azureauthextension/extension.go @@ -167,7 +167,7 @@ func (a *authenticator) getTokenForHost(ctx context.Context, host string) (strin // Example: if host is "management.azure.com", then the scope to get the // token will be "https://management.azure.com/.default". // See default scope: https://learn.microsoft.com/en-us/entra/identity-platform/scopes-oidc#the-default-scope. - "https://monitor.azure.com/.default", + fmt.Sprintf("https://%s/.default", host), }, } From b4a579679ae2da70010ce7a1d434738922d6a3e4 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Tue, 9 Sep 2025 10:57:26 +0200 Subject: [PATCH 10/14] Adds licence header in metrics test file --- .../azureresourcemetrics_unmarshaler_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go index cab62dee4b82e..215a5febef099 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package azureeventhubreceiver import ( From e764fe6511c30184f75ef21905e8dd205d301118 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Fri, 26 Sep 2025 11:59:35 +0200 Subject: [PATCH 11/14] Solves go linting --- .../azureeventhubreceiver/azureresourcemetrics_unmarshaler.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go index 226275436ca7e..1d6054244d9b0 100644 --- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go +++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go @@ -56,7 +56,6 @@ func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { } typeDecoder := jsoniter.NewDecoder(bytes.NewReader(data)) err := typeDecoder.Decode(&recordWithType) - if err != nil { return err } @@ -70,7 +69,6 @@ func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error { recordDecoder := jsoniter.NewDecoder(bytes.NewReader(data)) err = recordDecoder.Decode(r.Record) - if err != nil { return err } @@ -301,7 +299,6 @@ func (r *azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (p var azureMetrics azureMetricRecords decoder := jsoniter.NewDecoder(bytes.NewReader(event.Data())) err := decoder.Decode(&azureMetrics) - if err != nil { return md, err } From bc40e2f758a63de182e3b3790f06c3ccef0bf7ba Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Fri, 26 Sep 2025 12:00:08 +0200 Subject: [PATCH 12/14] Azure event hub metrics test data --- .../testdata/config.yaml | 4 ++ .../testdata/metrics.json | 44 +++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 receiver/azureeventhubreceiver/testdata/metrics.json diff --git a/receiver/azureeventhubreceiver/testdata/config.yaml b/receiver/azureeventhubreceiver/testdata/config.yaml index 091b2cbe0a48c..30ae723c1708c 100644 --- a/receiver/azureeventhubreceiver/testdata/config.yaml +++ b/receiver/azureeventhubreceiver/testdata/config.yaml @@ -21,3 +21,7 @@ service: receivers: [azureeventhub, azureeventhub/all] processors: [nop] exporters: [nop] + metrics: + receivers: [azureeventhub, azureeventhub/all] + processors: [nop] + exporters: [nop] diff --git a/receiver/azureeventhubreceiver/testdata/metrics.json b/receiver/azureeventhubreceiver/testdata/metrics.json new file mode 100644 index 0000000000000..8da42779dd193 --- /dev/null +++ b/receiver/azureeventhubreceiver/testdata/metrics.json @@ -0,0 +1,44 @@ +{ + "records":[ + { + "count":23, + "total":12292.1382, + "minimum":27.4786, + "maximum":6695.419, + "average":534.440791304348, + "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "time":"2025-07-14T12:45:00.0000000Z", + "metricName":"dependencies/duration", + "timeGrain":"PT1M" + }, + { + "time":"2025-07-14T12:35:36.3259399Z", + "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "ResourceGUID":"00000000-0000-0000-0000-000000000000", + "Type":"AppMetrics", + "AppRoleInstance":"00000000-0000-0000-0000-000000000000", + "AppRoleName":"service", + "AppVersion":"1.0.0.0", + "ClientBrowser":"Other", + "ClientCity":"City", + "ClientCountryOrRegion":"Country", + "ClientIP":"0.0.0.0", + "ClientModel":"Other", + "ClientOS":"Linux", + "ClientStateOrProvince":"Province", + "ClientType":"PC", + "IKey":"00000000-0000-0000-0000-000000000000", + "_BilledSize":444, + "SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", + "Properties": { + "an_attribute": "a_value", + "another_attribute": "another_value" + }, + "Name":"metric.name", + "Sum":8, + "Min":8, + "Max":8, + "ItemCount":1 + } + ] +} \ No newline at end of file From 2fa4eca67ce43f033ce3d42fc3b5c1142479fe47 Mon Sep 17 00:00:00 2001 From: Olof Montin Date: Fri, 26 Sep 2025 14:23:43 +0200 Subject: [PATCH 13/14] Changelog entry on commited changes --- .../feature_azureeventhub-appmetrics.yaml | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 .chloggen/feature_azureeventhub-appmetrics.yaml diff --git a/.chloggen/feature_azureeventhub-appmetrics.yaml b/.chloggen/feature_azureeventhub-appmetrics.yaml new file mode 100644 index 0000000000000..d8791c137fb66 --- /dev/null +++ b/.chloggen/feature_azureeventhub-appmetrics.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: azureeventhubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds support for receiving Azure app metrics from Azure Event Hubs in the azureeventhubreceiver" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: + - 41343 + - 41367 + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The azureeventhubreceiver now supports receiving custom metrics emitted by applications to Azure Insights and forwarded using Diagnostic Settings to Azure Event Hub. + There's also on optional setting to aggregate received metrics into a single metric to keep the original name, instead of multiply the metrics by added suffixes `_total`, `_sum`, `_max` etc. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 71cc7d51c66a864424750dff0c2f86099856904b Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 24 Oct 2025 13:43:43 -0700 Subject: [PATCH 14/14] Update .chloggen/feature_azureeventhub-appmetrics.yaml --- .chloggen/feature_azureeventhub-appmetrics.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/feature_azureeventhub-appmetrics.yaml b/.chloggen/feature_azureeventhub-appmetrics.yaml index d8791c137fb66..9acfc54a48490 100644 --- a/.chloggen/feature_azureeventhub-appmetrics.yaml +++ b/.chloggen/feature_azureeventhub-appmetrics.yaml @@ -4,7 +4,7 @@ change_type: enhancement # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: azureeventhubreceiver +component: receiver/azureeventhub # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: "Adds support for receiving Azure app metrics from Azure Event Hubs in the azureeventhubreceiver"