Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
All released versions
Issue Description
The persistent dispatcher for multiple consumers in Apache Pulsar does not consistently honor Netty channel backpressure. When a consumer's channel becomes non-writable, the dispatcher sometimes continues sending one entry at a time and sometimes skips dispatch entirely, with no reliable wake-up mechanism when the channel becomes writable again. This can result in unbounded broker-side buffer accumulation and risks OOM under sustained slow consumers, large batches, or network congestion. Where dispatching is skipped entirely, the consumer can remain stuck indefinitely until some unrelated event resumes dispatching, which adds unnecessary latency under backpressure, particularly when there is no continuous flow of new messages to the topic.
Error messages
Reproducing the issue
These are assumed ways to reproduce the issues.
It should be possible to reproduce the OOME issue on a Shared/Key_Shared subscription by having a slow client with a large receiver queue size.
- When the client stops reading messages, dispatching will continue each time the dispatcher is woken up.
- Reproducing would require having a backlog and sending more messages to the topic to wake up dispatching.
The consumer-stuck issue should reproduce on an Exclusive/Failover subscription after the client's connection gets backpressured. There is no logic to restart dispatching after it stops due to the channel's writability status. Reproducing this would require that the client cannot send the next flow command and that no new messages are added to the topic, so nothing wakes up dispatching.
There are existing issue reports which might be related.
Additional information
BrokkAI-assisted analysis:
Verified behavior in code (and assumptions where noted)
- Dispatch when non-writable:
  - Observed behavior: dispatch continues with a reduced batch size of 1 even when the channel is not writable; in other paths it skips dispatch entirely (see the sketch after this item).
  - Risk: even “1 at a time” can still overflow buffers for large/batched entries or long-lived non-writable channels.
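A minimal sketch of the pattern described above, using hypothetical names rather than Pulsar's actual dispatcher code: the batch size shrinks to 1 when the channel is non-writable, but entries are still written, so the outbound buffer can keep growing.

```java
import io.netty.channel.Channel;

/** Hypothetical illustration of the "reduce batch to 1 when unwritable" fallback. */
class DispatchSketch {
    private static final int MAX_BATCH = 100;

    /** Returns how many entries the dispatcher would read for this consumer's channel. */
    static int batchSizeFor(Channel channel) {
        // When the channel is unwritable, the batch is shrunk to 1 instead of pausing,
        // so writes continue and Netty's outbound buffer can still grow without bound.
        return channel.isWritable() ? MAX_BATCH : 1;
    }
}
```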
- No event-driven resumption:
  - There’s no linkage from Netty `channelWritabilityChanged` to the dispatcher scheduling loop. When dispatch is skipped due to non-writability, there is no guaranteed wake-up when the channel becomes writable again (a minimal handler sketch follows this item).
  - Result: starvation or stalled subscriptions until an unrelated event triggers work.
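For context, this is roughly where such a linkage would have to live. A minimal sketch, assuming a hypothetical `DispatcherHook` callback; Netty's `channelWritabilityChanged` is real, everything else is illustrative:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/** Sketch: forwarding Netty writability changes to the dispatcher layer (names are hypothetical). */
class WritabilityBridgeHandler extends ChannelInboundHandlerAdapter {
    interface DispatcherHook {
        void onWritabilityChanged(boolean writable);
    }

    private final DispatcherHook dispatcherHook;

    WritabilityBridgeHandler(DispatcherHook dispatcherHook) {
        this.dispatcherHook = dispatcherHook;
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        // Netty fires this when the outbound buffer crosses the configured water marks.
        // Today no such notification reaches the dispatcher; this sketch shows the missing bridge.
        dispatcherHook.onWritabilityChanged(ctx.channel().isWritable());
        ctx.fireChannelWritabilityChanged();
    }
}
```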
- No consistent transport-level gate before write:
  - The send path that ultimately calls Netty `writeAndFlush` doesn’t consistently check `Channel#isWritable()` or block/suspend dispatch work per consumer (see the gate sketch after this item).
  - Result: data can be pushed into the outbound buffer even when unwritable, relying on Netty’s buffering and watermarks alone.
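A minimal sketch of the kind of gate being described, with hypothetical names; this is not the actual Pulsar send path:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

/** Sketch: gate a write on transport writability instead of pushing unconditionally (hypothetical). */
class GatedSender {
    /**
     * Writes the message only if the channel is currently writable.
     * Returns true if the write was issued, false if the caller should mark the consumer blocked.
     */
    static boolean trySend(Channel channel, Object message) {
        if (!channel.isWritable()) {
            // Outbound buffer is above the high water mark; do not enqueue more data.
            return false;
        }
        ChannelFuture future = channel.writeAndFlush(message);
        future.addListener(f -> {
            // Write failures would be handled here (e.g. closing the consumer); omitted in this sketch.
        });
        return true;
    }
}
```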
- Pause/resume is not tied to transport backpressure:
  - Dispatcher pause/resume behavior tends to track storage/cursor state (e.g., acks, reads) rather than transport backpressure.
  - Result: the broker keeps reading from BookKeeper and queuing for a consumer that can’t drain the network.
- Watermarks exist but don’t solve the core issue:
  - Netty write-buffer high/low watermarks are configurable at the channel level, but without event-driven throttling in the dispatcher they mainly determine when the channel reports itself non-writable. On their own they don’t stop the dispatcher from continuing to enqueue work on behalf of an unwritable consumer (a configuration sketch follows this item).
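For reference, this is how Netty's water marks are set on a server channel; the option and class are real Netty APIs, but the concrete byte values here are only illustrative:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;

/** Sketch: configuring per-channel write-buffer water marks (values are illustrative only). */
class WaterMarkConfigSketch {
    static void apply(ServerBootstrap bootstrap) {
        // Below the low mark the channel becomes writable again; above the high mark it
        // reports non-writable. Netty still accepts writes past the high mark, which is
        // why an unwritable channel alone does not stop a dispatcher that keeps writing.
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(32 * 1024, 64 * 1024));
    }
}
```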
Root cause
- Lack of a consistent backpressure bridge from Netty to the dispatcher:
  - The dispatcher does not maintain a per-consumer “transport-blocked” state driven by Netty channel writability events.
  - No uniform suspension of dispatch work (cursor reads, read-ahead) when a consumer’s channel is non-writable.
  - Mixed fallback behavior (skip vs. send one entry) leads to unpredictable buffering and stalls.
Impact
- Broker memory pressure and potential OOM:
  - Slow consumers plus large entries/batches can saturate outbound buffers, accumulate pending sends, and put heavy pressure on GC.
- Throughput instability:
  - For fast consumers, buffering helps; for slow ones, it backfires, making end-to-end latency and broker memory unpredictable.
- Operational ambiguity:
  - Without metrics and clear signals, operators may struggle to detect which consumers are backpressuring the broker and why dispatch stalls.
High-level remediation plan
- Event-driven transport backpressure
  - Register for Netty channel writability changes and propagate them to the consumer/dispatcher layer.
  - Maintain a per-consumer “transport-blocked” flag (see the sketch after this item):
    - When unwritable: stop scheduling dispatch to that consumer; prevent read-ahead for it (or cap it tightly).
    - When writable: schedule a targeted resume task to drain pending messages for that consumer.
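A minimal sketch of the per-consumer flag and the event-driven resume, assuming a hypothetical `resumeDispatch` hook; none of these names exist in Pulsar today:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: per-consumer transport-blocked state driven by writability events (hypothetical API). */
class ConsumerTransportState {
    private final AtomicBoolean transportBlocked = new AtomicBoolean(false);
    private final Executor dispatchExecutor;
    private final Runnable resumeDispatch; // e.g. "read more entries for this consumer"

    ConsumerTransportState(Executor dispatchExecutor, Runnable resumeDispatch) {
        this.dispatchExecutor = dispatchExecutor;
        this.resumeDispatch = resumeDispatch;
    }

    /** Called from the Netty event loop on channelWritabilityChanged. */
    void onWritabilityChanged(boolean writable) {
        if (!writable) {
            // Mark blocked; the dispatcher checks this flag before scheduling more work.
            transportBlocked.set(true);
        } else if (transportBlocked.compareAndSet(true, false)) {
            // Transition to writable: schedule a targeted resume off the event loop.
            dispatchExecutor.execute(resumeDispatch);
        }
    }

    boolean isTransportBlocked() {
        return transportBlocked.get();
    }
}
```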
- Gate dispatch by transport writability
  - Before enqueuing network writes for a consumer, check writability.
  - If non-writable, do not push; instead, mark the consumer blocked and rely on the event-driven resume above.
- Tune and expose watermarks
  - Keep the Netty channel `WRITE_BUFFER_HIGH_WATER_MARK` and `WRITE_BUFFER_LOW_WATER_MARK` configurable per broker.
  - Defaults should be conservative enough to prevent OOM, but allow operators to raise them for high-throughput fast consumers.
  - Consider guidance or auto-scaling based on heap size or percent-of-heap (educated suggestion; confirm feasibility).
- Hybrid wake-up (fallback polling)
  - If an event is missed (rare but possible), periodic polling can recheck writability for consumers marked blocked and re-arm their resume (see the sketch after this item).
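A minimal sketch of the fallback poller, assuming the hypothetical per-consumer state from the earlier sketch plus a way to look up each consumer's channel:

```java
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: periodic safety net re-checking writability for blocked consumers (hypothetical names). */
class BlockedConsumerPoller {
    /** Minimal view of the per-consumer transport state from the earlier sketch. */
    interface TransportState {
        boolean isTransportBlocked();
        void onWritabilityChanged(boolean writable);
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void start(Map<TransportState, Channel> channelsByState) {
        scheduler.scheduleWithFixedDelay(() -> channelsByState.forEach((state, channel) -> {
            // If a writability-changed event was missed, this re-arms the resume path.
            if (state.isTransportBlocked() && channel.isWritable()) {
                state.onWritabilityChanged(true);
            }
        }), 1, 1, TimeUnit.SECONDS);
    }
}
```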
- Backward compatibility and rollout knobs
  - Feature flag for event-driven backpressure (default on or off depending on risk appetite).
  - Per-namespace or per-subscription toggles could be considered, but start with per-broker configuration.
Where to change in code (high level; no implementation here)
- Netty integration layer:
  - Subscribe to `channelWritabilityChanged` and surface it to the consumer object associated with that channel.
- Consumer/dispatcher boundary:
  - Add/track a per-consumer transport-blocked state.
  - Ensure dispatcher `readMoreEntries`/`dispatch` paths early-exit for blocked consumers (an illustrative guard is sketched after this item).
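Purely illustrative shape of such a guard, not the proposed implementation; the method name is taken from the item above, while the class and the `isTransportBlocked` check are hypothetical:

```java
/** Sketch: early-exit in the dispatch path for transport-blocked consumers (hypothetical class). */
class DispatcherGuardSketch {
    interface ConsumerView {
        boolean isTransportBlocked(); // driven by channelWritabilityChanged, per the sketches above
    }

    /** Stand-in for a dispatcher read/dispatch entry point; the real dispatchers differ. */
    void readMoreEntries(ConsumerView consumer) {
        if (consumer.isTransportBlocked()) {
            // Do not schedule cursor reads or read-ahead for this consumer; the
            // writable-transition event (or the fallback poller) will resume it.
            return;
        }
        // ... proceed with the normal read/dispatch path ...
    }
}
```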
- Read-ahead and buffering:
  - Bound or pause per-consumer read-ahead while blocked; avoid accumulating entry lists destined for unwritable consumers.
- Resume logic:
  - On writable transition, enqueue a resume task (illustrative sketch after this item) that:
    - Drains pending entries, backing off if writability flips again.
    - Avoids unbounded loops; relies on scheduling yields and re-checks.
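Purely illustrative shape of such a resume task, assuming the hypothetical pending-entries queue and scheduler wiring from the earlier sketches; this is not the proposed implementation:

```java
import io.netty.channel.Channel;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: bounded resume task that drains pending entries and re-arms itself (hypothetical names). */
class ResumeTaskSketch implements Runnable {
    private static final int MAX_ENTRIES_PER_RUN = 100;

    private final Channel channel;
    private final Queue<Object> pendingEntries;       // entries queued while the consumer was blocked
    private final ScheduledExecutorService scheduler; // used to yield/back off instead of looping

    ResumeTaskSketch(Channel channel, Queue<Object> pendingEntries, ScheduledExecutorService scheduler) {
        this.channel = channel;
        this.pendingEntries = pendingEntries;
        this.scheduler = scheduler;
    }

    @Override
    public void run() {
        int sent = 0;
        while (sent < MAX_ENTRIES_PER_RUN && channel.isWritable()) {
            Object entry = pendingEntries.poll();
            if (entry == null) {
                return; // nothing left to drain
            }
            channel.writeAndFlush(entry);
            sent++;
        }
        if (!pendingEntries.isEmpty()) {
            if (channel.isWritable()) {
                // Yield to other work instead of looping unboundedly, then continue draining.
                scheduler.execute(this);
            } else {
                // Writability flipped again; back off briefly (the writability event itself
                // should also re-schedule this task when the channel recovers).
                scheduler.schedule(this, 50, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```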
Related work
Related to OOM prevention in the broker:
- PIP-434: Expose Netty channel configuration WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel is unwritable
- PIP-442: Add memory limits for CommandGetTopicsOfNamespace and CommandWatchTopicList on Broker and Proxy
Are you willing to submit a PR?
- I'm willing to submit a PR!