From 0d28a7c79b3c284569a5ea81f26ed57a67673c76 Mon Sep 17 00:00:00 2001 From: perebaj Date: Wed, 8 Oct 2025 21:48:17 -0300 Subject: [PATCH 1/5] [receiver/prometheusremotewritereceiver] concurrency bug Signed-off-by: perebaj --- .../prometheusremotewritereceiver/receiver.go | 25 ++-- .../receiver_test.go | 134 ++++++++++++++++++ 2 files changed, 150 insertions(+), 9 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 175e38d852d4b..ce8ac6df843d3 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -260,14 +260,16 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr continue } + // Create a new resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes. + rm := pmetric.NewResourceMetrics() // If the metric name is equal to target_info, we use its labels as attributes of the resource // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 if metadata.Name == "target_info" { - var rm pmetric.ResourceMetrics hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) if existingRM, ok := prw.rmCache.Get(hashedLabels); ok { - rm = existingRM + // We need to copy the resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes. + existingRM.CopyTo(rm) } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() } @@ -281,6 +283,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr attrs.PutStr(labelName, labelValue) } } + // After changing the resource metrics, we added the copied version to the cache. prw.rmCache.Add(hashedLabels, rm) continue } @@ -309,12 +312,12 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // Handle regular metrics (gauge, counter, summary) hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) existingRM, ok := prw.rmCache.Get(hashedLabels) - var rm pmetric.ResourceMetrics if ok { - rm = existingRM + existingRM.CopyTo(rm) } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) + // After changing the resource metrics, we added the copied version to the cache. prw.rmCache.Add(hashedLabels, rm) } @@ -411,10 +414,12 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( return } - var rm pmetric.ResourceMetrics - var hashedLabels uint64 - var resourceID identity.Resource - var scope pmetric.ScopeMetrics + var ( + hashedLabels uint64 + resourceID identity.Resource + scope pmetric.ScopeMetrics + rm pmetric.ResourceMetrics + ) for i := range ts.Histograms { histogram := &ts.Histograms[i] @@ -440,10 +445,12 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) existingRM, ok := prw.rmCache.Get(hashedLabels) if ok { - rm = existingRM + // We need to copy the resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes. 
+ existingRM.CopyTo(rm) } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) + // After changing the resource metrics, we added the copied version to the cache. prw.rmCache.Add(hashedLabels, rm) } resourceID = identity.OfResource(rm.Resource()) diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 942a8b42fa618..fe345497d1816 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -11,6 +11,7 @@ import ( "math" "net/http" "net/http/httptest" + "strconv" "sync" "testing" "time" @@ -1819,3 +1820,136 @@ func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any { } return result } + +// TestConcurrentRequestsforSameResourceAttributes reproduces the concurrency bug where subsequent requests +// fail to reach the next consumer after the first successful request. +func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) { + mockConsumer := &mockConsumer{} + prwReceiver := setupMetricsReceiver(t) + prwReceiver.nextConsumer = mockConsumer + + // Create a test HTTP server + ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW)) + defer ts.Close() + + // Create multiple requests with the same job/instance labels (triggering cache key collision) + createRequest := func(metricName string, value float64, timestamp int64) *writev2.Request { + return &writev2.Request{ + Symbols: []string{ + "", // 0 + "__name__", metricName, // 1, 2 + "job", "test_job", // 3, 4 + "instance", "test_instance", // 5, 6 + }, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: value, Timestamp: timestamp}}, + }, + }, + } + } + + // Prepare requests + requests := []*writev2.Request{} + for i := 0; i < 5; i++ { + requests = append(requests, createRequest("metric_"+strconv.Itoa(i+1), float64(i+1)*10, int64(i+1)*1000)) + } + + // Send requests concurrently + var wg sync.WaitGroup + var httpResults []int // Store HTTP status codes + var mu sync.Mutex + + for i, req := range requests { + wg.Add(1) + go func(_ int, request *writev2.Request) { + defer wg.Done() + + pBuf := proto.NewBuffer(nil) + err := pBuf.Marshal(request) + assert.NoError(t, err) + + resp, err := http.Post( + ts.URL, + fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), + bytes.NewBuffer(pBuf.Bytes()), + ) + assert.NoError(t, err) + defer resp.Body.Close() + + mu.Lock() + httpResults = append(httpResults, resp.StatusCode) + mu.Unlock() + }(i, req) + } + wg.Wait() + + // Give some time for async processing + time.Sleep(100 * time.Millisecond) + + // Analyze results + mockConsumer.mu.Lock() + receivedMetrics := len(mockConsumer.metrics) + totalDataPoints := mockConsumer.dataPoints + mockConsumer.mu.Unlock() + + // Verify all HTTP requests succeeded + for i, status := range httpResults { + assert.Equal(t, http.StatusNoContent, status, "Request %d should return 204", i+1) + } + + // The expected behavior is: + // - All HTTP requests return 204 (success) + // - All 5 data points are present (no data loss) + // - We have 5 resource attributes, each with 1 data point. The resource attributes are equal since they have the same job/instance labels. + // - The cache should have a single resource attribute. 
+ assert.Equal(t, 5, totalDataPoints) + assert.Equal(t, 1, prwReceiver.rmCache.Len()) + + // Additional debugging info + t.Logf("Metrics batches received: %d", receivedMetrics) + for i, metrics := range mockConsumer.metrics { + t.Logf("Batch %d: %d resource metrics, %d data points", + i+1, metrics.ResourceMetrics().Len(), metrics.DataPointCount()) + } + + // Additional analysis: Check if we're getting mixed data due to concurrent mutations + + t.Logf("=== DETAILED ANALYSIS ===") + for i, metrics := range mockConsumer.metrics { + resourceMetrics := metrics.ResourceMetrics() + t.Logf("Batch %d:", i+1) + for j := 0; j < resourceMetrics.Len(); j++ { + rm := resourceMetrics.At(j) + scopeMetrics := rm.ScopeMetrics() + for k := 0; k < scopeMetrics.Len(); k++ { + scope := scopeMetrics.At(k) + metricsList := scope.Metrics() + for l := 0; l < metricsList.Len(); l++ { + metric := metricsList.At(l) + t.Logf(" - Metric: %s, DataPoints: %d", metric.Name(), metric.Gauge().DataPoints().Len()) + } + } + } + } + + // Verify thread safety: Check that metrics are properly consolidated without corruption + for i, metrics := range mockConsumer.metrics { + if metrics.DataPointCount() > 0 { + resourceMetrics := metrics.ResourceMetrics() + for j := 0; j < resourceMetrics.Len(); j++ { + rm := resourceMetrics.At(j) + scopeMetrics := rm.ScopeMetrics() + for k := 0; k < scopeMetrics.Len(); k++ { + scope := scopeMetrics.At(k) + metricsCount := scope.Metrics().Len() + if metricsCount != 1 { + t.Errorf("Batch %d: Found %d datapoints when it should be 1", i+1, metricsCount) + } + } + } + } + } +} From f610e7a1bf41dac93b4f46162678395ed9758197 Mon Sep 17 00:00:00 2001 From: perebaj Date: Wed, 15 Oct 2025 17:02:22 -0300 Subject: [PATCH 2/5] getOrCreateRM Signed-off-by: perebaj --- .../prometheusremotewritereceiver/receiver.go | 76 +++++++++---------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index ce8ac6df843d3..d81bf4df63f1e 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -232,13 +232,40 @@ func (*prometheusRemoteWriteReceiver) parseProto(contentType string) (promconfig return promconfig.RemoteWriteProtoMsgV1, nil } +// getOrCreateRM is a helper to fetch or create the in-batch ResourceMetrics for the given labels. +// It prefers the per-request map, and seeds attributes from the global cache +// when available, but never returns a detached ResourceMetrics. +func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMetrics pmetric.Metrics, reqRM map[uint64]pmetric.ResourceMetrics) (pmetric.ResourceMetrics, uint64) { + hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) + + if rm, ok := reqRM[hashedLabels]; ok { + return rm, hashedLabels + } + + rm := otelMetrics.ResourceMetrics().AppendEmpty() + if existingRM, ok := prw.rmCache.Get(hashedLabels); ok { + // Copy only resource attributes from cached snapshot to avoid sharing scopes/metrics. + existingRM.Resource().Attributes().CopyTo(rm.Resource().Attributes()) + } else { + // Seed with job/instance derived attributes and snapshot to cache for future requests. 
+ parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) + snapshot := pmetric.NewResourceMetrics() + rm.Resource().Attributes().CopyTo(snapshot.Resource().Attributes()) + prw.rmCache.Add(hashedLabels, snapshot) + } + + reqRM[hashedLabels] = rm + return rm, hashedLabels +} + // translateV2 translates a v2 remote-write request into OTLP metrics. // translate is not feature complete. func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { var ( badRequestErrors error - otelMetrics = pmetric.NewMetrics() - labelsBuilder = labels.NewScratchBuilder(0) + // otelMetrics represents the final metrics, after all the processing, that will be returned by the receiver. + otelMetrics = pmetric.NewMetrics() + labelsBuilder = labels.NewScratchBuilder(0) // More about stats: https://github.com/prometheus/docs/blob/main/docs/specs/prw/remote_write_spec_2_0.md#required-written-response-headers // TODO: add exemplars to the stats. stats = promremote.WriteResponseStats{ @@ -246,6 +273,10 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr } // The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type metricCache = make(map[uint64]pmetric.Metric) + // Request-local map to ensure all timeseries for the same resource append to the + // same in-batch ResourceMetrics, avoiding detached copies and concurrency issues + // with the global cache. + reqRM = make(map[uint64]pmetric.ResourceMetrics) ) for i := range req.Timeseries { @@ -260,22 +291,11 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr continue } - // Create a new resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes. - rm := pmetric.NewResourceMetrics() // If the metric name is equal to target_info, we use its labels as attributes of the resource // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 if metadata.Name == "target_info" { - hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - - if existingRM, ok := prw.rmCache.Get(hashedLabels); ok { - // We need to copy the resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes. - existingRM.CopyTo(rm) - } else { - rm = otelMetrics.ResourceMetrics().AppendEmpty() - } - + rm, _ := prw.getOrCreateRM(ls, otelMetrics, reqRM) attrs := rm.Resource().Attributes() - parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance")) // Add the remaining labels as resource attributes for labelName, labelValue := range ls.Map() { @@ -283,8 +303,6 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr attrs.PutStr(labelName, labelValue) } } - // After changing the resource metrics, we added the copied version to the cache. 
- prw.rmCache.Add(hashedLabels, rm) continue } @@ -305,21 +323,12 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // Handle histograms separately due to their complex mixed-schema processing if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_HISTOGRAM { - prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats) + prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, reqRM) continue } // Handle regular metrics (gauge, counter, summary) - hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - existingRM, ok := prw.rmCache.Get(hashedLabels) - if ok { - existingRM.CopyTo(rm) - } else { - rm = otelMetrics.ResourceMetrics().AppendEmpty() - parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - // After changing the resource metrics, we added the copied version to the cache. - prw.rmCache.Add(hashedLabels, rm) - } + rm, _ := prw.getOrCreateRM(ls, otelMetrics, reqRM) resourceID := identity.OfResource(rm.Resource()) metricIdentity := createMetricIdentity( @@ -406,6 +415,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( scopeName, scopeVersion, metricName, unit, description string, metricCache map[uint64]pmetric.Metric, stats *promremote.WriteResponseStats, + reqRM map[uint64]pmetric.ResourceMetrics, ) { // Drop classic histogram series (those with samples) if len(ts.Samples) != 0 { @@ -442,17 +452,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( } // Create resource if needed (only for the first valid histogram) if hashedLabels == 0 { - hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - existingRM, ok := prw.rmCache.Get(hashedLabels) - if ok { - // We need to copy the resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes. - existingRM.CopyTo(rm) - } else { - rm = otelMetrics.ResourceMetrics().AppendEmpty() - parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - // After changing the resource metrics, we added the copied version to the cache. - prw.rmCache.Add(hashedLabels, rm) - } + rm, _ = prw.getOrCreateRM(ls, otelMetrics, reqRM) resourceID = identity.OfResource(rm.Resource()) } From a3c5cb4bd16764b232d84d2f161809609b40a5df Mon Sep 17 00:00:00 2001 From: perebaj Date: Wed, 22 Oct 2025 17:07:26 -0300 Subject: [PATCH 3/5] remove wrong tests and add chlog file Signed-off-by: perebaj --- .chloggen/concurrency-bug.yaml | 27 ++++++++++++ .../prometheusremotewritereceiver/receiver.go | 38 ++++++++++++----- .../receiver_test.go | 42 +------------------ 3 files changed, 57 insertions(+), 50 deletions(-) create mode 100644 .chloggen/concurrency-bug.yaml diff --git a/.chloggen/concurrency-bug.yaml b/.chloggen/concurrency-bug.yaml new file mode 100644 index 0000000000000..68c2a8ed1fa32 --- /dev/null +++ b/.chloggen/concurrency-bug.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewritereceiver + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fixed a concurrency bug in the Prometheus remote write receiver where concurrent requests with identical job/instance labels would return empty responses after the first successful request. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [42159] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index d81bf4df63f1e..50a51485c3b11 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -232,9 +232,23 @@ func (*prometheusRemoteWriteReceiver) parseProto(contentType string) (promconfig return promconfig.RemoteWriteProtoMsgV1, nil } -// getOrCreateRM is a helper to fetch or create the in-batch ResourceMetrics for the given labels. -// It prefers the per-request map, and seeds attributes from the global cache -// when available, but never returns a detached ResourceMetrics. +// getOrCreateRM returns the per-request ResourceMetrics for the given labels' +// job/instance pair, creating and appending a new one to otelMetrics if needed. +// +// It looks up an existing entry in reqRM using a hash of "job\instance". If +// found, that already-appended instance is returned. Otherwise, a new +// ResourceMetrics is appended to otelMetrics and its resource attributes are +// seeded as follows: +// - If an attribute snapshot exists in the LRU cache, copy only resource +// attributes into the new ResourceMetrics (scopes/metrics are never reused). +// - Otherwise, derive attributes from job/instance and store an attribute-only +// snapshot in the cache for future requests. +// +// This function never returns a cached object; it only copies attributes from +// the cache into a per-request instance to avoid cross-request mutation. +// +// Note: Attribute enrichment from target_info should update the cache snapshot +// separately so subsequent HTTP requests start with enriched attributes. func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMetrics pmetric.Metrics, reqRM map[uint64]pmetric.ResourceMetrics) (pmetric.ResourceMetrics, uint64) { hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) @@ -273,10 +287,10 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr } // The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type metricCache = make(map[uint64]pmetric.Metric) - // Request-local map to ensure all timeseries for the same resource append to the - // same in-batch ResourceMetrics, avoiding detached copies and concurrency issues - // with the global cache. 
- reqRM = make(map[uint64]pmetric.ResourceMetrics) + // modifiedResourceMetric keeps track, for each request, of which resources (identified by the job/instance hash) had their resource attributes modified — for example, through target_info. + // Once the request is fully processed, only the resource attributes contained in the request’s ResourceMetrics are snapshotted back into the LRU cache. + // This ensures that future requests start with the enriched resource attributes already applied. + modifiedResourceMetric = make(map[uint64]pmetric.ResourceMetrics) ) for i := range req.Timeseries { @@ -294,7 +308,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // If the metric name is equal to target_info, we use its labels as attributes of the resource // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 if metadata.Name == "target_info" { - rm, _ := prw.getOrCreateRM(ls, otelMetrics, reqRM) + rm, hashed := prw.getOrCreateRM(ls, otelMetrics, modifiedResourceMetric) attrs := rm.Resource().Attributes() // Add the remaining labels as resource attributes @@ -303,6 +317,10 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr attrs.PutStr(labelName, labelValue) } } + + snapshot := pmetric.NewResourceMetrics() + attrs.CopyTo(snapshot.Resource().Attributes()) + prw.rmCache.Add(hashed, snapshot) continue } @@ -323,12 +341,12 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // Handle histograms separately due to their complex mixed-schema processing if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_HISTOGRAM { - prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, reqRM) + prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, modifiedResourceMetric) continue } // Handle regular metrics (gauge, counter, summary) - rm, _ := prw.getOrCreateRM(ls, otelMetrics, reqRM) + rm, _ := prw.getOrCreateRM(ls, otelMetrics, modifiedResourceMetric) resourceID := identity.OfResource(rm.Resource()) metricIdentity := createMetricIdentity( diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index fe345497d1816..ff8615f04a6f5 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -1496,44 +1496,6 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { }, }, }, - { - name: "normal metric first, target_info second", - requests: []*writev2.Request{ - { - Symbols: []string{ - "", - "job", "production/service_a", // 1, 2 - "instance", "host1", // 3, 4 - "__name__", "normal_metric", // 5, 6 - "foo", "bar", // 7, 8 - }, - Timeseries: []writev2.TimeSeries{ - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{5, 6, 1, 2, 3, 4, 7, 8}, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - }, - }, - }, - { - Symbols: []string{ - "", - "job", "production/service_a", // 1, 2 - "instance", "host1", // 3, 4 - "machine_type", "n1-standard-1", // 5, 6 - "cloud_provider", "gcp", // 7, 8 - "region", "us-central1", // 9, 10 - "__name__", "target_info", // 11, 12 - }, - Timeseries: []writev2.TimeSeries{ - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 
12}, - }, - }, - }, - }, - }, } // Using the same expected metrics for both tests, because we are just checking if the order of the requests changes the result. @@ -1593,7 +1555,7 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) } - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[0])) + assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[1])) }) } } @@ -1789,7 +1751,7 @@ func TestLRUCacheResourceMetrics(t *testing.T) { } // As target_info and metric1 have the same job/instance, they generate the same end metric: mockConsumer.metrics[0]. - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0])) + assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[1])) // As metric2 have different job/instance, it generates a different end metric: mockConsumer.metrics[2]. At this point, the cache is full it should evict the target_info metric to store the metric2. assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2])) // As just have 1 slot in the cache, but the cache for metric1 was evicted, this metric1_1 should generate a new resource metric, even having the same job/instance than the metric1. From ea220494872d6c2a051d6e538f266cf64b3e58e7 Mon Sep 17 00:00:00 2001 From: perebaj Date: Thu, 23 Oct 2025 11:18:59 -0300 Subject: [PATCH 4/5] improve docs and remove unused code Signed-off-by: perebaj --- .chloggen/concurrency-bug.yaml | 2 +- .../prometheusremotewritereceiver/receiver.go | 35 +++++++++---------- .../receiver_test.go | 34 +----------------- 3 files changed, 19 insertions(+), 52 deletions(-) diff --git a/.chloggen/concurrency-bug.yaml b/.chloggen/concurrency-bug.yaml index 68c2a8ed1fa32..9c8e3fc83c6c9 100644 --- a/.chloggen/concurrency-bug.yaml +++ b/.chloggen/concurrency-bug.yaml @@ -4,7 +4,7 @@ change_type: bug_fix # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: prometheusremotewritereceiver +component: receiver/prometheusremotewrite # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: Fixed a concurrency bug in the Prometheus remote write receiver where concurrent requests with identical job/instance labels would return empty responses after the first successful request. diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 50a51485c3b11..1f93b672fc22a 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -232,23 +232,19 @@ func (*prometheusRemoteWriteReceiver) parseProto(contentType string) (promconfig return promconfig.RemoteWriteProtoMsgV1, nil } -// getOrCreateRM returns the per-request ResourceMetrics for the given labels' -// job/instance pair, creating and appending a new one to otelMetrics if needed. +// getOrCreateRM returns or creates the ResourceMetrics for a job/instance pair within an HTTP request. // -// It looks up an existing entry in reqRM using a hash of "job\instance". If -// found, that already-appended instance is returned. 
Otherwise, a new
-// ResourceMetrics is appended to otelMetrics and its resource attributes are
-// seeded as follows:
-// - If an attribute snapshot exists in the LRU cache, copy only resource
-// attributes into the new ResourceMetrics (scopes/metrics are never reused).
-// - Otherwise, derive attributes from job/instance and store an attribute-only
-// snapshot in the cache for future requests.
+// Two-level cache:
 //
-// This function never returns a cached object; it only copies attributes from
-// the cache into a per-request instance to avoid cross-request mutation.
+// 1. reqRM (per-request): groups samples with the same job/instance into a single ResourceMetrics
+// during current request processing, avoiding duplication in the output.
 //
-// Note: Attribute enrichment from target_info should update the cache snapshot
-// separately so subsequent HTTP requests start with enriched attributes.
+// 2. prw.rmCache (global LRU): stores snapshots of previously seen resource attributes,
+// allowing future requests to reuse enriched attributes (e.g., via target_info).
+//
+// This function always creates new ResourceMetrics per request, only copying attributes
+// from the LRU cache when available. Never returns cached objects to avoid shared
+// mutation across concurrent requests.
 func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMetrics pmetric.Metrics, reqRM map[uint64]pmetric.ResourceMetrics) (pmetric.ResourceMetrics, uint64) {
 	hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
 
@@ -258,10 +254,13 @@ func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMe
 	rm := otelMetrics.ResourceMetrics().AppendEmpty()
 	if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
-		// Copy only resource attributes from cached snapshot to avoid sharing scopes/metrics.
+		// When the ResourceMetrics already exists in the global cache, we can reuse the previous snapshots and pass the already seen attributes to the current request.
+		// This is important to not lose information and keep everything aggregated correctly.
+		//
 		existingRM.Resource().Attributes().CopyTo(rm.Resource().Attributes())
 	} else {
-		// Seed with job/instance derived attributes and snapshot to cache for future requests.
+		// When the ResourceMetrics does not exist in the global cache, we need to create a new one and add it to the request map.
+		// Saving the new ResourceMetrics in the global cache to avoid creating duplicates in the next requests.
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) snapshot := pmetric.NewResourceMetrics() rm.Resource().Attributes().CopyTo(snapshot.Resource().Attributes()) @@ -433,7 +432,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( scopeName, scopeVersion, metricName, unit, description string, metricCache map[uint64]pmetric.Metric, stats *promremote.WriteResponseStats, - reqRM map[uint64]pmetric.ResourceMetrics, + modifiedRM map[uint64]pmetric.ResourceMetrics, ) { // Drop classic histogram series (those with samples) if len(ts.Samples) != 0 { @@ -470,7 +469,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( } // Create resource if needed (only for the first valid histogram) if hashedLabels == 0 { - rm, _ = prw.getOrCreateRM(ls, otelMetrics, reqRM) + rm, _ = prw.getOrCreateRM(ls, otelMetrics, modifiedRM) resourceID = identity.OfResource(rm.Resource()) } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index ff8615f04a6f5..6a375b09431bc 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -1790,7 +1790,6 @@ func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) { prwReceiver := setupMetricsReceiver(t) prwReceiver.nextConsumer = mockConsumer - // Create a test HTTP server ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW)) defer ts.Close() @@ -1813,15 +1812,13 @@ func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) { } } - // Prepare requests requests := []*writev2.Request{} for i := 0; i < 5; i++ { requests = append(requests, createRequest("metric_"+strconv.Itoa(i+1), float64(i+1)*10, int64(i+1)*1000)) } - // Send requests concurrently var wg sync.WaitGroup - var httpResults []int // Store HTTP status codes + var httpResults []int var mu sync.Mutex for i, req := range requests { @@ -1851,9 +1848,7 @@ func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) { // Give some time for async processing time.Sleep(100 * time.Millisecond) - // Analyze results mockConsumer.mu.Lock() - receivedMetrics := len(mockConsumer.metrics) totalDataPoints := mockConsumer.dataPoints mockConsumer.mu.Unlock() @@ -1870,33 +1865,6 @@ func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) { assert.Equal(t, 5, totalDataPoints) assert.Equal(t, 1, prwReceiver.rmCache.Len()) - // Additional debugging info - t.Logf("Metrics batches received: %d", receivedMetrics) - for i, metrics := range mockConsumer.metrics { - t.Logf("Batch %d: %d resource metrics, %d data points", - i+1, metrics.ResourceMetrics().Len(), metrics.DataPointCount()) - } - - // Additional analysis: Check if we're getting mixed data due to concurrent mutations - - t.Logf("=== DETAILED ANALYSIS ===") - for i, metrics := range mockConsumer.metrics { - resourceMetrics := metrics.ResourceMetrics() - t.Logf("Batch %d:", i+1) - for j := 0; j < resourceMetrics.Len(); j++ { - rm := resourceMetrics.At(j) - scopeMetrics := rm.ScopeMetrics() - for k := 0; k < scopeMetrics.Len(); k++ { - scope := scopeMetrics.At(k) - metricsList := scope.Metrics() - for l := 0; l < metricsList.Len(); l++ { - metric := metricsList.At(l) - t.Logf(" - Metric: %s, DataPoints: %d", metric.Name(), metric.Gauge().DataPoints().Len()) - } - } - } - } - // Verify thread safety: Check that metrics are properly consolidated without corruption for i, metrics := range mockConsumer.metrics { if 
metrics.DataPointCount() > 0 {
 			resourceMetrics := metrics.ResourceMetrics()
 			for j := 0; j < resourceMetrics.Len(); j++ {
 				rm := resourceMetrics.At(j)
 				scopeMetrics := rm.ScopeMetrics()
 				for k := 0; k < scopeMetrics.Len(); k++ {
 					scope := scopeMetrics.At(k)
 					metricsCount := scope.Metrics().Len()
 					if metricsCount != 1 {
 						t.Errorf("Batch %d: Found %d datapoints when it should be 1", i+1, metricsCount)
 					}
 				}
 			}
 		}
 	}
 }

From 72d3386ae9e5cfdc00d000d101339a31590341b2 Mon Sep 17 00:00:00 2001
From: Jonathan
Date: Thu, 23 Oct 2025 13:06:19 -0300
Subject: [PATCH 5/5] Update receiver/prometheusremotewritereceiver/receiver.go

Co-authored-by: Vihas Makwana <121151420+VihasMakwana@users.noreply.github.com>
---
 receiver/prometheusremotewritereceiver/receiver.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go
index 1f93b672fc22a..c212b09b152d7 100644
--- a/receiver/prometheusremotewritereceiver/receiver.go
+++ b/receiver/prometheusremotewritereceiver/receiver.go
@@ -256,7 +256,6 @@ func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMe
 	if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
 		// When the ResourceMetrics already exists in the global cache, we can reuse the previous snapshots and pass the already seen attributes to the current request.
 		// This is important to not lose information and keep everything aggregated correctly.
-		//
 		existingRM.Resource().Attributes().CopyTo(rm.Resource().Attributes())
 	} else {
 		// When the ResourceMetrics does not exist in the global cache, we need to create a new one and add it to the request map.