Skip to content

otel: data race in publishing pipeline with Beats receivers #45643

@mauri870

Description

@mauri870

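I found this while running the test from #45637 with the race detector enabled (`-race`).

This particular test creates multiple receivers via `CreateLogs`. In both reports below, the output worker goroutine (`clientWorker.run` → `otelConsumer.logsPublish`) writes to a batch in `ttlBatch.ACK()` or `ttlBatch.Drop()`, while the `eventConsumer.run` goroutine has previously read the same memory in `ttlBatch.Events()` without synchronization.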

$ go test -race -run '^TestConsumeContract$' ./x-pack/filebeat/fbreceiver -v -count=1
race.txt
=== RUN   TestConsumeContract
=== RUN   TestConsumeContract/always_succeed
Sent 100, accepted=100, expected dropped=0, non-permanent errors retried=0
=== RUN   TestConsumeContract/random_non_permanent_error
==================
WARNING: DATA RACE
Write at 0x00c001ed83e0 by goroutine 565:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*ttlBatch).ACK()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/ttl_batch.go:97 +0x2c
  github.com/elastic/beats/v7/x-pack/libbeat/outputs/otelconsumer.(*otelConsumer).logsPublish()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go:185 +0x1601
  github.com/elastic/beats/v7/x-pack/libbeat/outputs/otelconsumer.(*otelConsumer).Publish()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go:75 +0x69
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*clientWorker).run()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/client_worker.go:101 +0x15e
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker.gowrap1()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/client_worker.go:75 +0x61

Previous read at 0x00c001ed83e0 by goroutine 556:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*ttlBatch).Events()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/ttl_batch.go:92 +0x508
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*eventConsumer).run()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/consumer.go:166 +0x4c7
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer.func1()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/consumer.go:91 +0x84

Goroutine 565 (running) created at:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/client_worker.go:75 +0x347
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*outputController).Set()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/controller.go:140 +0x49e
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.New()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/pipeline.go:154 +0x690
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/module.go:100 +0x390
  github.com/elastic/beats/v7/x-pack/libbeat/cmd/instance.NewBeatForReceiver()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/cmd/instance/beat.go:264 +0x36af
  github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.createReceiver()
      /home/mauri870/git/elastic/beats/x-pack/filebeat/fbreceiver/factory.go:48 +0x885
  go.opentelemetry.io/collector/receiver.(*factory).CreateLogs()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver@v1.36.0/receiver.go:177 +0x348
  go.opentelemetry.io/collector/receiver/receivertest.checkConsumeContractScenario()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:118 +0x5fa
  go.opentelemetry.io/collector/receiver/receivertest.CheckConsumeContract.func2()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:103 +0x9c
  testing.tRunner()
      /usr/lib/go/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /usr/lib/go/src/testing/testing.go:1851 +0x44

Goroutine 556 (running) created at:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/consumer.go:89 +0x304
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.newOutputController()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/controller.go:97 +0x9c
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.New()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/pipeline.go:149 +0x584
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/module.go:100 +0x390
  github.com/elastic/beats/v7/x-pack/libbeat/cmd/instance.NewBeatForReceiver()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/cmd/instance/beat.go:264 +0x36af
  github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.createReceiver()
      /home/mauri870/git/elastic/beats/x-pack/filebeat/fbreceiver/factory.go:48 +0x885
  go.opentelemetry.io/collector/receiver.(*factory).CreateLogs()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver@v1.36.0/receiver.go:177 +0x348
  go.opentelemetry.io/collector/receiver/receivertest.checkConsumeContractScenario()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:118 +0x5fa
  go.opentelemetry.io/collector/receiver/receivertest.CheckConsumeContract.func2()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:103 +0x9c
  testing.tRunner()
      /usr/lib/go/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /usr/lib/go/src/testing/testing.go:1851 +0x44
==================
Sent 100, accepted=100, expected dropped=0, non-permanent errors retried=116
    testing.go:1490: race detected during execution of test
=== RUN   TestConsumeContract/random_permanent_error
Sent 100, accepted=49, expected dropped=51, non-permanent errors retried=0
=== RUN   TestConsumeContract/random_error
==================
WARNING: DATA RACE
Write at 0x00c0032100e0 by goroutine 1458:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*ttlBatch).Drop()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/ttl_batch.go:103 +0x2c
  github.com/elastic/beats/v7/x-pack/libbeat/outputs/otelconsumer.(*otelConsumer).logsPublish()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go:175 +0x14c3
  github.com/elastic/beats/v7/x-pack/libbeat/outputs/otelconsumer.(*otelConsumer).Publish()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go:75 +0x69
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*clientWorker).run()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/client_worker.go:101 +0x15e
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker.gowrap1()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/client_worker.go:75 +0x61

Previous read at 0x00c0032100e0 by goroutine 1444:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*ttlBatch).Events()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/ttl_batch.go:92 +0x508
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*eventConsumer).run()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/consumer.go:166 +0x4c7
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer.func1()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/consumer.go:91 +0x84

Goroutine 1458 (running) created at:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.makeClientWorker()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/client_worker.go:75 +0x347
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*outputController).Set()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/controller.go:140 +0x49e
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.New()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/pipeline.go:154 +0x690
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/module.go:100 +0x390
  github.com/elastic/beats/v7/x-pack/libbeat/cmd/instance.NewBeatForReceiver()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/cmd/instance/beat.go:264 +0x36af
  github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.createReceiver()
      /home/mauri870/git/elastic/beats/x-pack/filebeat/fbreceiver/factory.go:48 +0x885
  go.opentelemetry.io/collector/receiver.(*factory).CreateLogs()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver@v1.36.0/receiver.go:177 +0x348
  go.opentelemetry.io/collector/receiver/receivertest.checkConsumeContractScenario()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:118 +0x5fa
  go.opentelemetry.io/collector/receiver/receivertest.CheckConsumeContract.func2()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:103 +0x9c
  testing.tRunner()
      /usr/lib/go/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /usr/lib/go/src/testing/testing.go:1851 +0x44

Goroutine 1444 (running) created at:
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.newEventConsumer()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/consumer.go:89 +0x304
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.newOutputController()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/controller.go:97 +0x9c
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.New()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/pipeline.go:149 +0x584
  github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings()
      /home/mauri870/git/elastic/beats/libbeat/publisher/pipeline/module.go:100 +0x390
  github.com/elastic/beats/v7/x-pack/libbeat/cmd/instance.NewBeatForReceiver()
      /home/mauri870/git/elastic/beats/x-pack/libbeat/cmd/instance/beat.go:264 +0x36af
  github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.createReceiver()
      /home/mauri870/git/elastic/beats/x-pack/filebeat/fbreceiver/factory.go:48 +0x885
  go.opentelemetry.io/collector/receiver.(*factory).CreateLogs()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver@v1.36.0/receiver.go:177 +0x348
  go.opentelemetry.io/collector/receiver/receivertest.checkConsumeContractScenario()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:118 +0x5fa
  go.opentelemetry.io/collector/receiver/receivertest.CheckConsumeContract.func2()
      /home/mauri870/gopath/pkg/mod/go.opentelemetry.io/collector/receiver/receivertest@v0.130.0/contract_checker.go:103 +0x9c
  testing.tRunner()
      /usr/lib/go/src/testing/testing.go:1792 +0x225
  testing.(*T).Run.gowrap1()
      /usr/lib/go/src/testing/testing.go:1851 +0x44
==================
Sent 100, accepted=49, expected dropped=51, non-permanent errors retried=54
    testing.go:1490: race detected during execution of test
--- FAIL: TestConsumeContract (12.56s)
    --- PASS: TestConsumeContract/always_succeed (3.15s)
    --- FAIL: TestConsumeContract/random_non_permanent_error (3.14s)
    --- PASS: TestConsumeContract/random_permanent_error (3.14s)
    --- FAIL: TestConsumeContract/random_error (3.14s)
FAIL
FAIL    github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver  12.689s
FAIL

Metadata

Assignees

Type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions