From a94973d9f7a2a7287f842bbcbc9007e1561ffe73 Mon Sep 17 00:00:00 2001 From: Vihas Date: Wed, 15 Oct 2025 16:00:33 +0530 Subject: [PATCH 1/6] initial commit --- receiver/prometheusremotewritereceiver/go.mod | 16 ++- receiver/prometheusremotewritereceiver/go.sum | 32 ++++- .../integration_test.go | 131 ++++++++++++++++++ .../prometheusremotewritereceiver/receiver.go | 32 +++-- .../receiver_test.go | 97 ++++++++----- 5 files changed, 258 insertions(+), 50 deletions(-) create mode 100644 receiver/prometheusremotewritereceiver/integration_test.go diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 2a7616cd89a6b..7b862df6accbe 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -19,6 +19,8 @@ require ( go.opentelemetry.io/collector/confmap v1.43.0 go.opentelemetry.io/collector/consumer v1.43.0 go.opentelemetry.io/collector/consumer/consumertest v0.137.0 + go.opentelemetry.io/collector/exporter/debugexporter v0.137.0 + go.opentelemetry.io/collector/exporter/exportertest v0.137.0 go.opentelemetry.io/collector/pdata v1.43.0 go.opentelemetry.io/collector/receiver v1.43.0 go.opentelemetry.io/collector/receiver/receiverhelper v0.137.0 @@ -50,6 +52,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 // indirect github.com/aws/smithy-go v1.22.5 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dennwc/varint v1.0.0 // indirect github.com/docker/docker v28.3.3+incompatible // indirect @@ -101,16 +104,27 @@ require ( go.opentelemetry.io/collector/config/configmiddleware v1.43.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.43.0 // indirect go.opentelemetry.io/collector/config/configoptional v1.43.0 // indirect + go.opentelemetry.io/collector/config/configretry v1.43.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.137.0 // indirect go.opentelemetry.io/collector/config/configtls v1.43.0 // indirect go.opentelemetry.io/collector/confmap/xconfmap v0.137.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.137.0 // indirect + go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.137.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.137.0 // indirect + go.opentelemetry.io/collector/exporter v1.43.0 // indirect + go.opentelemetry.io/collector/exporter/exporterhelper v0.137.0 // indirect + go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.137.0 // indirect + go.opentelemetry.io/collector/exporter/xexporter v0.137.0 // indirect + go.opentelemetry.io/collector/extension v1.43.0 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.43.0 // indirect go.opentelemetry.io/collector/extension/extensionmiddleware v0.137.0 // indirect + go.opentelemetry.io/collector/extension/xextension v0.137.0 // indirect go.opentelemetry.io/collector/featuregate v1.43.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.137.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.137.0 // indirect + go.opentelemetry.io/collector/pdata/xpdata v0.137.0 // indirect go.opentelemetry.io/collector/pipeline v1.43.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.137.0 // indirect go.opentelemetry.io/collector/processor v1.43.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.137.0 // 
indirect go.opentelemetry.io/collector/semconv v0.128.1-0.20250610090210-188191247685 // indirect @@ -129,7 +143,7 @@ require ( golang.org/x/crypto v0.41.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sys v0.35.0 // indirect + golang.org/x/sys v0.36.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.12.0 // indirect google.golang.org/api v0.239.0 // indirect diff --git a/receiver/prometheusremotewritereceiver/go.sum b/receiver/prometheusremotewritereceiver/go.sum index 856bc2bcf852e..a6ebe675fa145 100644 --- a/receiver/prometheusremotewritereceiver/go.sum +++ b/receiver/prometheusremotewritereceiver/go.sum @@ -62,6 +62,8 @@ github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= @@ -341,6 +343,10 @@ go.opentelemetry.io/collector/config/configopaque v1.43.0 h1:Hnal1eqOfWf+fRojiCE go.opentelemetry.io/collector/config/configopaque v1.43.0/go.mod h1:9uzLyGsWX0FtPWkomQXqLtblmSHgJFaM4T0gMBrCma0= go.opentelemetry.io/collector/config/configoptional v1.43.0 h1:u/MCeLUawXINEi05VdRuBRQ3wivEltxTjJqnL1eww4w= go.opentelemetry.io/collector/config/configoptional v1.43.0/go.mod h1:vdhEmJCpL4nQx2fETr3Bvg9Uy14IwThxL5/g8Mvo/A8= +go.opentelemetry.io/collector/config/configretry v1.43.0 h1:Va5pDNL0TOzqjLdJZ4xxQN9EggMSGVmxXBa+M6UEG30= +go.opentelemetry.io/collector/config/configretry v1.43.0/go.mod h1:ZSTYqAJCq4qf+/4DGoIxCElDIl5yHt8XxEbcnpWBbMM= +go.opentelemetry.io/collector/config/configtelemetry v0.137.0 h1:+QwfFnMwb5UatXYhZ+sY5dvBmqZsfnC3093nwgAgw8A= +go.opentelemetry.io/collector/config/configtelemetry v0.137.0/go.mod h1:Xjw2+DpNLjYtx596EHSWBy0dNQRiJ2H+BlWU907lO40= go.opentelemetry.io/collector/config/configtls v1.43.0 h1:DYbI0kOp4u7b0RA9B4b19ftCCkszSpL1kZqQVOn/tjc= go.opentelemetry.io/collector/config/configtls v1.43.0/go.mod h1:i+v6g4DvnYtq74GS1QV/adgVg7NG2HfL42G2QwkjZjg= go.opentelemetry.io/collector/confmap v1.43.0 h1:QVAnbS7A+2Ra61xsuG355vhlW6uOMaKWysrwLQzDUz4= @@ -351,10 +357,24 @@ go.opentelemetry.io/collector/consumer v1.43.0 h1:51pfN5h6PLlaBwGPtyHn6BdK0DgtVG go.opentelemetry.io/collector/consumer v1.43.0/go.mod h1:v3J2g+6IwOPbLsnzL9cQfvgpmmsZt1YS7aXSNDFmJfk= go.opentelemetry.io/collector/consumer/consumererror v0.137.0 h1:4HgYX6vVmaF17RRRtJDpR8EuWmLAv6JdKYG8slDDa+g= go.opentelemetry.io/collector/consumer/consumererror v0.137.0/go.mod h1:muYN3UZ/43YHpDpQRVvCj0Rhpt/YjoPAF/BO63cPSwk= +go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.137.0 h1:3XUc5SlbO+R7uP7C79pG3TVPbHmKf0HWaJPt12SWaGk= +go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.137.0/go.mod h1:Weh+7UFfhqMNslkT00EA+2vXGBSXmJoTCBRLGMx2EYo= go.opentelemetry.io/collector/consumer/consumertest v0.137.0 h1:tkqBk/DmJcrkRvHwNdDwvdiWfqyS6ymGgr9eyn6Vy6A= 
go.opentelemetry.io/collector/consumer/consumertest v0.137.0/go.mod h1:6bKAlEgrAZ3NSn7ULLFZQMQtlW2xJlvVWkzIaGprucg= go.opentelemetry.io/collector/consumer/xconsumer v0.137.0 h1:p3tkV3O9bL3bZl3RN2wmoxl22f8B8eMomKUqz656OPY= go.opentelemetry.io/collector/consumer/xconsumer v0.137.0/go.mod h1:N+nRnP0ga4Scu8Ew87F+kxVajE/eGjRLbWC9H+elN5Q= +go.opentelemetry.io/collector/exporter v1.43.0 h1:FYQ/bhOOiLcmIFvDAUvqfzHmZSvKkTrIFyYprPw3xug= +go.opentelemetry.io/collector/exporter v1.43.0/go.mod h1:lUB2OSGrRyD5PSXU0rF9gWcUYCGublBdnCV5hKlG+z8= +go.opentelemetry.io/collector/exporter/debugexporter v0.137.0 h1:Eq7Xa1mQPktrEitnfjtpkScUtOav3HVX1pqP6WOC+j0= +go.opentelemetry.io/collector/exporter/debugexporter v0.137.0/go.mod h1:mtyfQZzaUjIYTBfawVp4blnyoDwp+7o6Ztv4P21bnTk= +go.opentelemetry.io/collector/exporter/exporterhelper v0.137.0 h1:ffiZjBJvzgPYJpOltwIpvTCF8zg1VPxsoP6aW4VTDuQ= +go.opentelemetry.io/collector/exporter/exporterhelper v0.137.0/go.mod h1:osf2K/HkbdUU7EFigLhxMmz2r5MX/74vYC2RrBDURrc= +go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.137.0 h1:jnURp5i+sb1XgDN6iU6s8LbGB8h/njwo/F889/Al2nE= +go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.137.0/go.mod h1:waCyRPNVJxuDkfM1hNot9vRKExRbyQvmya3n5ihLHiE= +go.opentelemetry.io/collector/exporter/exportertest v0.137.0 h1:JesnY7M87UWE/gRsVUgskX95QCL/S4j1ARQTVHH4ggg= +go.opentelemetry.io/collector/exporter/exportertest v0.137.0/go.mod h1:6UxHqO5IyMKL3ehlE3UNpFupIyGc5BBj7xzmPoDImOI= +go.opentelemetry.io/collector/exporter/xexporter v0.137.0 h1:2fSmBDB+tuFoYKJSHbR/1nJIeO+LvvrjdOYEODKuhdo= +go.opentelemetry.io/collector/exporter/xexporter v0.137.0/go.mod h1:9gudRad3ijkbzcnTLE0y+CzUDtC4TaPyZQDUKB2yzVs= go.opentelemetry.io/collector/extension v1.43.0 h1:39cGAGMJIZEhhm4KbsvJJrG8AheS6wOc++ydY0Wpdp0= go.opentelemetry.io/collector/extension v1.43.0/go.mod h1:HVCPnRqx70Qn9BAmnqJt393er4l1OwcgAytLv1fSOSo= go.opentelemetry.io/collector/extension/extensionauth v1.43.0 h1:S2le/+BCkmus1olVJ1REsTbL6f3RqdGQAb1I1tO12mE= @@ -365,6 +385,10 @@ go.opentelemetry.io/collector/extension/extensionmiddleware v0.137.0 h1:7lliwvu8 go.opentelemetry.io/collector/extension/extensionmiddleware v0.137.0/go.mod h1:Vxtt+KlwwO4mpPEFyUMb/92BlMqOZc4Jk8RNjM99vcU= go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.137.0 h1:P4eN3wDjxYnatSInSbtehXbmZK9Qsuac5WtyRJD0u3s= go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.137.0/go.mod h1:mnz+JamjjJWsu4jHued+LIX8T03eE2MbREcH0EBg2Fk= +go.opentelemetry.io/collector/extension/extensiontest v0.137.0 h1:gnPF3HIOKqNk93XObt2x0WFvVfPtm76VggWe7LxgcaY= +go.opentelemetry.io/collector/extension/extensiontest v0.137.0/go.mod h1:vVmKojdITYka9+iAi3aarxeMrO6kdlywKuf3d3c6lcI= +go.opentelemetry.io/collector/extension/xextension v0.137.0 h1:UQ/I7D5/YmkvAV7g8yhWHY7BV31HvjGBCYduQJPyt+M= +go.opentelemetry.io/collector/extension/xextension v0.137.0/go.mod h1:T2Vr5ijSNW7PavuyZyRYYxCitpUTN+f4tRUdED/rtRw= go.opentelemetry.io/collector/featuregate v1.43.0 h1:Aq8UR5qv1zNlbbkTyqv8kLJtnoQMq/sG1/jS9o1cCJI= go.opentelemetry.io/collector/featuregate v1.43.0/go.mod h1:d0tiRzVYrytB6LkcYgz2ESFTv7OktRPQe0QEQcPt1L4= go.opentelemetry.io/collector/internal/telemetry v0.137.0 h1:KlJcaBnIIn+QJzQIfA1eXbYUvHmgM7h/gLp/vjvUBMw= @@ -375,8 +399,12 @@ go.opentelemetry.io/collector/pdata/pprofile v0.137.0 h1:bLVp8p8hpH81eQhhEQBkvLt go.opentelemetry.io/collector/pdata/pprofile v0.137.0/go.mod h1:QfhMf7NnG+fTuwGGB1mXgcPzcXNxEYSW6CrVouOsF7Q= go.opentelemetry.io/collector/pdata/testdata 
v0.137.0 h1:+oaGvbt0v7xryTX827szmyYWSAtvA0LbysEFV2nFjs0= go.opentelemetry.io/collector/pdata/testdata v0.137.0/go.mod h1:3512FJaQsZz5EBlrY46xKjzoBc0MoMcQtAqYs2NaRQM= +go.opentelemetry.io/collector/pdata/xpdata v0.137.0 h1:EZvBE26Hxzk+Dv3NU7idjsS+cXbwZrwdWXGgcTxsC8g= +go.opentelemetry.io/collector/pdata/xpdata v0.137.0/go.mod h1:MFbISBnECZ1m1JPc5F6LUhVIkmFkebuVk3NcpmGPtB8= go.opentelemetry.io/collector/pipeline v1.43.0 h1:IJjdqE5UCQlyVvFUUzlhSWhP4WIwpH6UyJQ9iWXpyww= go.opentelemetry.io/collector/pipeline v1.43.0/go.mod h1:xUrAqiebzYbrgxyoXSkk6/Y3oi5Sy3im2iCA51LwUAI= +go.opentelemetry.io/collector/pipeline/xpipeline v0.137.0 h1:2JPeB3PYyiC6WE3hJwNwarLDPKI37iFk1vYXJDu14qo= +go.opentelemetry.io/collector/pipeline/xpipeline v0.137.0/go.mod h1:nQmJ9w3UWOwNmaUR1EalDLyswzHfJcBPMm/NmcytH74= go.opentelemetry.io/collector/processor v1.43.0 h1:JmsceK1UUFtXoe3CALb+/A09RUQBsCbcqA+fSs4O0c0= go.opentelemetry.io/collector/processor v1.43.0/go.mod h1:w40CABuhIGpUoXtkIKik/5L5nfK2RTEjUuwl83n2PEo= go.opentelemetry.io/collector/processor/processortest v0.137.0 h1:ArZ6fFzE7Fyyfy4A7/skOGJMnG6bZDkYzOb0XPWEj9o= @@ -459,8 +487,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/receiver/prometheusremotewritereceiver/integration_test.go b/receiver/prometheusremotewritereceiver/integration_test.go new file mode 100644 index 0000000000000..5d8f27bc35c28 --- /dev/null +++ b/receiver/prometheusremotewritereceiver/integration_test.go @@ -0,0 +1,131 @@ +package prometheusremotewritereceiver + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver/internal/metadata" + promconfig "github.com/prometheus/prometheus/config" + writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/debugexporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" +) + +func TestReceiverWithDebugExporter(t *testing.T) { + // Create debug exporter + exporterCfg := debugexporter.NewFactory().CreateDefaultConfig() + core, obslogs := observer.New(zapcore.InfoLevel) + set := exportertest.NewNopSettings(component.MustNewType("debug")) + set.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) + exporter, err := 
debugexporter.NewFactory().CreateMetrics( + context.Background(), + set, + exporterCfg, + ) + if err != nil { + t.Fatalf("failed to create debug exporter: %v", err) + } + + // Setup receiver as in receiver_test.go + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, exporter) + if err != nil { + t.Fatalf("failed to create receiver: %v", err) + } + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: component.MustNewID("test"), + Transport: "http", + ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), + }) + if err != nil { + t.Fatalf("failed to create receiver: %v", err) + } + + receiver := prwReceiver.(*prometheusRemoteWriteReceiver) + receiver.nextConsumer = exporter + receiver.obsrecv = obsrecv + + ts := httptest.NewServer(http.HandlerFunc(receiver.handlePRW)) + defer ts.Close() + { + // Prepare a sample writev2.Request + req := &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric", "job", "test_job", "instance", "test_instance"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + CreatedTimestamp: 1, + }, + }, + } + pBuf := proto.NewBuffer(nil) + // we don't need to compress the body to use the snappy compression in the unit test + // because the encoder is just initialized when we initialize the http server. + // so we can just use the uncompressed body. + err = pBuf.Marshal(req) + assert.NoError(t, err) + + resp, err := http.Post( + ts.URL, + fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), + bytes.NewBuffer(pBuf.Bytes()), + ) + assert.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) + + } + + { + // Prepare a sample writev2.Request + req := &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric", "job", "test_job", "instance", "test_instance"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + CreatedTimestamp: 1, + }, + }, + } + pBuf := proto.NewBuffer(nil) + // we don't need to compress the body to use the snappy compression in the unit test + // because the encoder is just initialized when we initialize the http server. + // so we can just use the uncompressed body. 
+ err = pBuf.Marshal(req) + assert.NoError(t, err) + + resp, err := http.Post( + ts.URL, + fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), + bytes.NewBuffer(pBuf.Bytes()), + ) + assert.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) + + } + fmt.Println(obslogs.All()[0].Entry.Message) +} diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 175e38d852d4b..46540d5ecf6f6 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -35,7 +35,7 @@ import ( ) func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { - cache, err := lru.New[uint64, pmetric.ResourceMetrics](1000) + cache, err := lru.New[uint64, pcommon.Map](1000) if err != nil { return nil, fmt.Errorf("failed to create LRU cache: %w", err) } @@ -47,7 +47,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume server: &http.Server{ ReadTimeout: 60 * time.Second, }, - rmCache: cache, + attrCache: cache, }, nil } @@ -59,8 +59,8 @@ type prometheusRemoteWriteReceiver struct { server *http.Server wg sync.WaitGroup - rmCache *lru.Cache[uint64, pmetric.ResourceMetrics] - obsrecv *receiverhelper.ObsReport + attrCache *lru.Cache[uint64, pcommon.Map] + obsrecv *receiverhelper.ObsReport } // metricIdentity contains all the components that uniquely identify a metric @@ -195,6 +195,9 @@ func (prw *prometheusRemoteWriteReceiver) handlePRW(w http.ResponseWriter, req * w.WriteHeader(http.StatusNoContent) obsrecvCtx := prw.obsrecv.StartMetricsOp(req.Context()) + if m.MetricCount() == 0 { + return + } err = prw.nextConsumer.ConsumeMetrics(req.Context(), m) if err != nil { prw.settings.Logger.Error("Error consuming metrics", zapcore.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}) @@ -246,6 +249,8 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr } // The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type metricCache = make(map[uint64]pmetric.Metric) + // resourceMetricCache caches ResourceMetrics by job/instance labels + resourceMetricCache = make(map[uint64]pmetric.ResourceMetrics) ) for i := range req.Timeseries { @@ -266,7 +271,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr var rm pmetric.ResourceMetrics hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - if existingRM, ok := prw.rmCache.Get(hashedLabels); ok { + if existingRM, ok := resourceMetricCache[hashedLabels]; ok { rm = existingRM } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() @@ -281,7 +286,8 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr attrs.PutStr(labelName, labelValue) } } - prw.rmCache.Add(hashedLabels, rm) + prw.attrCache.Add(hashedLabels, attrs) + resourceMetricCache[hashedLabels] = rm continue } @@ -302,20 +308,23 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // Handle histograms separately due to their complex mixed-schema processing if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_HISTOGRAM { - prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats) + 
prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, resourceMetricCache) continue } // Handle regular metrics (gauge, counter, summary) hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - existingRM, ok := prw.rmCache.Get(hashedLabels) + existingRM, ok := resourceMetricCache[hashedLabels] var rm pmetric.ResourceMetrics if ok { rm = existingRM } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() + if attr, ok := prw.attrCache.Get(hashedLabels); ok { + attr.CopyTo(rm.Resource().Attributes()) + } parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - prw.rmCache.Add(hashedLabels, rm) + resourceMetricCache[hashedLabels] = rm } resourceID := identity.OfResource(rm.Resource()) @@ -403,6 +412,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( scopeName, scopeVersion, metricName, unit, description string, metricCache map[uint64]pmetric.Metric, stats *promremote.WriteResponseStats, + resourceMetricCache map[uint64]pmetric.ResourceMetrics, ) { // Drop classic histogram series (those with samples) if len(ts.Samples) != 0 { @@ -438,13 +448,13 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( // Create resource if needed (only for the first valid histogram) if hashedLabels == 0 { hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - existingRM, ok := prw.rmCache.Get(hashedLabels) + existingRM, ok := resourceMetricCache[hashedLabels] if ok { rm = existingRM } else { rm = otelMetrics.ResourceMetrics().AppendEmpty() parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - prw.rmCache.Add(hashedLabels, rm) + resourceMetricCache[hashedLabels] = rm } resourceID = identity.OfResource(rm.Resource()) } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 942a8b42fa618..2e0c51de40428 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -83,7 +83,7 @@ func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { // Add cleanup to ensure LRU cache is properly purged t.Cleanup(func() { - writeReceiver.rmCache.Purge() + writeReceiver.attrCache.Purge() }) return writeReceiver @@ -1414,8 +1414,8 @@ func TestTranslateV2(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - // since we are using the rmCache to store values across requests, we need to clear it after each test, otherwise it will affect the next test - prwReceiver.rmCache.Purge() + // since we are using the attrCache to store values across requests, we need to clear it after each test, otherwise it will affect the next test + prwReceiver.attrCache.Purge() metrics, stats, err := prwReceiver.translateV2(ctx, tc.request) if tc.expectError != "" { assert.ErrorContains(t, err, tc.expectError) @@ -1454,8 +1454,9 @@ func (m *mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) err func TestTargetInfoWithMultipleRequests(t *testing.T) { tests := []struct { - name string - requests []*writev2.Request + name string + requests []*writev2.Request + expectedMetrics func() pmetric.Metrics }{ { name: "target_info first, normal metric second", @@ -1494,6 +1495,31 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { }, }, }, + expectedMetrics: func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm 
:= metrics.ResourceMetrics().AppendEmpty() + attrs := rm.Resource().Attributes() + attrs.PutStr("service.namespace", "production") + attrs.PutStr("service.name", "service_a") + attrs.PutStr("service.instance.id", "host1") + attrs.PutStr("machine_type", "n1-standard-1") + attrs.PutStr("cloud_provider", "gcp") + attrs.PutStr("region", "us-central1") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + m1 := sm.Metrics().AppendEmpty() + m1.SetName("normal_metric") + m1.SetUnit("") + m1.SetDescription("") + dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetDoubleValue(2.0) + dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) + dp1.Attributes().PutStr("foo", "bar") + + return metrics + }, }, { name: "normal metric first, target_info second", @@ -1532,36 +1558,32 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { }, }, }, + expectedMetrics: func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + attrs := rm.Resource().Attributes() + attrs.PutStr("service.namespace", "production") + attrs.PutStr("service.name", "service_a") + attrs.PutStr("service.instance.id", "host1") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + m1 := sm.Metrics().AppendEmpty() + m1.SetName("normal_metric") + m1.SetUnit("") + m1.SetDescription("") + dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetDoubleValue(2.0) + dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) + dp1.Attributes().PutStr("foo", "bar") + + return metrics + }, }, } // Using the same expected metrics for both tests, because we are just checking if the order of the requests changes the result. - expectedMetrics := func() pmetric.Metrics { - metrics := pmetric.NewMetrics() - rm := metrics.ResourceMetrics().AppendEmpty() - attrs := rm.Resource().Attributes() - attrs.PutStr("service.namespace", "production") - attrs.PutStr("service.name", "service_a") - attrs.PutStr("service.instance.id", "host1") - attrs.PutStr("machine_type", "n1-standard-1") - attrs.PutStr("cloud_provider", "gcp") - attrs.PutStr("region", "us-central1") - - sm := rm.ScopeMetrics().AppendEmpty() - sm.Scope().SetName("OpenTelemetry Collector") - sm.Scope().SetVersion("latest") - m1 := sm.Metrics().AppendEmpty() - m1.SetName("normal_metric") - m1.SetUnit("") - m1.SetDescription("") - dp1 := m1.SetEmptyGauge().DataPoints().AppendEmpty() - dp1.SetDoubleValue(2.0) - dp1.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) - dp1.Attributes().PutStr("foo", "bar") - - return metrics - }() - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockConsumer := new(mockConsumer) @@ -1592,7 +1614,7 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) { assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) } - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[0])) + assert.NoError(t, pmetrictest.CompareMetrics(tt.expectedMetrics(), mockConsumer.metrics[0])) }) } } @@ -1605,10 +1627,10 @@ func TestLRUCacheResourceMetrics(t *testing.T) { prwReceiver := setupMetricsReceiver(t) // Set a small cache size to emulate the cache eviction - prwReceiver.rmCache.Resize(1) + prwReceiver.attrCache.Resize(1) t.Cleanup(func() { - prwReceiver.rmCache.Purge() + prwReceiver.attrCache.Purge() }) // Metric 1. 
@@ -1747,6 +1769,9 @@ func TestLRUCacheResourceMetrics(t *testing.T) { attrs.PutStr("service.namespace", "production") attrs.PutStr("service.name", "service_a") attrs.PutStr("service.instance.id", "host1") + attrs.PutStr("machine_type", "n1-standard-1") + attrs.PutStr("cloud_provider", "gcp") + attrs.PutStr("region", "us-central1") sm := rm.ScopeMetrics().AppendEmpty() sm.Scope().SetName("OpenTelemetry Collector") @@ -1790,9 +1815,9 @@ func TestLRUCacheResourceMetrics(t *testing.T) { // As target_info and metric1 have the same job/instance, they generate the same end metric: mockConsumer.metrics[0]. assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0])) // As metric2 have different job/instance, it generates a different end metric: mockConsumer.metrics[2]. At this point, the cache is full it should evict the target_info metric to store the metric2. - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2])) + assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[1])) // As just have 1 slot in the cache, but the cache for metric1 was evicted, this metric1_1 should generate a new resource metric, even having the same job/instance than the metric1. - assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[3])) + assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1_1, mockConsumer.metrics[2])) } func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any { From 2c3c3825803a4efc775846fecbe6805e5ea33de6 Mon Sep 17 00:00:00 2001 From: Vihas Date: Thu, 16 Oct 2025 17:40:07 +0530 Subject: [PATCH 2/6] test case --- .../integration_test.go | 131 --------- .../receiver_test.go | 270 ++++++++++++++++++ 2 files changed, 270 insertions(+), 131 deletions(-) delete mode 100644 receiver/prometheusremotewritereceiver/integration_test.go diff --git a/receiver/prometheusremotewritereceiver/integration_test.go b/receiver/prometheusremotewritereceiver/integration_test.go deleted file mode 100644 index 5d8f27bc35c28..0000000000000 --- a/receiver/prometheusremotewritereceiver/integration_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package prometheusremotewritereceiver - -import ( - "bytes" - "context" - "fmt" - "io" - "net/http" - "net/http/httptest" - "testing" - - "github.com/gogo/protobuf/proto" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusremotewritereceiver/internal/metadata" - promconfig "github.com/prometheus/prometheus/config" - writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter/debugexporter" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest" - "go.uber.org/zap/zaptest/observer" -) - -func TestReceiverWithDebugExporter(t *testing.T) { - // Create debug exporter - exporterCfg := debugexporter.NewFactory().CreateDefaultConfig() - core, obslogs := observer.New(zapcore.InfoLevel) - set := exportertest.NewNopSettings(component.MustNewType("debug")) - set.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core())) - exporter, err := debugexporter.NewFactory().CreateMetrics( - context.Background(), - set, - exporterCfg, - ) - if err != nil { - t.Fatalf("failed to create debug exporter: %v", 
err) - } - - // Setup receiver as in receiver_test.go - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, exporter) - if err != nil { - t.Fatalf("failed to create receiver: %v", err) - } - obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: component.MustNewID("test"), - Transport: "http", - ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), - }) - if err != nil { - t.Fatalf("failed to create receiver: %v", err) - } - - receiver := prwReceiver.(*prometheusRemoteWriteReceiver) - receiver.nextConsumer = exporter - receiver.obsrecv = obsrecv - - ts := httptest.NewServer(http.HandlerFunc(receiver.handlePRW)) - defer ts.Close() - { - // Prepare a sample writev2.Request - req := &writev2.Request{ - Symbols: []string{"", "__name__", "test_metric", "job", "test_job", "instance", "test_instance"}, - Timeseries: []writev2.TimeSeries{ - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - CreatedTimestamp: 1, - }, - }, - } - pBuf := proto.NewBuffer(nil) - // we don't need to compress the body to use the snappy compression in the unit test - // because the encoder is just initialized when we initialize the http server. - // so we can just use the uncompressed body. - err = pBuf.Marshal(req) - assert.NoError(t, err) - - resp, err := http.Post( - ts.URL, - fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), - bytes.NewBuffer(pBuf.Bytes()), - ) - assert.NoError(t, err) - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) - - } - - { - // Prepare a sample writev2.Request - req := &writev2.Request{ - Symbols: []string{"", "__name__", "test_metric", "job", "test_job", "instance", "test_instance"}, - Timeseries: []writev2.TimeSeries{ - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - CreatedTimestamp: 1, - }, - }, - } - pBuf := proto.NewBuffer(nil) - // we don't need to compress the body to use the snappy compression in the unit test - // because the encoder is just initialized when we initialize the http server. - // so we can just use the uncompressed body. 
- err = pBuf.Marshal(req) - assert.NoError(t, err) - - resp, err := http.Post( - ts.URL, - fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), - bytes.NewBuffer(pBuf.Bytes()), - ) - assert.NoError(t, err) - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - assert.NoError(t, err) - assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body)) - - } - fmt.Println(obslogs.All()[0].Entry.Message) -} diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 2e0c51de40428..4f6a7c0115152 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -22,6 +22,7 @@ import ( writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -1449,9 +1450,17 @@ func (m *mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) err defer m.mu.Unlock() m.metrics = append(m.metrics, md) m.dataPoints += md.DataPointCount() + md.MarkReadOnly() return nil } +func (m *mockConsumer) clearAll() { + m.mu.Lock() + defer m.mu.Unlock() + m.metrics = make([]pmetric.Metrics, 0) + m.dataPoints = 0 +} + func TestTargetInfoWithMultipleRequests(t *testing.T) { tests := []struct { name string @@ -1844,3 +1853,264 @@ func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any { } return result } + +func TestReceiverWithExpectedMetrics(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) + require.NoError(t, err) + + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: component.MustNewID("test"), + Transport: "http", + ReceiverCreateSettings: receivertest.NewNopSettings(metadata.Type), + }) + require.NoError(t, err) + + consumer := &mockConsumer{} + receiver := prwReceiver.(*prometheusRemoteWriteReceiver) + receiver.nextConsumer = consumer + receiver.obsrecv = obsrecv + + ts := httptest.NewServer(http.HandlerFunc(receiver.handlePRW)) + defer ts.Close() + + testCases := []struct { + name string + requests []*writev2.Request + expectedMetrics []pmetric.Metrics + }{ + { + name: "Single metric, one sample", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "cpu_usage", "job", "jobA", "instance", "instA"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 0.5, Timestamp: 1000}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + rm.Resource().Attributes().PutStr("service.name", "jobA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + m := sm.Metrics().AppendEmpty() + m.SetName("cpu_usage") + m.SetEmptyGauge().DataPoints().AppendEmpty(). 
+ SetDoubleValue(0.5) + m.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) // convert ms to ns + return metrics + }(), + }, + }, + { + name: "Two metrics, one sample each", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "cpu_usage", "job", "jobA", "instance", "instA", "memory_usage"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 7, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2048.0, Timestamp: 1000}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobA") + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + // cpu_usage + m1 := sm.Metrics().AppendEmpty() + m1.SetName("cpu_usage") + m1.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(1.0) + m1.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + // memory_usage + m2 := sm.Metrics().AppendEmpty() + m2.SetName("memory_usage") + m2.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(2048.0) + m2.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + return metrics + }(), + }, + }, + { + name: "Two resources, Two metrics, one sample each", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "cpu_usage", "job", "jobA", "instance", "instA", "memory_usage", "jobB", "instB"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1.0, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 7, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2048.0, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 8, 5, 9}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 1000}}, + }, + { + LabelsRefs: []uint32{1, 7, 3, 8, 5, 9}, + Samples: []writev2.Sample{{Value: 4096.0, Timestamp: 1000}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobA") + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + // cpu_usage + m1 := sm.Metrics().AppendEmpty() + m1.SetName("cpu_usage") + m1.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(1.0) + m1.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + // memory_usage + m2 := sm.Metrics().AppendEmpty() + m2.SetName("memory_usage") + m2.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(2048.0) + m2.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + rm = metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobB") + rm.Resource().Attributes().PutStr("service.instance.id", "instB") + + sm = rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + // cpu_usage + m1 = sm.Metrics().AppendEmpty() + m1.SetName("cpu_usage") + m1.SetEmptyGauge().DataPoints().AppendEmpty(). 
+ SetDoubleValue(2.0) + m1.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + // memory_usage + m2 = sm.Metrics().AppendEmpty() + m2.SetName("memory_usage") + m2.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(4096.0) + m2.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1000 * 1e6)) + + return metrics + }(), + }, + }, + { + name: "Same job-instance, multiple metrics", + requests: []*writev2.Request{ + { + Symbols: []string{"", "__name__", "metric1", "job", "jobA", "instance", "instA", "metric2"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_UNSPECIFIED}, + LabelsRefs: []uint32{1, 7, 3, 4, 5, 6}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, + }, + }, + expectedMetrics: []pmetric.Metrics{ + func() pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutStr("service.name", "jobA") + rm.Resource().Attributes().PutStr("service.instance.id", "instA") + + sm := rm.ScopeMetrics().AppendEmpty() + sm.Scope().SetName("OpenTelemetry Collector") + sm.Scope().SetVersion("latest") + + m := sm.Metrics().AppendEmpty() + m.SetName("metric1") + m.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(1) + m.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(1 * 1e6)) + + m = sm.Metrics().AppendEmpty() + m.SetName("metric2") + m.SetEmptyGauge().DataPoints().AppendEmpty(). + SetDoubleValue(2) + m.Gauge().DataPoints().At(0).SetTimestamp(pcommon.Timestamp(2 * 1e6)) + return metrics + }(), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + consumer.clearAll() + require.Equal(t, len(tc.requests), len(tc.expectedMetrics)) + + for _, req := range tc.requests { + pBuf := proto.NewBuffer(nil) + require.NoError(t, pBuf.Marshal(req)) + + resp, err := http.Post( + ts.URL, + fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2), + bytes.NewBuffer(pBuf.Bytes()), + ) + require.NoError(t, err) + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusNoContent, resp.StatusCode) + } + + require.Equal(t, len(tc.expectedMetrics), len(consumer.metrics)) + + for i := range tc.expectedMetrics { + actual := consumer.metrics[i] + expected := tc.expectedMetrics[i] + + assert.NoError(t, pmetrictest.CompareMetrics(expected, actual)) + } + }) + } +} From 9a1456c265f8051e3d35ef035b71d2f1bb9ab12b Mon Sep 17 00:00:00 2001 From: Vihas Date: Thu, 16 Oct 2025 18:59:56 +0530 Subject: [PATCH 3/6] bug --- .../prometheusremotewritereceiver/receiver.go | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 46540d5ecf6f6..b6f8ce18e14aa 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -253,6 +253,9 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr resourceMetricCache = make(map[uint64]pmetric.ResourceMetrics) ) + // First, extract any target_info metric, if any. 
+ prw.extractTargetInfo(req, &labelsBuilder) + for i := range req.Timeseries { ts := &req.Timeseries[i] ls := ts.ToLabels(&labelsBuilder, req.Symbols) @@ -265,29 +268,8 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr continue } - // If the metric name is equal to target_info, we use its labels as attributes of the resource - // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 if metadata.Name == "target_info" { - var rm pmetric.ResourceMetrics - hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - - if existingRM, ok := resourceMetricCache[hashedLabels]; ok { - rm = existingRM - } else { - rm = otelMetrics.ResourceMetrics().AppendEmpty() - } - - attrs := rm.Resource().Attributes() - parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance")) - - // Add the remaining labels as resource attributes - for labelName, labelValue := range ls.Map() { - if labelName != "job" && labelName != "instance" && !schema.IsMetadataLabel(labelName) { - attrs.PutStr(labelName, labelValue) - } - } - prw.attrCache.Add(hashedLabels, attrs) - resourceMetricCache[hashedLabels] = rm + // We already converted the target_info labels into resource attributes. Continue. continue } @@ -404,6 +386,29 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr return otelMetrics, stats, badRequestErrors } +func (prw *prometheusRemoteWriteReceiver) extractTargetInfo(req *writev2.Request, labelsBuilder *labels.ScratchBuilder) { + for i := range req.Timeseries { + ts := &req.Timeseries[i] + ls := ts.ToLabels(labelsBuilder, req.Symbols) + metadata := schema.NewMetadataFromLabels(ls) + // If the metric name is equal to target_info, we use its labels as attributes of the resource + // Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1 + if metadata.Name == "target_info" { + attrs := pcommon.NewMap() + parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance")) + // Add the remaining labels as resource attributes + for labelName, labelValue := range ls.Map() { + if labelName != "job" && labelName != "instance" && !schema.IsMetadataLabel(labelName) { + attrs.PutStr(labelName, labelValue) + } + } + + hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) + prw.attrCache.Add(hashedLabels, attrs) + } + } +} + // processHistogramTimeSeries handles all histogram processing, including validation and mixed schemas. func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries( otelMetrics pmetric.Metrics, From 85e8554d0a64843d0b7f9d0a4b12ec928b1f00cc Mon Sep 17 00:00:00 2001 From: Vihas Date: Thu, 23 Oct 2025 11:42:57 +0530 Subject: [PATCH 4/6] CI and lint --- .chloggen/41347.yaml | 27 +++++++++++++++++++ .../receiver_test.go | 4 +-- 2 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 .chloggen/41347.yaml diff --git a/.chloggen/41347.yaml b/.chloggen/41347.yaml new file mode 100644 index 0000000000000..febb818055bc0 --- /dev/null +++ b/.chloggen/41347.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/prometheusremotewrite + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix panic caused by mutating read-only ResourceMetrics + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41347] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 4f6a7c0115152..c8f887c62ba84 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -1858,7 +1858,7 @@ func TestReceiverWithExpectedMetrics(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() - prwReceiver, err := factory.CreateMetrics(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil) + prwReceiver, err := factory.CreateMetrics(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) require.NoError(t, err) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ @@ -2085,7 +2085,7 @@ func TestReceiverWithExpectedMetrics(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { consumer.clearAll() - require.Equal(t, len(tc.requests), len(tc.expectedMetrics)) + require.Len(t, tc.requests, len(tc.expectedMetrics)) for _, req := range tc.requests { pBuf := proto.NewBuffer(nil) From 5f0fba0574c78bd5723186fd70f2485aad87dc18 Mon Sep 17 00:00:00 2001 From: Vihas Date: Thu, 23 Oct 2025 11:46:31 +0530 Subject: [PATCH 5/6] document --- receiver/prometheusremotewritereceiver/README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/receiver/prometheusremotewritereceiver/README.md b/receiver/prometheusremotewritereceiver/README.md index a8701355e5cc8..8f2b804f02729 100644 --- a/receiver/prometheusremotewritereceiver/README.md +++ b/receiver/prometheusremotewritereceiver/README.md @@ -84,3 +84,13 @@ This approach has some limitations, for example: - If the process dies or restarts, the cache will be lost. - Some inconsistencies can happen according to the order of the requests and the current cache size. - The limit of 1000 resource metrics is hardcoded and not configurable for now. + +Request Handling Scenarios + +- If `target_info` arrives first: + - The associated labels are cached as a `pcommon.Map`. + - Subsequent metric data with matching job/instance labels will use these cached resource attributes. + +- If normal metrics arrive first: + - The receiver cannot predict when (or if) the corresponding `target_info` will arrive. + - The metrics are forwarded immediately without resource attributes. 
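Illustration (not part of the patches): the README scenarios above come down to the attribute cache introduced in PATCH 3, where `target_info` labels are stored in an LRU keyed by a hash of job/instance and copied onto the resource of later series with the same job/instance. Below is a minimal, self-contained Go sketch of that scheme. It mirrors identifiers that appear in the diff (the `lru.New[uint64, pcommon.Map]` cache, the xxhash job/instance key); the `main()` scaffolding and the sample label values are assumptions for illustration only, not code from the patch.

```go
package main

// Sketch of the target_info attribute cache: cache labels first, then reuse
// them for later series that share the same job/instance pair.

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	lru "github.com/hashicorp/golang-lru/v2"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	// Same shape as the receiver's cache: job/instance hash -> resource attributes.
	attrCache, _ := lru.New[uint64, pcommon.Map](1000)

	// Scenario 1: target_info arrives first, so its labels are cached as attributes.
	key := xxhash.Sum64String("jobA" + string([]byte{'\xff'}) + "instA")
	attrs := pcommon.NewMap()
	attrs.PutStr("service.name", "jobA")
	attrs.PutStr("service.instance.id", "instA")
	attrs.PutStr("machine_type", "n1-standard-1") // extra target_info label (example value)
	attrCache.Add(key, attrs)

	// A normal sample with the same job/instance arrives later, possibly in a
	// different request: its ResourceMetrics starts from the cached attributes.
	metrics := pmetric.NewMetrics()
	rm := metrics.ResourceMetrics().AppendEmpty()
	if cached, ok := attrCache.Get(key); ok {
		cached.CopyTo(rm.Resource().Attributes())
	}
	fmt.Println(rm.Resource().Attributes().AsRaw())
}
```

If the normal sample had arrived before any `target_info`, the `Get` would miss and the resource would carry only the job/instance-derived attributes, which is the second scenario documented in the README hunk above.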
From 1095f02d56a1433ea5e92336cc5e92e989a612b2 Mon Sep 17 00:00:00 2001 From: Vihas Date: Thu, 23 Oct 2025 16:04:11 +0530 Subject: [PATCH 6/6] lint --- receiver/prometheusremotewritereceiver/receiver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index c8f887c62ba84..c79b1966e4b79 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -2103,7 +2103,7 @@ func TestReceiverWithExpectedMetrics(t *testing.T) { require.Equal(t, http.StatusNoContent, resp.StatusCode) } - require.Equal(t, len(tc.expectedMetrics), len(consumer.metrics)) + require.Len(t, consumer.metrics, len(tc.expectedMetrics)) for i := range tc.expectedMetrics { actual := consumer.metrics[i]