diff --git a/.chloggen/41347.yaml b/.chloggen/41347.yaml new file mode 100644 index 0000000000000..febb818055bc0 --- /dev/null +++ b/.chloggen/41347.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix panic caused by mutating read-only ResourceMetrics + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41347] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/prometheusremotewritereceiver/README.md b/receiver/prometheusremotewritereceiver/README.md index a8701355e5cc8..8f2b804f02729 100644 --- a/receiver/prometheusremotewritereceiver/README.md +++ b/receiver/prometheusremotewritereceiver/README.md @@ -84,3 +84,13 @@ This approach has some limitations, for example: - If the process dies or restarts, the cache will be lost. - Some inconsistencies can happen according to the order of the requests and the current cache size. - The limit of 1000 resource metrics is hardcoded and not configurable for now. + +## Request Handling Scenarios + +- If `target_info` arrives first: + - The associated labels are cached as a `pcommon.Map`. + - Subsequent metric data with matching job/instance labels will use these cached resource attributes. + +- If normal metrics arrive first: + - The receiver cannot predict when (or if) the corresponding `target_info` will arrive. + - The metrics are forwarded immediately with only the resource attributes derived from their job/instance labels. 
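The two scenarios above correspond to the cache introduced in receiver.go: `attrCache` is an `lru.Cache[uint64, pcommon.Map]` keyed by a hash of the job and instance labels, populated by `extractTargetInfo` and read back in `translateV2`. The standalone sketch below illustrates that flow outside the receiver; the `cacheKey` helper and the `main` wrapper are illustrative only and are not part of this change.

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	lru "github.com/hashicorp/golang-lru/v2"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// cacheKey mirrors how the receiver keys the cache: a hash of the job and
// instance labels joined by a 0xff separator.
func cacheKey(job, instance string) uint64 {
	return xxhash.Sum64String(job + string([]byte{'\xff'}) + instance)
}

func main() {
	attrCache, _ := lru.New[uint64, pcommon.Map](1000)

	// Scenario 1: target_info arrives first, so its labels are cached as a pcommon.Map.
	targetInfoAttrs := pcommon.NewMap()
	targetInfoAttrs.PutStr("service.name", "service_a")
	targetInfoAttrs.PutStr("service.instance.id", "host1")
	targetInfoAttrs.PutStr("machine_type", "n1-standard-1")
	attrCache.Add(cacheKey("service_a", "host1"), targetInfoAttrs)

	// A later metric with the same job/instance copies the cached attributes into
	// a fresh ResourceMetrics that belongs to the current request only.
	metrics := pmetric.NewMetrics()
	rm := metrics.ResourceMetrics().AppendEmpty()
	if attrs, ok := attrCache.Get(cacheKey("service_a", "host1")); ok {
		attrs.CopyTo(rm.Resource().Attributes())
	}
	fmt.Println("attributes copied for service_a/host1:", rm.Resource().Attributes().Len())

	// Scenario 2: a normal metric arrives before its target_info, so the cache
	// misses and the metric keeps only its job/instance-derived attributes.
	_, hit := attrCache.Get(cacheKey("service_b", "host2"))
	fmt.Println("cache hit for service_b/host2:", hit)
}
```

Caching a plain `pcommon.Map` (rather than a `pmetric.ResourceMetrics` slice of an earlier payload, as the old `rmCache` did) keeps the cached state independent of batches that have already been handed to the next consumer, which is what lets this change avoid mutating read-only data.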
diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index c5637638c7372..c49d2118c6b5c 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -129,7 +129,7 @@ require ( golang.org/x/crypto v0.41.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sys v0.35.0 // indirect + golang.org/x/sys v0.36.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.12.0 // indirect google.golang.org/api v0.239.0 // indirect diff --git a/receiver/prometheusremotewritereceiver/go.sum b/receiver/prometheusremotewritereceiver/go.sum index 142ae20d63836..447df6d6700b1 100644 --- a/receiver/prometheusremotewritereceiver/go.sum +++ b/receiver/prometheusremotewritereceiver/go.sum @@ -459,8 +459,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 175e38d852d4b..b6f8ce18e14aa 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -35,7 +35,7 @@ import ( ) func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { - cache, err := lru.New[uint64, pmetric.ResourceMetrics](1000) + cache, err := lru.New[uint64, pcommon.Map](1000) if err != nil { return nil, fmt.Errorf("failed to create LRU cache: %w", err) } @@ -47,7 +47,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume server: &http.Server{ ReadTimeout: 60 * time.Second, }, - rmCache: cache, + attrCache: cache, }, nil } @@ -59,8 +59,8 @@ type prometheusRemoteWriteReceiver struct { server *http.Server wg sync.WaitGroup - rmCache *lru.Cache[uint64, pmetric.ResourceMetrics] - obsrecv *receiverhelper.ObsReport + attrCache *lru.Cache[uint64, pcommon.Map] + obsrecv *receiverhelper.ObsReport } // metricIdentity contains all the components that uniquely identify a metric @@ -195,6 +195,9 @@ func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req * w.WriteHeader(http.StatusNoContent) obsrecvCtx := prw.obsrecv.StartMetricsOp(req.Context()) + if m.MetricCount() == 0 { + return + } err = prw.nextConsumer.ConsumeMetrics(req.Context(), m) if err != nil { prw.settings.Logger.Error("Error consuming metrics", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}) @@ -246,8 +249,13 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr } // The key is composed by: 
resource_hash:scope_name:scope_version:metric_name:unit:type metricCache = make(map[uint64]pmetric.Metric) + // resourceMetricCache caches ResourceMetrics by job/instance labels + resourceMetricCache = make(map[uint64]pmetric.ResourceMetrics) ) + // First, extract any target_info metric, if any. + prw.extractTargetInfo(req, &labelsBuilder) + for i := range req.Timeseries { ts := &req.Timeseries[i] ls := ts.ToLabels(&labelsBuilder, req.Symbols) @@ -260,28 +268,8 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr continue } - // If the metric name is equal to target_info, we use its labels as attributes of the resource - // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 if metadata.Name == "target_info" { - var rm pmetric.ResourceMetrics - hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - - if existingRM, ok := prw.rmCache.Get(hashedLabels); ok { - rm = existingRM - } else { - rm = otelMetrics.ResourceMetrics().AppendEmpty() - } - - attrs := rm.Resource().Attributes() - parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance")) - - // Add the remaining labels as resource attributes - for labelName, labelValue := range ls.Map() { - if labelName != "job" && labelName != "instance" && !schema.IsMetadataLabel(labelName) { - attrs.PutStr(labelName, labelValue) - } - } - prw.rmCache.Add(hashedLabels, rm) + // We already converted the target_info labels into resource attributes. Continue. continue } @@ -302,20 +290,23 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // Handle histograms separately due to their complex mixed-schema processing if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_HISTOGRAM { - prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats) + prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, resourceMetricCache) continue } // Handle regular metrics (gauge, counter, summary) hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - existingRM, ok := prw.rmCache.Get(hashedLabels) + existingRM, ok := resourceMetricCache[hashedLabels] var rm pmetric.ResourceMetrics if ok { rm = existingRM } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() + if attr, ok := prw.attrCache.Get(hashedLabels); ok { + attr.CopyTo(rm.Resource().Attributes()) + } parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - prw.rmCache.Add(hashedLabels, rm) + resourceMetricCache[hashedLabels] = rm } resourceID := identity.OfResource(rm.Resource()) @@ -395,6 +386,29 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr return otelMetrics, stats, badRequestErrors } +func (prw *prometheusRemoteWriteReceiver) extractTargetInfo(req *writev2.Request, labelsBuilder *labels.ScratchBuilder) { + for i := range req.Timeseries { + ts := &req.Timeseries[i] + ls := ts.ToLabels(labelsBuilder, req.Symbols) + metadata := schema.NewMetadataFromLabels(ls) + // If the metric name is equal to target_info, we use its labels as attributes of the resource + // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 + if metadata.Name == "target_info" { + attrs := pcommon.NewMap() + parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance")) + // Add 
the remaining labels as resource attributes + for labelName, labelValue := range ls.Map() { + if labelName != "job" && labelName != "instance" && !schema.IsMetadataLabel(labelName) { + attrs.PutStr(labelName, labelValue) + } + } + + hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) + prw.attrCache.Add(hashedLabels, attrs) + } + } +} + // processHistogramTimeSeries handles all histogram processing, including validation and mixed schemas. func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( otelMetrics pmetric.Metrics, @@ -403,6 +417,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( scopeName, scopeVersion, metricName, unit, description string, metricCache map[uint64]pmetric.Metric, stats *promremote.WriteResponseStats, + resourceMetricCache map[uint64]pmetric.ResourceMetrics, ) { // Drop classic histogram series (those with samples) if len(ts.Samples) != 0 { @@ -438,13 +453,13 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( // Create resource if needed (only for the first valid histogram) if hashedLabels == 0 { hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - existingRM, ok := prw.rmCache.Get(hashedLabels) + existingRM, ok := resourceMetricCache[hashedLabels] if ok { rm = existingRM } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - prw.rmCache.Add(hashedLabels, rm) + resourceMetricCache[hashedLabels] = rm } resourceID = identity.OfResource(rm.Resource()) } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 942a8b42fa618..c79b1966e4b79 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -22,6 +22,7 @@ import ( writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -83,7 +84,7 @@ func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { // Add cleanup to ensure LRU cache is properly purged t.Cleanup(func() { - writeReceiver.rmCache.Purge() + writeReceiver.attrCache.Purge() }) return writeReceiver @@ -1414,8 +1415,8 @@ func TestTranslateV2(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - // since we are using the rmCache to store values across requests, we need to clear it after each test, otherwise it will affect the next test - prwReceiver.rmCache.Purge() + // since we are using the attrCache to store values across requests, we need to clear it after each test, otherwise it will affect the next test + prwReceiver.attrCache.Purge() metrics, stats, err := prwReceiver.translateV2(ctx, tc.request) if tc.expectError != "" { assert.ErrorContains(t, err, tc.expectError) @@ -1449,13 +1450,22 @@ func (m *mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) err defer m.mu.Unlock() m.metrics = append(m.metrics, md) m.dataPoints += md.DataPointCount() + md.MarkReadOnly() return nil } +func (m *mockConsumer) clearAll() { + m.mu.Lock() + defer m.mu.Unlock() + m.metrics = make([]pmetric.Metrics, 0) + m.dataPoints = 0 +} + func TestTargetInfoWithMultipleRequests(t *testing.T) 
{ tests := []struct { - name string - requests []*writev2.Request + name string + requests []*writev2.Request + expectedMetrics func() pmetric.Metrics }{ { name: "target_info first, normal metric second", @@ -1494,6 +1504,31 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { }, }, }, + expectedMetrics: func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + attrs := rm.Resource().Attributes() + attrs.PutStr("service.namespace", "production") + attrs.PutStr("service.name", "service_a") + attrs.PutStr("service.instance.id", "host1") + attrs.PutStr("machine_type", "n1-standard-1") + attrs.PutStr("cloud_provider", "gcp") + attrs.PutStr("region", "us-central1") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + m1 := sm.Metrics().AppendEmpty() + m1.SetName("normal_metric") + m1.SetUnit("") + m1.SetDescription("") + dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetDoubleValue(2.0) + dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) + dp1.Attributes().PutStr("foo", "bar") + + return metrics + }, }, { name: "normal metric first, target_info second", @@ -1532,36 +1567,32 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { }, }, }, + expectedMetrics: func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + attrs := rm.Resource().Attributes() + attrs.PutStr("service.namespace", "production") + attrs.PutStr("service.name", "service_a") + attrs.PutStr("service.instance.id", "host1") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + m1 := sm.Metrics().AppendEmpty() + m1.SetName("normal_metric") + m1.SetUnit("") + m1.SetDescription("") + dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetDoubleValue(2.0) + dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) + dp1.Attributes().PutStr("foo", "bar") + + return metrics + }, }, } // Using the same expected metrics for both tests, because we are just checking if the order of the requests changes the result. 
- expectedMetrics := func() pmetric.Metrics { - metrics := pmetric.NewMetrics() - rm := metrics.ResourceMetrics().AppendEmpty() - attrs := rm.Resource().Attributes() - attrs.PutStr("service.namespace", "production") - attrs.PutStr("service.name", "service_a") - attrs.PutStr("service.instance.id", "host1") - attrs.PutStr("machine_type", "n1-standard-1") - attrs.PutStr("cloud_provider", "gcp") - attrs.PutStr("region", "us-central1") - - sm := rm.ScopeMetrics().AppendEmpty() - sm.Scope().SetName("OpenTelemetry Collector") - sm.Scope().SetVersion("latest") - m1 := sm.Metrics().AppendEmpty() - m1.SetName("normal_metric") - m1.SetUnit("") - m1.SetDescription("") - dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() - dp1.SetDoubleValue(2.0) - dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) - dp1.Attributes().PutStr("foo", "bar") - - return metrics - }() - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockConsumer := new(mockConsumer) @@ -1592,7 +1623,7 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) } - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[0])) + assert.NoError(t, pmetrictest.CompareMetrics(tt.expectedMetrics(), mockConsumer.metrics[0])) }) } } @@ -1605,10 +1636,10 @@ func TestLRUCacheResourceMetrics(t *testing.T) { prwReceiver := setupMetricsReceiver(t) // Set a small cache size to emulate the cache eviction - prwReceiver.rmCache.Resize(1) + prwReceiver.attrCache.Resize(1) t.Cleanup(func() { - prwReceiver.rmCache.Purge() + prwReceiver.attrCache.Purge() }) // Metric 1. @@ -1747,6 +1778,9 @@ func TestLRUCacheResourceMetrics(t *testing.T) { attrs.PutStr("service.namespace", "production") attrs.PutStr("service.name", "service_a") attrs.PutStr("service.instance.id", "host1") + attrs.PutStr("machine_type", "n1-standard-1") + attrs.PutStr("cloud_provider", "gcp") + attrs.PutStr("region", "us-central1") sm := rm.ScopeMetrics().AppendEmpty() sm.Scope().SetName("OpenTelemetry Collector") @@ -1790,9 +1824,9 @@ func TestLRUCacheResourceMetrics(t *testing.T) { // As target_info and metric1 have the same job/instance, they generate the same end metric: mockConsumer.metrics[0]. assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0])) // As metric2 have different job/instance, it generates a different end metric: mockConsumer.metrics[2]. At this point, the cache is full it should evict the target_info metric to store the metric2. - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2])) + assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[1])) // As just have 1 slot in the cache, but the cache for metric1 was evicted, this metric1_1 should generate a new resource metric, even having the same job/instance than the metric1. 
- assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[3])) + assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[2])) } func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any { @@ -1819,3 +1853,264 @@ func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any { } return result } + +func TestReceiverWithExpectedMetrics(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + prwReceiver, err := factory.CreateMetrics(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) + require.NoError(t, err) + + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: component.MustNewID("test"), + Transport: "http", + ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), + }) + require.NoError(t, err) + + consumer := &mockConsumer{} + receiver := prwReceiver.(*prometheusRemoteWriteReceiver) + receiver.nextConsumer = consumer + receiver.obsrecv = obsrecv + + ts := httptest.NewServer(http.HandlerFunc(receiver.handlePRW)) + defer ts.Close() + + testCases := []struct { + name string + requests []*writev2.Request + expectedMetrics []pmetric.Metrics + }{ + { + name: "Single metric, one sample", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "cpu_usage", "job", "jobA", "instance", "instA"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 0.5, Timestamp: 1000}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + rm.Resource().Attributes().PutStr("service.name", "jobA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + m := sm.Metrics().AppendEmpty() + m.SetName("cpu_usage") + m.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(0.5) + m.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) // convert ms to ns + return metrics + }(), + }, + }, + { + name: "Two metrics, one sample each", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "cpu_usage", "job", "jobA", "instance", "instA", "memory_usage"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 7, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2048.0, Timestamp: 1000}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobA") + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + // cpu_usage + m1 := sm.Metrics().AppendEmpty() + m1.SetName("cpu_usage") + m1.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(1.0) + m1.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + // memory_usage + m2 := sm.Metrics().AppendEmpty() + m2.SetName("memory_usage") + m2.SetEmptyGauge().DataPoints().AppendEmpty(). 
+ SetDoubleValue(2048.0) + m2.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + return metrics + }(), + }, + }, + { + name: "Two resources, Two metrics, one sample each", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "cpu_usage", "job", "jobA", "instance", "instA", "memory_usage", "jobB", "instB"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 7, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2048.0, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 8, 5, 9}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 7, 3, 8, 5, 9}, + Samples: []writev2.Sample{{Value: 4096.0, Timestamp: 1000}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobA") + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + // cpu_usage + m1 := sm.Metrics().AppendEmpty() + m1.SetName("cpu_usage") + m1.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(1.0) + m1.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + // memory_usage + m2 := sm.Metrics().AppendEmpty() + m2.SetName("memory_usage") + m2.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(2048.0) + m2.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + rm = metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobB") + rm.Resource().Attributes().PutStr("service.instance.id", "instB") + + sm = rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + // cpu_usage + m1 = sm.Metrics().AppendEmpty() + m1.SetName("cpu_usage") + m1.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(2.0) + m1.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + // memory_usage + m2 = sm.Metrics().AppendEmpty() + m2.SetName("memory_usage") + m2.SetEmptyGauge().DataPoints().AppendEmpty(). 
+ SetDoubleValue(4096.0) + m2.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + return metrics + }(), + }, + }, + { + name: "Same job-instance, multiple metrics", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "metric1", "job", "jobA", "instance", "instA", "metric2"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, + LabelsRefs: []uint32{1, 7, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobA") + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + m := sm.Metrics().AppendEmpty() + m.SetName("metric1") + m.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(1) + m.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1 * 1e6)) + + m = sm.Metrics().AppendEmpty() + m.SetName("metric2") + m.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(2) + m.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(2 * 1e6)) + return metrics + }(), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + consumer.clearAll() + require.Len(t, tc.requests, len(tc.expectedMetrics)) + + for _, req := range tc.requests { + pBuf := proto.NewBuffer(nil) + require.NoError(t, pBuf.Marshal(req)) + + resp, err := http.Post( + ts.URL, + fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), + bytes.NewBuffer(pBuf.Bytes()), + ) + require.NoError(t, err) + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, resp.StatusCode) + } + + require.Len(t, consumer.metrics, len(tc.expectedMetrics)) + + for i := range tc.expectedMetrics { + actual := consumer.metrics[i] + expected := tc.expectedMetrics[i] + + assert.NoError(t, pmetrictest.CompareMetrics(expected, actual)) + } + }) + } +}
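As background for the changelog entry: the `md.MarkReadOnly()` call added to `mockConsumer.ConsumeMetrics` freezes every payload handed downstream, so the tests now panic if the receiver later mutates `ResourceMetrics` cached from a previous request, which is the failure mode this PR fixes. Below is a minimal standalone sketch of that failure mode, assuming only the pdata API; the `main` wrapper is illustrative and not part of the change.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	// Build a payload and hand it "downstream"; like the updated mockConsumer,
	// the downstream side marks it read-only.
	md := pmetric.NewMetrics()
	rm := md.ResourceMetrics().AppendEmpty()
	rm.Resource().Attributes().PutStr("service.name", "service_a")
	md.MarkReadOnly()

	// Holding on to rm (as the old rmCache did) and mutating it while handling a
	// later request panics, because the underlying data is now shared and frozen.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered from panic on read-only ResourceMetrics:", r)
		}
	}()
	rm.Resource().Attributes().PutStr("region", "us-central1") // panics here
}
```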