diff --git a/.chloggen/concurrency-bug.yaml b/.chloggen/concurrency-bug.yaml
new file mode 100644
index 0000000000000..9c8e3fc83c6c9
--- /dev/null
+++ b/.chloggen/concurrency-bug.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: receiver/prometheusremotewrite
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix a concurrency bug in the Prometheus remote write receiver where concurrent requests with identical job/instance labels would return empty responses after the first successful request.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [42159]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go
index 175e38d852d4b..c212b09b152d7 100644
--- a/receiver/prometheusremotewritereceiver/receiver.go
+++ b/receiver/prometheusremotewritereceiver/receiver.go
@@ -232,13 +232,52 @@ func (*prometheusRemoteWriteReceiver) parseProto(contentType string) (promconfig
 	return promconfig.RemoteWriteProtoMsgV1, nil
 }
 
+// getOrCreateRM returns or creates the ResourceMetrics for a job/instance pair within an HTTP request.
+//
+// Two-level cache:
+//
+// 1. reqRM (per-request): groups samples with the same job/instance into a single ResourceMetrics
+// during processing of the current request, avoiding duplication in the output.
+//
+// 2. prw.rmCache (global LRU): stores snapshots of previously seen resource attributes,
+// allowing future requests to reuse enriched attributes (e.g., via target_info).
+//
+// This function always creates a new ResourceMetrics per request, only copying attributes
+// from the LRU cache when available. It never returns cached objects, to avoid shared
+// mutation across concurrent requests.
+func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMetrics pmetric.Metrics, reqRM map[uint64]pmetric.ResourceMetrics) (pmetric.ResourceMetrics, uint64) {
+	hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
+
+	if rm, ok := reqRM[hashedLabels]; ok {
+		return rm, hashedLabels
+	}
+
+	rm := otelMetrics.ResourceMetrics().AppendEmpty()
+	if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
+		// When the ResourceMetrics already exists in the global cache, we can reuse the previous snapshot and pass the already-seen attributes on to the current request.
+		// This is important to avoid losing information and to keep everything aggregated correctly.
+		existingRM.Resource().Attributes().CopyTo(rm.Resource().Attributes())
+	} else {
+		// When the ResourceMetrics does not exist in the global cache, create a new one and add it to the request map.
+		// A snapshot of the new ResourceMetrics is saved in the global cache to avoid creating duplicates in subsequent requests.
+		parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
+		snapshot := pmetric.NewResourceMetrics()
+		rm.Resource().Attributes().CopyTo(snapshot.Resource().Attributes())
+		prw.rmCache.Add(hashedLabels, snapshot)
+	}
+
+	reqRM[hashedLabels] = rm
+	return rm, hashedLabels
+}
+
 // translateV2 translates a v2 remote-write request into OTLP metrics.
 // translate is not feature complete.
 func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
 	var (
 		badRequestErrors error
-		otelMetrics      = pmetric.NewMetrics()
-		labelsBuilder    = labels.NewScratchBuilder(0)
+		// otelMetrics represents the final metrics, after all processing, that will be returned by the receiver.
+		otelMetrics   = pmetric.NewMetrics()
+		labelsBuilder = labels.NewScratchBuilder(0)
 		// More about stats: https://github.com/prometheus/docs/blob/main/docs/specs/prw/remote_write_spec_2_0.md#required-written-response-headers
 		// TODO: add exemplars to the stats.
 		stats = promremote.WriteResponseStats{
@@ -246,6 +285,10 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
 		}
 		// The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type
 		metricCache = make(map[uint64]pmetric.Metric)
+		// modifiedResourceMetric keeps track, for each request, of which resources (identified by the job/instance hash) had their resource attributes modified, for example through target_info.
+		// Once the request is fully processed, only the resource attributes contained in the request's ResourceMetrics are snapshotted back into the LRU cache.
+		// This ensures that future requests start with the enriched resource attributes already applied.
+		modifiedResourceMetric = make(map[uint64]pmetric.ResourceMetrics)
 	)
 
 	for i := range req.Timeseries {
@@ -263,17 +306,8 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
 		// If the metric name is equal to target_info, we use its labels as attributes of the resource
 		// Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1
 		if metadata.Name == "target_info" {
-			var rm pmetric.ResourceMetrics
-			hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
-
-			if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
-				rm = existingRM
-			} else {
-				rm = otelMetrics.ResourceMetrics().AppendEmpty()
-			}
-
+			rm, hashed := prw.getOrCreateRM(ls, otelMetrics, modifiedResourceMetric)
 			attrs := rm.Resource().Attributes()
-			parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance"))
 
 			// Add the remaining labels as resource attributes
 			for labelName, labelValue := range ls.Map() {
@@ -281,7 +315,10 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
 					attrs.PutStr(labelName, labelValue)
 				}
 			}
-			prw.rmCache.Add(hashedLabels, rm)
+
+			snapshot := pmetric.NewResourceMetrics()
+			attrs.CopyTo(snapshot.Resource().Attributes())
+			prw.rmCache.Add(hashed, snapshot)
 			continue
 		}
 
@@ -302,21 +339,12 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
 		// Handle histograms separately due to their complex mixed-schema processing
 		if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_HISTOGRAM {
-			prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats)
+			prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, modifiedResourceMetric)
 			continue
 		}
 
 		// Handle regular metrics (gauge, counter, summary)
-		hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
-		existingRM, ok := prw.rmCache.Get(hashedLabels)
-		var rm pmetric.ResourceMetrics
-		if ok {
-			rm = existingRM
-		} else {
-			rm = otelMetrics.ResourceMetrics().AppendEmpty()
-			parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
-			prw.rmCache.Add(hashedLabels, rm)
-		}
+		rm, _ := prw.getOrCreateRM(ls, otelMetrics, modifiedResourceMetric)
 		resourceID := identity.OfResource(rm.Resource())
 
 		metricIdentity := createMetricIdentity(
@@ -403,6 +431,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
 	scopeName, scopeVersion, metricName, unit, description string,
 	metricCache map[uint64]pmetric.Metric,
 	stats *promremote.WriteResponseStats,
+	modifiedRM map[uint64]pmetric.ResourceMetrics,
 ) {
 	// Drop classic histogram series (those with samples)
 	if len(ts.Samples) != 0 {
@@ -411,10 +440,12 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
 		return
 	}
 
-	var rm pmetric.ResourceMetrics
-	var hashedLabels uint64
-	var resourceID identity.Resource
-	var scope pmetric.ScopeMetrics
+	var (
+		hashedLabels uint64
+		resourceID   identity.Resource
+		scope        pmetric.ScopeMetrics
+		rm           pmetric.ResourceMetrics
+	)
 
 	for i := range ts.Histograms {
 		histogram := &ts.Histograms[i]
@@ -437,15 +468,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
 		}
 		// Create resource if needed (only for the first valid histogram)
 		if hashedLabels == 0 {
-			hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
-			existingRM, ok := prw.rmCache.Get(hashedLabels)
-			if ok {
-				rm = existingRM
-			} else {
-				rm = otelMetrics.ResourceMetrics().AppendEmpty()
-				parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
-				prw.rmCache.Add(hashedLabels, rm)
-			}
+			rm, _ = prw.getOrCreateRM(ls, otelMetrics, modifiedRM)
 			resourceID = identity.OfResource(rm.Resource())
 		}
 
diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go
index 942a8b42fa618..6a375b09431bc 100644
--- a/receiver/prometheusremotewritereceiver/receiver_test.go
+++ b/receiver/prometheusremotewritereceiver/receiver_test.go
@@ -11,6 +11,7 @@ import (
 	"math"
 	"net/http"
 	"net/http/httptest"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -1495,44 +1496,6 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) {
 				},
 			},
 		},
-		{
-			name: "normal metric first, target_info second",
-			requests: []*writev2.Request{
-				{
-					Symbols: []string{
-						"",
-						"job", "production/service_a", // 1, 2
-						"instance", "host1", // 3, 4
-						"__name__", "normal_metric", // 5, 6
-						"foo", "bar", // 7, 8
-					},
-					Timeseries: []writev2.TimeSeries{
-						{
-							Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
-							LabelsRefs: []uint32{5, 6, 1, 2, 3, 4, 7, 8},
-							Samples:    []writev2.Sample{{Value: 2, Timestamp: 2}},
-						},
-					},
-				},
-				{
-					Symbols: []string{
-						"",
-						"job", "production/service_a", // 1, 2
-						"instance", "host1", // 3, 4
-						"machine_type", "n1-standard-1", // 5, 6
-						"cloud_provider", "gcp", // 7, 8
-						"region", "us-central1", // 9, 10
-						"__name__", "target_info", // 11, 12
-					},
-					Timeseries: []writev2.TimeSeries{
-						{
-							Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
-							LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
-						},
-					},
-				},
-			},
-		},
 	}
 
 	// Using the same expected metrics for both tests, because we are just checking if the order of the requests changes the result.
@@ -1592,7 +1555,7 @@
 			assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body))
 		}
 
-		assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[0]))
+		assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[1]))
 		})
 	}
 }
@@ -1788,7 +1751,7 @@ func TestLRUCacheResourceMetrics(t *testing.T) {
 	}
 
 	// As target_info and metric1 have the same job/instance, they generate the same end metric: mockConsumer.metrics[0].
-	assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0]))
+	assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[1]))
 	// As metric2 have different job/instance, it generates a different end metric: mockConsumer.metrics[2]. At this point, the cache is full it should evict the target_info metric to store the metric2.
 	assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2]))
 	// As just have 1 slot in the cache, but the cache for metric1 was evicted, this metric1_1 should generate a new resource metric, even having the same job/instance than the metric1.
@@ -1819,3 +1782,104 @@ func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any {
 	}
 	return result
 }
+
+// TestConcurrentRequestsforSameResourceAttributes reproduces the concurrency bug where subsequent requests
+// fail to reach the next consumer after the first successful request.
+func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) {
+	mockConsumer := &mockConsumer{}
+	prwReceiver := setupMetricsReceiver(t)
+	prwReceiver.nextConsumer = mockConsumer
+
+	ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW))
+	defer ts.Close()
+
+	// Create multiple requests with the same job/instance labels, so they all map to the same cache key.
+	createRequest := func(metricName string, value float64, timestamp int64) *writev2.Request {
+		return &writev2.Request{
+			Symbols: []string{
+				"", // 0
+				"__name__", metricName, // 1, 2
+				"job", "test_job", // 3, 4
+				"instance", "test_instance", // 5, 6
+			},
+			Timeseries: []writev2.TimeSeries{
+				{
+					Metadata:   writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
+					LabelsRefs: []uint32{1, 2, 3, 4, 5, 6},
+					Samples:    []writev2.Sample{{Value: value, Timestamp: timestamp}},
+				},
+			},
+		}
+	}
+
+	requests := []*writev2.Request{}
+	for i := 0; i < 5; i++ {
+		requests = append(requests, createRequest("metric_"+strconv.Itoa(i+1), float64(i+1)*10, int64(i+1)*1000))
+	}
+
+	var wg sync.WaitGroup
+	var httpResults []int
+	var mu sync.Mutex
+
+	for i, req := range requests {
+		wg.Add(1)
+		go func(_ int, request *writev2.Request) {
+			defer wg.Done()
+
+			pBuf := proto.NewBuffer(nil)
+			err := pBuf.Marshal(request)
+			assert.NoError(t, err)
+
+			resp, err := http.Post(
+				ts.URL,
+				fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
+				bytes.NewBuffer(pBuf.Bytes()),
+			)
+			assert.NoError(t, err)
+			defer resp.Body.Close()
+
+			mu.Lock()
+			httpResults = append(httpResults, resp.StatusCode)
+			mu.Unlock()
+		}(i, req)
+	}
+	wg.Wait()
+
+	// Give some time for async processing
+	time.Sleep(100 * time.Millisecond)
+
+	mockConsumer.mu.Lock()
+	totalDataPoints := mockConsumer.dataPoints
+	mockConsumer.mu.Unlock()
+
+	// Verify all HTTP requests succeeded
+	for i, status := range httpResults {
+		assert.Equal(t, http.StatusNoContent, status, "Request %d should return 204", i+1)
+	}
+
+	// The expected behavior is:
+	// - All HTTP requests return 204 (success).
+	// - All 5 data points are present (no data loss).
+	// - There are 5 ResourceMetrics, each with 1 data point; their resource attributes are identical since they share the same job/instance labels.
+	// - The cache holds a single entry for that job/instance pair.
+	assert.Equal(t, 5, totalDataPoints)
+	assert.Equal(t, 1, prwReceiver.rmCache.Len())
+
+	// Verify thread safety: check that each forwarded batch is properly consolidated, with exactly one metric per scope.
+	for i, metrics := range mockConsumer.metrics {
+		if metrics.DataPointCount() > 0 {
+			resourceMetrics := metrics.ResourceMetrics()
+			for j := 0; j < resourceMetrics.Len(); j++ {
+				rm := resourceMetrics.At(j)
+				scopeMetrics := rm.ScopeMetrics()
+				for k := 0; k < scopeMetrics.Len(); k++ {
+					scope := scopeMetrics.At(k)
+					metricsCount := scope.Metrics().Len()
+					if metricsCount != 1 {
+						t.Errorf("Batch %d: found %d metrics in scope, expected 1", i+1, metricsCount)
+					}
+				}
+			}
+		}
+	}
+}
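The patch boils down to a copy-on-read pattern: the LRU only ever holds detached attribute snapshots, and every request appends its own ResourceMetrics and copies cached attributes into it. The following is a minimal standalone sketch of that pattern, not the receiver's actual code: the snapshotCache type (a mutex-guarded map standing in for the receiver's LRU), the simplified service.name/service.instance.id mapping, and this getOrCreateRM signature are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/cespare/xxhash/v2"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// snapshotCache is a stand-in for the receiver's LRU (assumption for this sketch):
// it holds detached ResourceMetrics snapshots, never objects that live inside a
// request's pmetric.Metrics, so concurrent requests cannot mutate shared state.
type snapshotCache struct {
	mu    sync.Mutex
	items map[uint64]pmetric.ResourceMetrics
}

func (c *snapshotCache) get(key uint64) (pmetric.ResourceMetrics, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	rm, ok := c.items[key]
	return rm, ok
}

func (c *snapshotCache) add(key uint64, rm pmetric.ResourceMetrics) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = rm
}

// getOrCreateRM mirrors the two-level lookup described in the diff: reqRM
// deduplicates within one request, while the cache only contributes attribute
// values that are copied into a freshly appended ResourceMetrics.
func getOrCreateRM(job, instance string, out pmetric.Metrics, reqRM map[uint64]pmetric.ResourceMetrics, cache *snapshotCache) (pmetric.ResourceMetrics, uint64) {
	key := xxhash.Sum64String(job + "\xff" + instance)
	if rm, ok := reqRM[key]; ok {
		return rm, key
	}

	rm := out.ResourceMetrics().AppendEmpty()
	if cached, ok := cache.get(key); ok {
		// Reuse previously seen attributes by value, not by reference.
		cached.Resource().Attributes().CopyTo(rm.Resource().Attributes())
	} else {
		// Simplified mapping; the real receiver derives these via parseJobAndInstance.
		rm.Resource().Attributes().PutStr("service.name", job)
		rm.Resource().Attributes().PutStr("service.instance.id", instance)
		snapshot := pmetric.NewResourceMetrics()
		rm.Resource().Attributes().CopyTo(snapshot.Resource().Attributes())
		cache.add(key, snapshot)
	}
	reqRM[key] = rm
	return rm, key
}

func main() {
	cache := &snapshotCache{items: map[uint64]pmetric.ResourceMetrics{}}

	// Two "requests" with the same job/instance, handled independently:
	// each gets its own ResourceMetrics, while the cache keeps one snapshot.
	for i := 0; i < 2; i++ {
		out := pmetric.NewMetrics()
		reqRM := map[uint64]pmetric.ResourceMetrics{}
		rm, _ := getOrCreateRM("test_job", "test_instance", out, reqRM, cache)
		rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName(fmt.Sprintf("metric_%d", i+1))
		fmt.Println(out.ResourceMetrics().Len(), out.MetricCount())
	}
}
```

Because cached objects never escape the cache, two requests that share a job/instance pair can be translated concurrently without writing into each other's output batches, which is the failure mode the new test exercises.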