[beats receivers] otelconsumer returns an error on batch publish failure which disables retries #45652

@cmacknz

Description

Originally reported by @mauri870:

I found something odd, and I'm still not sure I'm making sense of it correctly.
I'm trying to use receivertest.CheckConsumeContract from OTel to test delivery guarantees for the ConsumeLogs call in otelconsumer. I've been going back and forth trying to get this test to work and initially assumed there was something wrong with how I set it up. The test consistently failed for error scenarios (both persistent and non-persistent errors) because it expects data to be retried in case of failures.

While testing some changes, I noticed that if I modify the Publish method in otelconsumer to return nil instead of propagating the error, all the tests suddenly pass. My conclusion is that, aside from the batch.Retry call in otelconsumer, retries weren't actually happening at all.

But at the same time, it feels like we should've caught this by now. Am I missing something?

otelconsumer does not have a Connect method, so it runs as a regular (non-network) pipeline client, whose worker loop simply exits when Publish returns an error:

```go
func (w *clientWorker) run(ctx context.Context) {
	for {
		// We wait for either the worker to be closed or for there to be a batch of
		// events to publish.
		select {
		case <-ctx.Done():
			return
		case batch := <-w.qu:
			if batch == nil {
				continue
			}
			if err := w.client.Publish(ctx, batch); err != nil {
				return
			}
		}
	}
}
```

Elasticsearch, Logstash, and Kafka all have Connect methods, so they never hit this path.

This is probably a bug in the publisher's non-network client worker, which doesn't recover after a Publish error, but it can also be trivially fixed in otelconsumer by not returning an error.

A batch failure is supposed to be handled by calling batch.Retry without returning an error from the output's Publish call. The Elasticsearch output, for example, only returns an error on a connection-level failure or when Elasticsearch is throttling it; when individual events in the batch fail, it calls batch.RetryEvents and does not return an error.

```go
func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error {
	span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
	defer span.End()
	span.Context.SetLabel("events_original", len(batch.Events()))
	client.observer.NewBatch(len(batch.Events()))

	// Create and send the bulk request.
	bulkResult := client.doBulkRequest(ctx, batch)
	span.Context.SetLabel("events_encoded", len(bulkResult.events))
	if bulkResult.connErr != nil {
		// If there was a connection-level error there is no per-item response,
		// handle it and return.
		return client.handleBulkResultError(ctx, batch, bulkResult)
	}
	span.Context.SetLabel("events_published", len(bulkResult.events))

	// At this point we have an Elasticsearch response for our request,
	// check and report the per-item results.
	eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult)
	stats.reportToObserver(client.observer)
	if len(eventsToRetry) > 0 {
		span.Context.SetLabel("events_failed", len(eventsToRetry))
		batch.RetryEvents(eventsToRetry)
	} else {
		batch.ACK()
	}
	return publishResultForStats(stats)
}

func publishResultForStats(stats bulkResultStats) error {
	if stats.tooMany > 0 {
		// We're being throttled by Elasticsearch, return an error so we
		// retry the connection with exponential backoff
		return errTooMany
	}
	return nil
}
```