diff --git a/.chloggen/feature_azureeventhub-appmetrics.yaml b/.chloggen/feature_azureeventhub-appmetrics.yaml
new file mode 100644
index 0000000000000..9acfc54a48490
--- /dev/null
+++ b/.chloggen/feature_azureeventhub-appmetrics.yaml
@@ -0,0 +1,31 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: receiver/azureeventhub
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Adds support for receiving Azure app metrics from Azure Event Hubs in the azureeventhubreceiver"
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues:
+  - 41343
+  - 41367
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The azureeventhubreceiver now supports custom metrics emitted by applications to Azure Application Insights and forwarded to an Azure Event Hub using Diagnostic Settings.
+  There is also an optional `metric_aggregation` setting that aggregates the received datapoints into a single metric that keeps the original name, instead of multiplying the metrics with the suffixes `_total`, `_minimum`, `_maximum`, etc.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/receiver/azureeventhubreceiver/README.md b/receiver/azureeventhubreceiver/README.md
index 069abb45755b4..18223c8805829 100644
--- a/receiver/azureeventhubreceiver/README.md
+++ b/receiver/azureeventhubreceiver/README.md
@@ -15,11 +15,17 @@
 ## Overview
 
 Azure resources and services can be [configured](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/diagnostic-settings)
-to send their logs to an Azure Event Hub. The Azure Event Hub receiver pulls logs from an Azure
-Event Hub, transforms them, and pushes them through the collector pipeline.
+to send their telemetry to an Azure Event Hub. The Azure Event Hub receiver pulls telemetry from an Azure
+Event Hub, transforms it, and pushes it through the collector pipeline.
 
+### Further reading
+
+* [Diagnostic settings in Azure Monitor](https://learn.microsoft.com/en-us/azure/azure-monitor/platform/diagnostic-settings?tabs=portal)
+* [Stream Azure monitoring data to an event hub and external partner](https://learn.microsoft.com/en-us/azure/azure-monitor/platform/stream-monitoring-data-event-hubs)
+
 ## Configuration
 
 ### connection (Required)
@@ -58,6 +64,21 @@
 All supported time format for logs, metrics and traces.
 Default is `nil` (unset)
 
 Default: `nil`
 
+### metric_aggregation (Optional)
+
+Metric records received from an Azure Event Hub contain multiple aggregated datapoints. Depending on the
+[type of metric](https://learn.microsoft.com/en-us/azure/azure-monitor/metrics/data-platform-metrics#types-of-metrics),
+these datapoints differ slightly, but all contain a total/sum, min, max, and count.
+This setting controls how these datapoints are mapped to OpenTelemetry metrics; a sample
+configuration is shown below.
+
+#### Possible values
+
+* By default, the datapoints are mapped to metrics named with a corresponding suffix, like `_total`, `_minimum`, etc.
+  See [azure format](#azure).
+* With `average`, the datapoints are aggregated into an average value (`sum/count`), and the original metric name is kept.
+
+Default: `nil`
+
 > [!NOTE]
 > You can opt-in to use the [`azeventhubs`](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs) sdk by enabling the feature gate
 > `receiver.azureeventhubreceiver.UseAzeventhubs` when you run the OpenTelemetry Collector. See the following page
@@ -102,10 +123,13 @@
 The "raw" format maps the AMQP properties and data into the attributes and body
 of an OpenTelemetry LogRecord, respectively. The body is represented as a raw
 byte array.
 
-This format is not supported for Metrics.
+> [!WARNING]
+> This format is not supported for Metrics.
 
 ### azure
 
+#### Logs
+
 The "azure" format extracts the Azure log records from the AMQP
 message data, parses them, and maps the fields to OpenTelemetry
 attributes. The table below summarizes the mapping between the
@@ -137,8 +161,18 @@
 Notes:
 
 * JSON does not distinguish between fixed and floating point numbers. All JSON
   numbers are encoded as doubles.
 
+#### Metrics
+
 For Metrics the Azure Metric Records are an array
-of "records" with the following fields.
+of "records" whose fields differ slightly by
+[type of metric](https://learn.microsoft.com/en-us/azure/azure-monitor/metrics/data-platform-metrics#types-of-metrics).
+
+From this data, one metric of type Gauge is created
+per value (Total, Minimum, Maximum, Average, and
+Count), each with a single data point.
+
+##### Platform metrics (from Azure resources)
 
 | Azure      | Open Telemetry                               |
 |------------|----------------------------------------------|
@@ -152,10 +186,28 @@ of "records" with the following fields.
 | maximum    | mapped to datapoint metricName + "_MAXIMUM"  |
 | average    | mapped to datapoint metricName + "_AVERAGE"  |
 
-From this data a Metric of type Gauge is created
-with a Data Points that represents the values
-for the Metric including: Total, Minimum, Maximum,
-Average and Count.
+##### Application metrics (from Application Insights)
+
+See: https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/appmetrics
+
+| Azure                      | Open Telemetry                               |
+|----------------------------|----------------------------------------------|
+| time                       | time_unix_nano (field)                       |
+| resourceId                 | azure.resource.id (resource attribute)       |
+| Name                       | (metric name)                                |
+| AppRoleInstance            | service.instance.id (resource attribute)    |
+| AppRoleName                | service.name (resource attribute)            |
+| AppVersion                 | service.version (resource attribute)         |
+| SDKVersion                 | telemetry.sdk.version (resource attribute)   |
+| ClientCountryOrRegion      | cloud.region (resource attribute)            |
+| ClientOS                   | os.name (resource attribute)                 |
+| Properties (key/value map) | mapped to resource attributes                |
+| Sum                        | mapped to datapoint metricName + "_TOTAL"    |
+| ItemCount                  | mapped to datapoint metricName + "_COUNT"    |
+| Min                        | mapped to datapoint metricName + "_MINIMUM"  |
+| Max                        | mapped to datapoint metricName + "_MAXIMUM"  |
+
+#### Traces
 
 Traces based on Azure Application Insights array of records from `AppRequests` & `AppDependencies` with the following fields.
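For illustration, a minimal collector configuration that enables the new `metric_aggregation` option might look like the sketch below. The connection string, hub name, and `debug` exporter wiring are placeholder assumptions, not values taken from this change:

```yaml
receivers:
  azureeventhub:
    # Placeholder connection string; EntityPath selects the hub.
    connection: Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=listen;SharedAccessKey=<key>;EntityPath=hubname
    format: azure
    # Collapse each record's datapoints into one gauge (sum/count)
    # that keeps the original metric name.
    metric_aggregation: average

exporters:
  debug:

service:
  pipelines:
    metrics:
      receivers: [azureeventhub]
      exporters: [debug]
```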
diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go
index 6ebb5c589cce1..1d6054244d9b0 100644
--- a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go
+++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler.go
@@ -23,130 +23,312 @@ import (
 const azureResourceID = "azure.resource.id"
 
 type azureResourceMetricsUnmarshaler struct {
-	buildInfo  component.BuildInfo
-	logger     *zap.Logger
-	TimeFormat []string
+	buildInfo   component.BuildInfo
+	logger      *zap.Logger
+	TimeFormat  []string
+	Aggregation string
+}
+
+// azureResourceMetricsConfiger exposes the unmarshaler configuration
+// to the concrete metric record types.
+type azureResourceMetricsConfiger interface {
+	GetLogger() *zap.Logger
+	GetBuildVersion() string
+	GetTimeFormat() []string
+	GetAggregation() string
 }
 
 // azureMetricRecords represents an array of Azure metric records
 // as exported via an Azure Event Hub
 type azureMetricRecords struct {
-	Records []azureMetricRecord `json:"records"`
+	Records []azureGenericMetricRecord `json:"records"`
 }
 
-// azureMetricRecord represents a single Azure Metric following
-// the common schema does not exist (yet):
-type azureMetricRecord struct {
-	Time       string  `json:"time"`
-	ResourceID string  `json:"resourceId"`
-	MetricName string  `json:"metricName"`
-	TimeGrain  string  `json:"timeGrain"`
-	Total      float64 `json:"total"`
-	Count      float64 `json:"count"`
-	Minimum    float64 `json:"minimum"`
-	Maximum    float64 `json:"maximum"`
-	Average    float64 `json:"average"`
+// azureMetricAppender is implemented by each concrete record type to
+// append its datapoints to a pmetric.Metrics collection.
+type azureMetricAppender interface {
+	AppendMetrics(azureResourceMetricsConfiger, *pmetric.Metrics) error
 }
 
-func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, timeFormat []string) eventMetricsUnmarshaler {
-	return azureResourceMetricsUnmarshaler{
-		buildInfo:  buildInfo,
-		logger:     logger,
-		TimeFormat: timeFormat,
-	}
+// azureGenericMetricRecord holds the concrete record implementation
+// selected while unmarshaling, based on the record's "Type" field.
+type azureGenericMetricRecord struct {
+	Record azureMetricAppender
 }
 
-// UnmarshalMetrics takes a byte array containing a JSON-encoded
-// payload with Azure metric records and transforms it into
-// an OpenTelemetry pmetric.Metrics object. The data in the Azure
-// metric record appears as fields and attributes in the
-// OpenTelemetry representation;
-func (r azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (pmetric.Metrics, error) {
-	md := pmetric.NewMetrics()
+func (r *azureGenericMetricRecord) UnmarshalJSON(data []byte) error {
+	// First pass: decode only the "Type" discriminator so the matching
+	// concrete record type can be chosen.
+	var recordWithType struct {
+		Type string `json:"Type"`
+	}
+	typeDecoder := jsoniter.NewDecoder(bytes.NewReader(data))
+	err := typeDecoder.Decode(&recordWithType)
+	if err != nil {
+		return err
+	}
 
-	var azureMetrics azureMetricRecords
-	decoder := jsoniter.NewDecoder(bytes.NewReader(event.Data()))
-	err := decoder.Decode(&azureMetrics)
+	switch recordWithType.Type {
+	case "AppMetrics":
+		r.Record = &azureAppMetricRecord{}
+	default:
+		// Platform metric records carry no "Type" field.
+		r.Record = &azureResourceMetricRecord{}
+	}
+
+	// Second pass: decode the full record into the selected type.
+	recordDecoder := jsoniter.NewDecoder(bytes.NewReader(data))
+	err = recordDecoder.Decode(r.Record)
 	if err != nil {
-		return md, err
+		return err
 	}
+	return nil
+}
+
+// azureResourceMetricRecord represents a single Azure platform metric
+// record; a common schema for these records does not exist (yet).
+// See: https://learn.microsoft.com/en-us/azure/azure-monitor/platform/stream-monitoring-data-event-hubs#data-formats
+type azureResourceMetricRecord struct {
+	Time       string `json:"time"`
+	ResourceID string `json:"resourceId"`
+	MetricName string `json:"metricName"`
+	TimeGrain  string `json:"timeGrain"`
+
+	Total   float64 `json:"total"`
+	Count   float64 `json:"count"`
+	Minimum float64 `json:"minimum"`
+	Maximum float64 `json:"maximum"`
+	Average float64 `json:"average"`
+}
+
+func (r *azureResourceMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md *pmetric.Metrics) error {
 	resourceMetrics := md.ResourceMetrics().AppendEmpty()
+
 	resource := resourceMetrics.Resource()
 	resource.Attributes().PutStr(string(conventions.TelemetrySDKNameKey), metadata.ScopeName)
 	resource.Attributes().PutStr(string(conventions.TelemetrySDKLanguageKey), conventions.TelemetrySDKLanguageGo.Value.AsString())
-	resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), r.buildInfo.Version)
+	resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), c.GetBuildVersion())
 	resource.Attributes().PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString())
 
 	scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
 
 	metrics := scopeMetrics.Metrics()
-	metrics.EnsureCapacity(len(azureMetrics.Records) * 5)
 
-	resourceID := ""
-	for _, azureMetric := range azureMetrics.Records {
-		if resourceID == "" && azureMetric.ResourceID != "" {
-			resourceID = azureMetric.ResourceID
-		}
+	if r.ResourceID != "" {
+		resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID)
+	} else {
+		c.GetLogger().Warn("No ResourceID Set on Metrics!")
+	}
 
-		nanos, err := asTimestamp(azureMetric.Time, r.TimeFormat)
-		if err != nil {
-			r.logger.Warn("Invalid Timestamp", zap.String("time", azureMetric.Time))
-			continue
-		}
+	nanos, err := asTimestamp(r.Time, c.GetTimeFormat())
+	if err != nil {
+		c.GetLogger().Warn("Invalid Timestamp", zap.String("time", r.Time))
+		return err
+	}
 
-		var startTimestamp pcommon.Timestamp
-		if azureMetric.TimeGrain != "PT1M" {
-			r.logger.Warn("Unhandled Time Grain", zap.String("timegrain", azureMetric.TimeGrain))
-			continue
-		}
-		startTimestamp = pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute))
-
-		metricTotal := metrics.AppendEmpty()
-		metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Total")))
"Total"))) - dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() - dpTotal.SetStartTimestamp(startTimestamp) - dpTotal.SetTimestamp(nanos) - dpTotal.SetDoubleValue(azureMetric.Total) - - metricCount := metrics.AppendEmpty() - metricCount.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Count"))) - dpCount := metricCount.SetEmptyGauge().DataPoints().AppendEmpty() - dpCount.SetStartTimestamp(startTimestamp) - dpCount.SetTimestamp(nanos) - dpCount.SetDoubleValue(azureMetric.Count) - - metricMin := metrics.AppendEmpty() - metricMin.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Minimum"))) - dpMin := metricMin.SetEmptyGauge().DataPoints().AppendEmpty() - dpMin.SetStartTimestamp(startTimestamp) - dpMin.SetTimestamp(nanos) - dpMin.SetDoubleValue(azureMetric.Minimum) - - metricMax := metrics.AppendEmpty() - metricMax.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Maximum"))) - dpMax := metricMax.SetEmptyGauge().DataPoints().AppendEmpty() - dpMax.SetStartTimestamp(startTimestamp) - dpMax.SetTimestamp(nanos) - dpMax.SetDoubleValue(azureMetric.Maximum) + var startTimestamp pcommon.Timestamp + if r.TimeGrain != "PT1M" { + c.GetLogger().Warn("Unhandled Time Grain", zap.String("timegrain", r.TimeGrain)) + return err + } + startTimestamp = pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute)) + + if c.GetAggregation() == "average" { + metrics.EnsureCapacity(1) metricAverage := metrics.AppendEmpty() - metricAverage.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(azureMetric.MetricName, " ", "_"), "Average"))) + metricAverage.SetName(strings.ToLower(strings.ReplaceAll(r.MetricName, " ", "_"))) dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty() dpAverage.SetStartTimestamp(startTimestamp) dpAverage.SetTimestamp(nanos) - dpAverage.SetDoubleValue(azureMetric.Average) + dpAverage.SetDoubleValue(r.Total / r.Count) + + return nil } - if resourceID != "" { - resourceMetrics.Resource().Attributes().PutStr(azureResourceID, resourceID) + metrics.EnsureCapacity(5) + + metricTotal := metrics.AppendEmpty() + metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total"))) + dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty() + dpTotal.SetStartTimestamp(startTimestamp) + dpTotal.SetTimestamp(nanos) + dpTotal.SetDoubleValue(r.Total) + + metricCount := metrics.AppendEmpty() + metricCount.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Count"))) + dpCount := metricCount.SetEmptyGauge().DataPoints().AppendEmpty() + dpCount.SetStartTimestamp(startTimestamp) + dpCount.SetTimestamp(nanos) + dpCount.SetDoubleValue(r.Count) + + metricMin := metrics.AppendEmpty() + metricMin.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Minimum"))) + dpMin := metricMin.SetEmptyGauge().DataPoints().AppendEmpty() + dpMin.SetStartTimestamp(startTimestamp) + dpMin.SetTimestamp(nanos) + dpMin.SetDoubleValue(r.Minimum) + + metricMax := metrics.AppendEmpty() + metricMax.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Maximum"))) + dpMax := metricMax.SetEmptyGauge().DataPoints().AppendEmpty() + dpMax.SetStartTimestamp(startTimestamp) + dpMax.SetTimestamp(nanos) + dpMax.SetDoubleValue(r.Maximum) + + metricAverage := metrics.AppendEmpty() + 
+	metricAverage.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Average")))
+	dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty()
+	dpAverage.SetStartTimestamp(startTimestamp)
+	dpAverage.SetTimestamp(nanos)
+	dpAverage.SetDoubleValue(r.Average)
+
+	return nil
+}
+
+// azureAppMetricRecord represents a single Azure Application Insights
+// metric record.
+// See: https://learn.microsoft.com/en-us/azure/azure-monitor/reference/tables/appmetrics
+type azureAppMetricRecord struct {
+	Time string `json:"time"`
+
+	ResourceID      string `json:"resourceId"`
+	AppRoleInstance string `json:"AppRoleInstance"`
+	AppRoleName     string `json:"AppRoleName"`
+	AppVersion      string `json:"AppVersion"`
+	SDKVersion      string `json:"SDKVersion"`
+
+	ClientCountryOrRegion string `json:"ClientCountryOrRegion"`
+	ClientOS              string `json:"ClientOS"`
+
+	Properties map[string]string `json:"Properties"`
+
+	MetricName string  `json:"Name"`
+	Total      float64 `json:"Sum"`
+	Minimum    float64 `json:"Min"`
+	Maximum    float64 `json:"Max"`
+	Count      float64 `json:"ItemCount"`
+}
+
+func (r *azureAppMetricRecord) AppendMetrics(c azureResourceMetricsConfiger, md *pmetric.Metrics) error {
+	resourceMetrics := md.ResourceMetrics().AppendEmpty()
+
+	resource := resourceMetrics.Resource()
+	resource.Attributes().PutStr(string(conventions.TelemetrySDKVersionKey), r.SDKVersion)
+	resource.Attributes().PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAzure.Value.AsString())
+	resource.Attributes().PutStr(string(conventions.CloudRegionKey), r.ClientCountryOrRegion)
+	resource.Attributes().PutStr(string(conventions.ServiceInstanceIDKey), r.AppRoleInstance)
+	resource.Attributes().PutStr(string(conventions.ServiceNameKey), r.AppRoleName)
+	resource.Attributes().PutStr(string(conventions.ServiceVersionKey), r.AppVersion)
+	resource.Attributes().PutStr(string(conventions.OSNameKey), r.ClientOS)
+
+	if r.ResourceID != "" {
+		resourceMetrics.Resource().Attributes().PutStr(azureResourceID, r.ResourceID)
 	} else {
-		r.logger.Warn("No ResourceID Set on Metrics!")
+		c.GetLogger().Warn("No ResourceID Set on Metrics!")
+	}
+
+	// Custom properties are mapped directly to resource attributes.
+	for k, v := range r.Properties {
+		resource.Attributes().PutStr(k, v)
+	}
+
+	scopeMetrics := resourceMetrics.ScopeMetrics().AppendEmpty()
+
+	metrics := scopeMetrics.Metrics()
+
+	nanos, err := asTimestamp(r.Time, c.GetTimeFormat())
+	if err != nil {
+		c.GetLogger().Warn("Invalid Timestamp", zap.String("time", r.Time))
+		return err
+	}
+
+	// App metric records carry no time grain; assume the same one-minute
+	// window that platform metrics use.
+	startTimestamp := pcommon.NewTimestampFromTime(nanos.AsTime().Add(-time.Minute))
+
+	if c.GetAggregation() == "average" {
+		// Aggregate the datapoints into a single metric (sum/count) that
+		// keeps the original metric name.
+		metrics.EnsureCapacity(1)
+		metricAverage := metrics.AppendEmpty()
+		metricAverage.SetName(strings.ToLower(strings.ReplaceAll(r.MetricName, " ", "_")))
+		dpAverage := metricAverage.SetEmptyGauge().DataPoints().AppendEmpty()
+		dpAverage.SetStartTimestamp(startTimestamp)
+		dpAverage.SetTimestamp(nanos)
+		dpAverage.SetDoubleValue(r.Total / r.Count)
+
+		return nil
+	}
+
+	metrics.EnsureCapacity(4)
+
+	metricTotal := metrics.AppendEmpty()
+	metricTotal.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Total")))
+	dpTotal := metricTotal.SetEmptyGauge().DataPoints().AppendEmpty()
+	dpTotal.SetStartTimestamp(startTimestamp)
+	dpTotal.SetTimestamp(nanos)
+	dpTotal.SetDoubleValue(r.Total)
+
+	metricCount := metrics.AppendEmpty()
+	metricCount.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Count")))
+	dpCount := metricCount.SetEmptyGauge().DataPoints().AppendEmpty()
+	dpCount.SetStartTimestamp(startTimestamp)
+	dpCount.SetTimestamp(nanos)
+	dpCount.SetDoubleValue(r.Count)
+
+	metricMin := metrics.AppendEmpty()
+	metricMin.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Minimum")))
+	dpMin := metricMin.SetEmptyGauge().DataPoints().AppendEmpty()
+	dpMin.SetStartTimestamp(startTimestamp)
+	dpMin.SetTimestamp(nanos)
+	dpMin.SetDoubleValue(r.Minimum)
+
+	metricMax := metrics.AppendEmpty()
+	metricMax.SetName(strings.ToLower(fmt.Sprintf("%s_%s", strings.ReplaceAll(r.MetricName, " ", "_"), "Maximum")))
+	dpMax := metricMax.SetEmptyGauge().DataPoints().AppendEmpty()
+	dpMax.SetStartTimestamp(startTimestamp)
+	dpMax.SetTimestamp(nanos)
+	dpMax.SetDoubleValue(r.Maximum)
+
+	return nil
+}
+
+func newAzureResourceMetricsUnmarshaler(buildInfo component.BuildInfo, logger *zap.Logger, cfg *Config) eventMetricsUnmarshaler {
+	return &azureResourceMetricsUnmarshaler{
+		buildInfo:   buildInfo,
+		logger:      logger,
+		TimeFormat:  cfg.TimeFormats.Metrics,
+		Aggregation: cfg.MetricAggregation,
+	}
+}
+
+// UnmarshalMetrics takes a byte array containing a JSON-encoded
+// payload with Azure metric records and transforms it into
+// an OpenTelemetry pmetric.Metrics object. The data in the Azure
+// metric record appears as fields and attributes in the
+// OpenTelemetry representation.
+func (r *azureResourceMetricsUnmarshaler) UnmarshalMetrics(event *azureEvent) (pmetric.Metrics, error) {
+	md := pmetric.NewMetrics()
+
+	var azureMetrics azureMetricRecords
+	decoder := jsoniter.NewDecoder(bytes.NewReader(event.Data()))
+	err := decoder.Decode(&azureMetrics)
+	if err != nil {
+		return md, err
+	}
+
+	// A single payload may mix platform and application metric records;
+	// a record that fails to convert is logged and skipped.
+	for _, mr := range azureMetrics.Records {
+		if err := mr.Record.AppendMetrics(r, &md); err != nil {
+			r.logger.Warn("Failed to append metric", zap.Error(err))
+		}
+	}
 
 	return md, nil
 }
 
+// The getters below implement azureResourceMetricsConfiger.
+
+func (r *azureResourceMetricsUnmarshaler) GetLogger() *zap.Logger {
+	return r.logger
+}
+
+func (r *azureResourceMetricsUnmarshaler) GetBuildVersion() string {
+	return r.buildInfo.Version
+}
+
+func (r *azureResourceMetricsUnmarshaler) GetTimeFormat() []string {
+	return r.TimeFormat
+}
+
+func (r *azureResourceMetricsUnmarshaler) GetAggregation() string {
+	return r.Aggregation
+}
+
 // asTimestamp will parse an ISO8601 string into an OpenTelemetry
 // nanosecond timestamp. If the string cannot be parsed, it will
 // return zero and the error.
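The polymorphic decoding above peeks at the record's `Type` field before picking a concrete Go type. A self-contained sketch of the same two-pass pattern, using the standard library's `encoding/json` instead of `jsoniter`, with hypothetical stand-in record types:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// appMetric and platformMetric are hypothetical stand-ins for the
// receiver's concrete record types; only a couple of fields are kept.
type appMetric struct {
	Name string  `json:"Name"`
	Sum  float64 `json:"Sum"`
}

type platformMetric struct {
	MetricName string  `json:"metricName"`
	Total      float64 `json:"total"`
}

// decodeRecord peeks at the "Type" discriminator in a first pass, then
// decodes the same bytes again into the matching concrete type.
func decodeRecord(data []byte) (any, error) {
	var head struct {
		Type string `json:"Type"`
	}
	if err := json.Unmarshal(data, &head); err != nil {
		return nil, err
	}
	switch head.Type {
	case "AppMetrics":
		rec := &appMetric{}
		return rec, json.Unmarshal(data, rec)
	default:
		// Platform metric records carry no "Type" field.
		rec := &platformMetric{}
		return rec, json.Unmarshal(data, rec)
	}
}

func main() {
	rec, err := decodeRecord([]byte(`{"Type":"AppMetrics","Name":"metric.name","Sum":8}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%#v\n", rec) // &main.appMetric{Name:"metric.name", Sum:8}
}
```

Decoding the payload twice costs one extra pass per record, but it keeps each concrete type a plain struct with ordinary JSON tags.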
diff --git a/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go
new file mode 100644
index 0000000000000..215a5febef099
--- /dev/null
+++ b/receiver/azureeventhubreceiver/azureresourcemetrics_unmarshaler_test.go
@@ -0,0 +1,146 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package azureeventhubreceiver
+
+import (
+	"testing"
+
+	eventhub "github.com/Azure/azure-event-hubs-go/v3"
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/component"
+	"go.uber.org/zap"
+)
+
+// encodedMetrics mixes a platform metric record and an AppMetrics record
+// in a single payload.
+const encodedMetrics = `{"records":[
+{
+"count":23,
+"total":12292.1382,
+"minimum":27.4786,
+"maximum":6695.419,
+"average":534.440791304348,
+"resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE",
+"time":"2025-07-14T12:45:00.0000000Z",
+"metricName":"dependencies/duration",
+"timeGrain":"PT1M"
+},
+{
+"time":"2025-07-14T12:35:36.3259399Z",
+"resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE",
+"ResourceGUID":"00000000-0000-0000-0000-000000000000",
+"Type":"AppMetrics",
+"AppRoleInstance":"00000000-0000-0000-0000-000000000000",
+"AppRoleName":"service",
+"AppVersion":"1.0.0.0",
+"ClientBrowser":"Other",
+"ClientCity":"City",
+"ClientCountryOrRegion":"Country",
+"ClientIP":"0.0.0.0",
+"ClientModel":"Other",
+"ClientOS":"Linux",
+"ClientStateOrProvince":"Province",
+"ClientType":"PC",
+"IKey":"00000000-0000-0000-0000-000000000000",
+"_BilledSize":444,
+"SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0",
+"Properties": {
+  "an_attribute": "a_value",
+  "another_attribute": "another_value"
+},
+"Name":"metric.name",
+"Sum":8,
+"Min":8,
+"Max":8,
+"ItemCount":1
+}
+]}`
+
+func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) {
+	event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}}
+	logger := zap.NewNop()
+	unmarshaler := newAzureResourceMetricsUnmarshaler(
+		component.BuildInfo{
+			Command:     "Test",
+			Description: "Test",
+			Version:     "Test",
+		},
+		logger,
+		&Config{},
+	)
+	metrics, err := unmarshaler.UnmarshalMetrics(&event)
+
+	assert.NoError(t, err)
+	// Five suffixed gauges for the platform record plus four for the app record.
+	assert.Equal(t, 9, metrics.MetricCount())
+}
+
+func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *testing.T) {
+	event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}}
+	logger := zap.NewNop()
+	unmarshaler := newAzureResourceMetricsUnmarshaler(
+		component.BuildInfo{
+			Command:     "Test",
+			Description: "Test",
+			Version:     "Test",
+		},
+		logger,
+		&Config{},
+	)
+	metrics, err := unmarshaler.UnmarshalMetrics(&event)
+
+	assert.NoError(t, err)
+
+	expectedAttributes := map[string]string{
+		"service.instance.id":   "00000000-0000-0000-0000-000000000000",
+		"service.name":          "service",
+		"service.version":       "1.0.0.0",
+		"telemetry.sdk.version": "dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0",
+		"cloud.provider":        "azure",
+		"cloud.region":          "Country",
+		"azure.resource.id":     "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE",
+		"os.name":               "Linux",
+		"an_attribute":          "a_value",
+		"another_attribute":     "another_value",
+	}
+	resource := metrics.ResourceMetrics().At(1).Resource()
+
+	assert.Equal(t, len(expectedAttributes), resource.Attributes().Len())
+
+	for k, expected := range expectedAttributes {
+		actual, ok := resource.Attributes().Get(k)
+		if !ok {
+			t.Errorf("Attribute %s not found", k)
+			continue
+		}
+		assert.Equal(t, expected, actual.AsString())
+	}
+}
+
+func TestAzureResourceMetricsUnmarshaler_UnmarshalAggregatedAppMetrics(t *testing.T) {
+	event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}}
+	logger := zap.NewNop()
+	unmarshaler := newAzureResourceMetricsUnmarshaler(
+		component.BuildInfo{
+			Command:     "Test",
+			Description: "Test",
+			Version:     "Test",
+		},
+		logger,
+		&Config{
+			MetricAggregation: "average",
+		},
+	)
+	metrics, err := unmarshaler.UnmarshalMetrics(&event)
+
+	assert.NoError(t, err)
+	assert.Equal(t, 2, metrics.MetricCount())
+
+	resMetric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
+	assert.Equal(t, "dependencies/duration", resMetric.Name())
+	assert.Equal(t, 534.4407913043478, resMetric.Gauge().DataPoints().At(0).DoubleValue())
+
+	appMetric := metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0)
+	assert.Equal(t, "metric.name", appMetric.Name())
+	assert.Equal(t, 8.0, appMetric.Gauge().DataPoints().At(0).DoubleValue())
+}
diff --git a/receiver/azureeventhubreceiver/config.go b/receiver/azureeventhubreceiver/config.go
index 81345caf8c5a1..49d17e6c68dfd 100644
--- a/receiver/azureeventhubreceiver/config.go
+++ b/receiver/azureeventhubreceiver/config.go
@@ -35,6 +35,7 @@ type Config struct {
 	ConsumerGroup            string     `mapstructure:"group"`
 	ApplySemanticConventions bool       `mapstructure:"apply_semantic_conventions"`
 	TimeFormats              TimeFormat `mapstructure:"time_formats"`
+	MetricAggregation        string     `mapstructure:"metric_aggregation"`
 
 	// azeventhub lib specific
 	PollRate int `mapstructure:"poll_rate"`
diff --git a/receiver/azureeventhubreceiver/factory.go b/receiver/azureeventhubreceiver/factory.go
index cc0ee1098f1fb..9ab1b692f5017 100644
--- a/receiver/azureeventhubreceiver/factory.go
+++ b/receiver/azureeventhubreceiver/factory.go
@@ -117,7 +117,7 @@ func (f *eventhubReceiverFactory) getReceiver(
 			metricsUnmarshaler = nil
 			err = errors.New("raw format not supported for Metrics")
 		} else {
-			metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig.TimeFormats.Metrics)
+			metricsUnmarshaler = newAzureResourceMetricsUnmarshaler(settings.BuildInfo, settings.Logger, receiverConfig)
 		}
 	case pipeline.SignalTraces:
 		if logFormat(receiverConfig.Format) == rawLogFormat {
diff --git a/receiver/azureeventhubreceiver/testdata/config.yaml b/receiver/azureeventhubreceiver/testdata/config.yaml
index 091b2cbe0a48c..30ae723c1708c 100644
--- a/receiver/azureeventhubreceiver/testdata/config.yaml
+++ b/receiver/azureeventhubreceiver/testdata/config.yaml
@@ -21,3 +21,7 @@ service:
       receivers: [azureeventhub, azureeventhub/all]
       processors: [nop]
       exporters: [nop]
+    metrics:
+      receivers: [azureeventhub, azureeventhub/all]
+      processors: [nop]
+      exporters: [nop]
diff --git a/receiver/azureeventhubreceiver/testdata/metrics.json b/receiver/azureeventhubreceiver/testdata/metrics.json
new file mode 100644
index 0000000000000..8da42779dd193
--- /dev/null
+++ b/receiver/azureeventhubreceiver/testdata/metrics.json
@@ -0,0 +1,44 @@
+{
+  "records":[
+    {
+      "count":23,
+      "total":12292.1382,
+      "minimum":27.4786,
+      "maximum":6695.419,
+      "average":534.440791304348,
+      "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE",
+      "time":"2025-07-14T12:45:00.0000000Z",
"metricName":"dependencies/duration", + "timeGrain":"PT1M" + }, + { + "time":"2025-07-14T12:35:36.3259399Z", + "resourceId":"/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/RG/PROVIDERS/MICROSOFT.INSIGHTS/COMPONENTS/SERVICE", + "ResourceGUID":"00000000-0000-0000-0000-000000000000", + "Type":"AppMetrics", + "AppRoleInstance":"00000000-0000-0000-0000-000000000000", + "AppRoleName":"service", + "AppVersion":"1.0.0.0", + "ClientBrowser":"Other", + "ClientCity":"City", + "ClientCountryOrRegion":"Country", + "ClientIP":"0.0.0.0", + "ClientModel":"Other", + "ClientOS":"Linux", + "ClientStateOrProvince":"Province", + "ClientType":"PC", + "IKey":"00000000-0000-0000-0000-000000000000", + "_BilledSize":444, + "SDKVersion":"dotnetiso:1.1.0.0_dotnet8.0.16:otel1.12.0:ext1.4.0", + "Properties": { + "an_attribute": "a_value", + "another_attribute": "another_value" + }, + "Name":"metric.name", + "Sum":8, + "Min":8, + "Max":8, + "ItemCount":1 + } + ] +} \ No newline at end of file