Description
Component(s)
receiver/kafka
Is your feature request related to a problem? Please describe.
We use the OpenTelemetry Collector (otelcol) with the Kafka receiver as input and an OTLP exporter to send data into our trace backend.
When a spike of data arrives, the Kafka receiver can pull data very quickly into the pipeline.
However, if the trace backend is slow, the buffered data accumulates in otelcol memory, eventually leading to OOM.
Kafka --> Kafka Receiver --> (processors) --> OTLP Exporter --> Trace Backend
The memorylimiterprocessor can reject new data to avoid an OOM crash, but the Kafka receiver continues pulling from Kafka and pushing into the pipeline (retrying):
Kafka --> Kafka Receiver --> memorylimiterprocessor --> OTLP Exporter --> Trace Backend
                                     ^ rejection (error)
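For illustration only, here is a simplified, conceptual sketch of the kind of consume loop involved. This is not the actual kafkareceiver code: the tracesHandler type and its wiring are made up for the example, and it assumes a sarama-style consumer group handler. The point is that the loop asks Kafka for the next message as soon as ConsumeTraces returns, whether or not the downstream rejected the data, so nothing ever slows the pull rate:

```go
// Simplified, hypothetical sketch of a sarama-style consume loop feeding an
// OTLP pipeline. NOT the actual kafkareceiver implementation.
package sketch

import (
	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.uber.org/zap"
)

type tracesHandler struct {
	nextConsumer consumer.Traces
	unmarshaler  ptrace.ProtoUnmarshaler
	logger       *zap.Logger
}

// ConsumeClaim pulls messages from Kafka as fast as the pipeline accepts calls.
// A rejection (e.g. from memorylimiterprocessor) is only logged; nothing slows
// down the next fetch, so data keeps piling up in collector memory.
func (h *tracesHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for msg := range claim.Messages() {
		traces, err := h.unmarshaler.UnmarshalTraces(msg.Value)
		if err != nil {
			h.logger.Error("unmarshal failed", zap.Error(err))
			session.MarkMessage(msg, "")
			continue
		}

		if err := h.nextConsumer.ConsumeTraces(session.Context(), traces); err != nil {
			// Today: log and move on. The loop immediately asks Kafka for more.
			h.logger.Error("consume failed", zap.Error(err))
		}
		session.MarkMessage(msg, "")
	}
	return nil
}
```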
Related Issues:
- open-telemetry/opentelemetry-collector#5456
- #29410
Describe the solution you'd like
If the Kafka receiver receives a rejection (error) from any downstream component (processor or exporter), it should slow down fetching (consuming) from Kafka.
This introduces a natural form of backpressure: when the pipeline or backend is overloaded, the receiver reduces the pull rate.
Ideally, a specific error type (e.g. ErrBackendBusy) would be defined so that receivers can distinguish between "overloaded, slow down" and "fatal error, drop data" (a sketch of such a sentinel follows the pseudo-code below).
Pseudo-code (inside the receiver's consume loop):

err := nextConsumer.ConsumeTraces(ctx, traces)
if err != nil {
    if isMemoryLimitError(err) {
        // Downstream is overloaded: back off instead of immediately
        // pulling the next batch from Kafka.
        r.logger.Warn("downstream busy, slowing down polling", zap.Error(err))
        time.Sleep(r.cfg.BackoffOnBusy) // default e.g. 100ms
        continue
    }
    // Any other error: log and move on (current behavior).
    r.logger.Error("consume failed", zap.Error(err))
}

func isMemoryLimitError(err error) bool {
    // TODO: ideally, use errors.Is(err, memorylimiter.ErrMemoryLimitExceeded)
    return strings.Contains(err.Error(), "memory limiter")
}
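To make the "ideally" part above concrete, here is a minimal sketch of what a shared sentinel error plus receiver-side backoff could look like. This is purely hypothetical: ErrBackendBusy, ConsumeWithBackoff, and the backoff parameters do not exist in the collector today and are only placeholders for the idea.

```go
// Hypothetical sketch: a shared sentinel error that overloaded components
// (e.g. the memory limiter) could wrap, plus a receiver-side backoff helper.
// None of this exists in the collector today.
package backpressure

import (
	"context"
	"errors"
	"time"
)

// ErrBackendBusy would signal "overloaded, retry later", as opposed to a
// permanent failure where the data should be dropped.
var ErrBackendBusy = errors.New("downstream busy, slow down")

// ConsumeWithBackoff retries a consume call with exponential backoff while the
// downstream reports ErrBackendBusy, and returns any other error unchanged.
func ConsumeWithBackoff(ctx context.Context, consume func(context.Context) error, initial, maxDelay time.Duration) error {
	delay := initial
	for {
		err := consume(ctx)
		if !errors.Is(err, ErrBackendBusy) {
			return err // nil, or a non-busy (possibly permanent) error
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
}
```

The receiver would wrap its nextConsumer.ConsumeTraces call in something like ConsumeWithBackoff, so the next message is not fetched from Kafka until the busy condition clears; the exponential backoff with a cap avoids hammering an already saturated pipeline.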
Describe alternatives you've considered
- Use the memorylimiterprocessor alone: this prevents OOM but does not propagate backpressure to Kafka, because the Kafka receiver keeps polling and re-sending rejected data.
- Increase batching or add queue processors: these can help absorb spikes but do not prevent OOM if the backend remains slow.
Additional context
Benefits:
- Improves stability under load spikes.
- Provides a natural feedback loop between downstream saturation and upstream ingestion.
- Lays the foundation for a standardized "busy error" type in otelcol that other receivers could also respect.
Tip
React with 👍 to help prioritize this issue. Please use comments to provide useful context, avoiding "+1" or "me too", to help us triage it.