Slow down KafkaReceiver when downstream is busy (backpressure support) #42750

@sokoide

Description

Component(s)

receiver/kafka

Is your feature request related to a problem? Please describe.

We use the OpenTelemetry Collector (otelcol) with the Kafka receiver as input and an OTLP exporter to send data into our trace backend.

When a spike of data arrives, the Kafka receiver can pull data very quickly into the pipeline.
However, if the trace backend is slow, the buffered data accumulates in otelcol memory, eventually leading to OOM.

Kafka --> Kafka Receiver --> (processors) --> OTLP Exporter --> Trace Backend

The memorylimiterprocessor can reject new data to avoid an OOM crash, but the Kafka receiver continues pulling from Kafka and pushing into the pipeline (retrying):

Kafka --> Kafka Receiver --> memorylimiterprocessor --> OTLP Exporter --> Trace Backend
                                             ^ rejection (error)

Related Issues:
open-telemetry/opentelemetry-collector#5456
#29410

Describe the solution you'd like

If the Kafka receiver receives a rejection (error) from any downstream component (processor or exporter), it should slow down fetching (consuming) from Kafka.

This introduces a natural form of backpressure: when the pipeline or backend is overloaded, the receiver reduces the pull rate.

Ideally, a specific error type (e.g. ErrBackendBusy) would be defined so that receivers can distinguish between "overloaded, slow down" and "fatal error, drop data".
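As a rough illustration only (ErrBackendBusy does not exist in the collector today, and its package placement here is made up), such a sentinel could be as simple as:

package consumererror // hypothetical home for such a sentinel

import "errors"

// ErrBackendBusy would mean "overloaded, slow down", as opposed to a fatal
// error whose data should be logged and dropped. (Hypothetical: this
// sentinel is not part of the collector today.)
var ErrBackendBusy = errors.New("downstream component is busy")

Overloaded components would wrap it with %w, and receivers could then detect it with errors.Is instead of matching on error messages, as in the pseudo-code below.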

Pseudo-code

// Inside the receiver's poll/consume loop (pseudo-code; pollNextBatch is a placeholder):
for {
    traces := pollNextBatch()

    err := nextConsumer.ConsumeTraces(ctx, traces)
    if err != nil {
        if isMemoryLimitError(err) {
            // Downstream is overloaded: back off instead of polling at full speed.
            r.logger.Warn("downstream busy, slowing down polling", zap.Error(err))
            time.Sleep(r.cfg.BackoffOnBusy) // default e.g. 100ms
            continue
        }
        // Any other error is treated as fatal for this batch: log and drop.
        r.logger.Error("consume failed", zap.Error(err))
    }
}

func isMemoryLimitError(err error) bool {
    // TODO: ideally, use errors.Is(err, memorylimiter.ErrMemoryLimitExceeded)
    // (or a shared ErrBackendBusy sentinel) instead of matching on the message.
    return strings.Contains(err.Error(), "memory limiter")
}
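Until a dedicated sentinel exists, a receiver could approximate this with the existing consumererror.IsPermanent helper, treating non-permanent errors as retryable (this assumes the memory limiter's rejection error is not marked permanent). The sketch below is illustrative only; consumeWithBackoff and the backoff values are made-up names, not real kafkareceiver settings:

import (
    "context"
    "time"

    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/consumer/consumererror"
    "go.opentelemetry.io/collector/pdata/ptrace"
    "go.uber.org/zap"
)

// consumeWithBackoff pushes one batch downstream and, when the error is
// retryable (not marked permanent), sleeps with exponential backoff before
// resending, so the caller's poll loop naturally slows down.
func consumeWithBackoff(ctx context.Context, next consumer.Traces, td ptrace.Traces, logger *zap.Logger) {
    backoff := 100 * time.Millisecond // hypothetical default, cf. BackoffOnBusy above
    const maxBackoff = 5 * time.Second

    for {
        err := next.ConsumeTraces(ctx, td)
        if err == nil {
            return
        }
        if consumererror.IsPermanent(err) {
            // Permanent errors will never succeed on retry: log and drop.
            logger.Error("consume failed permanently, dropping data", zap.Error(err))
            return
        }
        // Retryable (e.g. memory limiter refusal): wait, then resend the same batch.
        logger.Warn("downstream busy, backing off", zap.Duration("backoff", backoff), zap.Error(err))
        select {
        case <-ctx.Done():
            return
        case <-time.After(backoff):
        }
        if backoff *= 2; backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
}

Unlike the pseudo-code above, this variant resends the rejected batch rather than skipping to the next one, which also avoids data loss while backpressure is applied.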

Describe alternatives you've considered

  • Use the memorylimiterprocessor alone:
    This prevents OOM but does not propagate backpressure to Kafka, because the Kafka receiver keeps polling and resending rejected data.

  • Increase batching or add queue processors:
    These can help absorb spikes but do not prevent OOM if the backend remains slow.

Additional context

Benefits:

  • Improves stability under load spikes.
  • Provides a natural feedback loop between downstream saturation and upstream ingestion.
  • Lays the foundation for a standardized "busy error" type in otelcol that other receivers could also respect.
