Skip to content

Commit 32f2170

Browse files
committed
[feat][broker] Add configuration options for in-memory topic-level delayed delivery tracker and rename view class for clarity
1 parent e1f2c89 commit 32f2170

File tree

10 files changed

+206
-109
lines changed

10 files changed

+206
-109
lines changed

conf/broker.conf

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,24 @@ delayedDeliveryTickTimeMillis=1000
671671
# delayedDeliveryTickTimeMillis.
672672
isDelayedDeliveryDeliverAtTimeStrict=false
673673

674+
# Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed delivery tracker.
675+
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
676+
# Set to a positive value to override the default adaptive interval based on delayedDeliveryTickTimeMillis.
677+
# Set to 0 or a negative value to use the default adaptive interval.
678+
delayedDeliveryPruneMinIntervalMillis=0
679+
680+
# The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an opportunistic
681+
# prune in the in-memory topic-level delayed delivery tracker.
682+
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
683+
# For example, 0.5 means prune when at least half of the subscriptions are eligible. Default is 0.5.
684+
delayedDeliveryPruneEligibleRatio=0.5
685+
686+
# Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager.
687+
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
688+
# When the last subscription is unregistered, the manager will be removed from the factory cache after this idle
689+
# timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove immediately (default).
690+
delayedDeliveryTopicManagerIdleMillis=0
691+
674692
# The delayed message index bucket min index count.
675693
# When the index count of the current bucket is more than this value and all message indexes of current ledger
676694
# have already been added to the tracker we will seal the bucket.

conf/standalone.conf

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,24 @@ delayedDeliveryTickTimeMillis=1000
14391439
# delayedDeliveryTickTimeMillis.
14401440
isDelayedDeliveryDeliverAtTimeStrict=false
14411441

1442+
# Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed delivery tracker.
1443+
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
1444+
# Set to a positive value to override the default adaptive interval based on delayedDeliveryTickTimeMillis.
1445+
# Set to 0 or a negative value to use the default adaptive interval.
1446+
delayedDeliveryPruneMinIntervalMillis=0
1447+
1448+
# The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an opportunistic
1449+
# prune in the in-memory topic-level delayed delivery tracker.
1450+
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
1451+
# For example, 0.5 means prune when at least half of the subscriptions are eligible. Default is 0.5.
1452+
delayedDeliveryPruneEligibleRatio=0.5
1453+
1454+
# Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager.
1455+
# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory)
1456+
# When the last subscription is unregistered, the manager will be removed from the factory cache after this idle
1457+
# timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove immediately (default).
1458+
delayedDeliveryTopicManagerIdleMillis=0
1459+
14421460
# The delayed message index bucket min index count.
14431461
# When the index count of the current bucket is more than this value and all message indexes of current ledger
14441462
# have already been added to the tracker we will seal the bucket.

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,28 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
421421
+ "logic to handle fixed delays in messages in a different way.")
422422
private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;
423423

424+
@FieldContext(category = CATEGORY_SERVER, doc = """
425+
Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed
426+
delivery tracker. Set to a positive value to override the default adaptive interval based on
427+
delayedDeliveryTickTimeMillis. Set to 0 or a negative value to use the default adaptive interval.
428+
""")
429+
private long delayedDeliveryPruneMinIntervalMillis = 0;
430+
431+
@FieldContext(category = CATEGORY_SERVER, doc = """
432+
The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an
433+
opportunistic prune in the in-memory topic-level delayed delivery tracker. For example, 0.5 means prune
434+
when at least half of the subscriptions are eligible. Default is 0.5.
435+
""")
436+
private double delayedDeliveryPruneEligibleRatio = 0.5;
437+
438+
@FieldContext(category = CATEGORY_SERVER, doc = """
439+
Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager. When the
440+
last subscription is unregistered, the manager will be removed from the factory cache after this idle
441+
timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove
442+
immediately (default).
443+
""")
444+
private long delayedDeliveryTopicManagerIdleMillis = 0;
445+
424446
@FieldContext(category = CATEGORY_SERVER, doc = """
425447
The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \
426448
exceeds this max delay, then it will return an error to the producer. \
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,18 @@
2424
import org.apache.bookkeeper.mledger.Position;
2525

2626
/**
27-
* View object for a subscription that implements DelayedDeliveryTracker interface.
28-
* This view forwards all operations to the topic-level manager while maintaining
29-
* compatibility with existing dispatcher logic.
27+
* Subscription-scoped tracker implementing {@link DelayedDeliveryTracker} by delegating all
28+
* operations to the topic-level {@link InMemoryTopicDelayedDeliveryTrackerManager}.
3029
*/
3130
@Slf4j
32-
public class InMemoryTopicDelayedDeliveryTrackerView implements DelayedDeliveryTracker {
31+
public class InMemoryTopicDelayedDeliveryTracker implements DelayedDeliveryTracker {
3332

3433
private final InMemoryTopicDelayedDeliveryTrackerManager manager;
3534
private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext;
3635
private volatile boolean closed = false;
3736

38-
public InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager manager,
39-
InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {
37+
public InMemoryTopicDelayedDeliveryTracker(InMemoryTopicDelayedDeliveryTrackerManager manager,
38+
InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {
4039
this.manager = manager;
4140
this.subContext = subContext;
4241
}
@@ -117,4 +116,4 @@ private void checkClosed() {
117116
throw new IllegalStateException("DelayedDeliveryTracker is already closed");
118117
}
119118
}
120-
}
119+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ public class InMemoryTopicDelayedDeliveryTrackerFactory implements DelayedDelive
4444

4545
private long fixedDelayDetectionLookahead;
4646

47+
// New tuning knobs
48+
private long pruneMinIntervalMillis;
49+
private double pruneEligibleRatio;
50+
private long topicManagerIdleMillis;
51+
4752
// Cache of topic-level managers: topic name -> manager instance
4853
private final ConcurrentMap<String, TopicDelayedDeliveryTrackerManager> topicManagers = new ConcurrentHashMap<>();
4954

50-
public InMemoryTopicDelayedDeliveryTrackerFactory() {
51-
52-
}
53-
54-
// Testing-friendly constructor and accessors
5555
@VisibleForTesting
5656
InMemoryTopicDelayedDeliveryTrackerFactory(Timer timer, long tickTimeMillis,
5757
boolean isDelayedDeliveryDeliverAtTimeStrict,
@@ -60,6 +60,9 @@ public InMemoryTopicDelayedDeliveryTrackerFactory() {
6060
this.tickTimeMillis = tickTimeMillis;
6161
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
6262
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
63+
this.pruneMinIntervalMillis = 0;
64+
this.pruneEligibleRatio = 0.5;
65+
this.topicManagerIdleMillis = 0;
6366
}
6467

6568
@VisibleForTesting
@@ -80,6 +83,9 @@ public void initialize(PulsarService pulsarService) {
8083
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
8184
this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
8285
this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
86+
this.pruneMinIntervalMillis = config.getDelayedDeliveryPruneMinIntervalMillis();
87+
this.pruneEligibleRatio = config.getDelayedDeliveryPruneEligibleRatio();
88+
this.topicManagerIdleMillis = config.getDelayedDeliveryTopicManagerIdleMillis();
8389
}
8490

8591
@Override
@@ -106,13 +112,29 @@ DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers
106112
TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> {
107113
InMemoryTopicDelayedDeliveryTrackerManager m = new InMemoryTopicDelayedDeliveryTrackerManager(
108114
timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
109-
fixedDelayDetectionLookahead, () -> topicManagers.remove(topicName, holder[0]));
115+
fixedDelayDetectionLookahead, pruneMinIntervalMillis, pruneEligibleRatio, () -> {
116+
if (topicManagerIdleMillis <= 0) {
117+
topicManagers.remove(topicName, holder[0]);
118+
} else {
119+
timer.newTimeout(__ -> {
120+
TopicDelayedDeliveryTrackerManager tm = holder[0];
121+
if (tm instanceof InMemoryTopicDelayedDeliveryTrackerManager) {
122+
if (!((InMemoryTopicDelayedDeliveryTrackerManager) tm).hasActiveSubscriptions()) {
123+
topicManagers.remove(topicName, tm);
124+
}
125+
} else {
126+
// If the manager has been replaced or removed, ensure entry is cleaned up
127+
topicManagers.remove(topicName, tm);
128+
}
129+
}, topicManagerIdleMillis, TimeUnit.MILLISECONDS);
130+
}
131+
});
110132
holder[0] = m;
111133
return m;
112134
});
113135

114-
// Create a per-subscription view from the topic-level manager
115-
return manager.createOrGetView(dispatcher);
136+
// Create a per-subscription tracker from the topic-level manager
137+
return manager.createOrGetTracker(dispatcher);
116138
}
117139

118140
@Override
@@ -132,4 +154,3 @@ public void close() {
132154
}
133155
}
134156
}
135-

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedD
8282
// Minimum interval between prunes
8383
private final long minPruneIntervalNanos;
8484

85+
// Ratio of eligible subscriptions required to opportunistically prune [0.0, 1.0]
86+
private final double pruneEligibleRatio;
87+
8588
// Fixed-delay detection (parity with legacy behavior)
8689
private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0);
8790
private volatile boolean messagesHaveFixedDelay = true;
@@ -131,23 +134,32 @@ long getCutoffTime() {
131134
public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock,
132135
boolean isDelayedDeliveryDeliverAtTimeStrict,
133136
long fixedDelayDetectionLookahead) {
134-
this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, null);
137+
this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead,
138+
0, 0.5, null);
135139
}
136140

137141
public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock,
138142
boolean isDelayedDeliveryDeliverAtTimeStrict,
139143
long fixedDelayDetectionLookahead,
144+
long pruneMinIntervalMillis,
145+
double pruneEligibleRatio,
140146
Runnable onEmptyCallback) {
141147
this.timer = timer;
142148
this.tickTimeMillis = tickTimeMillis;
143149
this.clock = clock;
144150
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
145151
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
146152
this.onEmptyCallback = onEmptyCallback;
147-
// Default prune throttle interval: clamp to [5ms, 50ms] using tickTimeMillis as hint
148-
// TODO: make configurable if needed
149-
long pruneMs = Math.max(5L, Math.min(50L, tickTimeMillis));
153+
// Prune throttle interval: use configured override if positive, otherwise adaptive clamp [5ms, 50ms]
154+
long pruneMs = pruneMinIntervalMillis > 0
155+
? pruneMinIntervalMillis
156+
: Math.max(5L, Math.min(50L, tickTimeMillis));
150157
this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs);
158+
// Prune eligible ratio: clamp into [0.0, 1.0]
159+
if (Double.isNaN(pruneEligibleRatio)) {
160+
pruneEligibleRatio = 0.5;
161+
}
162+
this.pruneEligibleRatio = Math.max(0.0, Math.min(1.0, pruneEligibleRatio));
151163
}
152164

153165
// We bucket messages by aligning the deliverAt timestamp to the start of the logical tick window:
@@ -167,13 +179,13 @@ private long bucketStart(long timestamp) {
167179
}
168180

169181
@Override
170-
public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
182+
public DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
171183
String subscriptionName = dispatcher.getSubscription().getName();
172184

173185
SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName,
174186
k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
175187
fixedDelayDetectionLookahead, clock));
176-
return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext);
188+
return new InMemoryTopicDelayedDeliveryTracker(this, subContext);
177189
}
178190

179191
@Override
@@ -209,6 +221,13 @@ public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher)
209221
}
210222
}
211223

224+
/**
225+
* Whether there are active subscriptions registered with this manager.
226+
*/
227+
public boolean hasActiveSubscriptions() {
228+
return !subscriptionContexts.isEmpty();
229+
}
230+
212231
@Override
213232
public void onTickTimeUpdated(long newTickTimeMillis) {
214233
if (this.tickTimeMillis == newTickTimeMillis) {
@@ -560,7 +579,7 @@ public void run(Timeout timeout) throws Exception {
560579
// If a significant portion of subscriptions are eligible, opportunistically prune (throttled)
561580
int subs = subscriptionContexts.size();
562581
int eligible = toTrigger.size();
563-
int threshold = Math.max(1, subs / 2);
582+
int threshold = Math.max(1, (int) Math.ceil(subs * pruneEligibleRatio));
564583
if (eligible >= threshold) {
565584
maybePruneByTime();
566585
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,18 @@
2626
* allowing different implementations (in-memory, bucket-based) to share the same contract.
2727
* <p>
2828
* The manager maintains a single global delayed message index per topic that is shared by all
29-
* subscriptions, and provides per-subscription "view" objects that implement DelayedDeliveryTracker
29+
* subscriptions, and provides per-subscription tracker objects that implement DelayedDeliveryTracker
3030
* interface for compatibility with existing dispatcher logic.
3131
*/
3232
public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
3333

3434
/**
35-
* Create or get a delayed delivery tracker view for the specified subscription.
35+
* Create or get a delayed delivery tracker for the specified subscription.
3636
*
3737
* @param dispatcher the dispatcher instance for the subscription
38-
* @return a DelayedDeliveryTracker view for the subscription
38+
* @return a DelayedDeliveryTracker bound to the subscription
3939
*/
40-
DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher);
40+
DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);
4141

4242
/**
4343
* Unregister a subscription from the manager.
@@ -71,4 +71,4 @@ public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
7171
* Close the manager and release all resources.
7272
*/
7373
void close();
74-
}
74+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
5454
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
5555
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
56-
import org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerView;
56+
import org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTracker;
5757
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
5858
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
5959
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
@@ -1485,7 +1485,7 @@ public void markDeletePositionMoveForward() {
14851485
trackerOpt = this.delayedDeliveryTracker;
14861486
}
14871487
trackerOpt.ifPresent(tracker -> {
1488-
if (tracker instanceof InMemoryTopicDelayedDeliveryTrackerView view) {
1488+
if (tracker instanceof InMemoryTopicDelayedDeliveryTracker view) {
14891489
Position md = cursor.getMarkDeletedPosition();
14901490
if (md != null) {
14911491
// Only update the cached mark-delete; avoid heavy operations in this callback

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
5151
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
5252
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
53-
import org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerView;
53+
import org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTracker;
5454
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
5555
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
5656
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
@@ -1316,7 +1316,7 @@ public void markDeletePositionMoveForward() {
13161316
trackerOpt = this.delayedDeliveryTracker;
13171317
}
13181318
trackerOpt.ifPresent(tracker -> {
1319-
if (tracker instanceof InMemoryTopicDelayedDeliveryTrackerView view) {
1319+
if (tracker instanceof InMemoryTopicDelayedDeliveryTracker view) {
13201320
Position md = cursor.getMarkDeletedPosition();
13211321
if (md != null) {
13221322
// Only update the cached mark-delete; avoid heavy operations in this callback

0 commit comments

Comments
 (0)