Description
Originally reported by @mauri870:
I found something odd, and I'm still not sure I'm making the right sense of it.
I'm trying to use receivertest.CheckConsumeContract from OTel to test delivery guarantees for the ConsumeLogs call in otelconsumer. I've been going back and forth trying to get this test to work and initially assumed there was something wrong with how I set it up. The test consistently failed for error scenarios (both persistent and non-persistent errors) because it expects data to be retried in case of failures.

While testing some changes, I noticed that if I modify the Publish method in otelconsumer to return nil instead of propagating the error, all the tests suddenly pass. My conclusion is that, aside from the batch.Retry call in otelconsumer, retries weren't actually happening at all.
But at the same time, it feels like we should've caught this by now. Am I missing something?
otelconsumer does not have a Connect method, so it runs as a regular (non-network) pipeline client, whose worker loop seems to simply exit when Publish returns an error:
beats/libbeat/publisher/pipeline/client_worker.go, lines 88 to 107 in a10fc1e:

```go
func (w *clientWorker) run(ctx context.Context) {
	for {
		// We wait for either the worker to be closed or for there to be a batch of
		// events to publish.
		select {
		case <-ctx.Done():
			return
		case batch := <-w.qu:
			if batch == nil {
				continue
			}
			if err := w.client.Publish(ctx, batch); err != nil {
				return
			}
		}
	}
}
```
Elasticsearch, Logstash, and Kafka all have Connect methods, so they never hit this.
This is probably a bug in the publisher's non-network client worker, which doesn't recover from a Publish error, but it can also be trivially fixed in otelconsumer by not returning an error.
Batch failures are supposed to be handled by calling batch.Retry without returning an error from the output's Publish call. The Elasticsearch output, for example, only returns an error on a connection-level failure or when Elasticsearch is throttling it (errTooMany); it doesn't return an error when individual events in the batch fail.
beats/libbeat/outputs/elasticsearch/client.go, lines 248 to 285 in a10fc1e:

```go
func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error {
	span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
	defer span.End()
	span.Context.SetLabel("events_original", len(batch.Events()))
	client.observer.NewBatch(len(batch.Events()))

	// Create and send the bulk request.
	bulkResult := client.doBulkRequest(ctx, batch)
	span.Context.SetLabel("events_encoded", len(bulkResult.events))
	if bulkResult.connErr != nil {
		// If there was a connection-level error there is no per-item response,
		// handle it and return.
		return client.handleBulkResultError(ctx, batch, bulkResult)
	}
	span.Context.SetLabel("events_published", len(bulkResult.events))

	// At this point we have an Elasticsearch response for our request,
	// check and report the per-item results.
	eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult)
	stats.reportToObserver(client.observer)

	if len(eventsToRetry) > 0 {
		span.Context.SetLabel("events_failed", len(eventsToRetry))
		batch.RetryEvents(eventsToRetry)
	} else {
		batch.ACK()
	}
	return publishResultForStats(stats)
}

func publishResultForStats(stats bulkResultStats) error {
	if stats.tooMany > 0 {
		// We're being throttled by Elasticsearch, return an error so we
		// retry the connection with exponential backoff
		return errTooMany
	}
	return nil
}
```