Skip to content

Commit 938e4f6

Browse files
committed
[feat][broker] Replace highestDeliveryTimeTracked with AtomicLong for thread-safe updates and improve concurrency handling in InMemoryTopicDelayedDeliveryTrackerManager
1 parent e576c65 commit 938e4f6

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedD
8383
private final long minPruneIntervalNanos;
8484

8585
// Fixed-delay detection (parity with legacy behavior)
86-
private volatile long highestDeliveryTimeTracked = 0;
86+
private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0);
8787
private volatile boolean messagesHaveFixedDelay = true;
8888

8989
// Per-bucket locks (timestamp -> lock) for fine-grained concurrency
@@ -302,10 +302,13 @@ boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, lon
302302
}
303303

304304
private void checkAndUpdateHighest(long deliverAt) {
305-
if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) {
306-
messagesHaveFixedDelay = false;
307-
}
308-
highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt);
305+
long current;
306+
do {
307+
current = highestDeliveryTimeTracked.get();
308+
if (deliverAt < (current - tickTimeMillis)) {
309+
messagesHaveFixedDelay = false;
310+
}
311+
} while (deliverAt > current && !highestDeliveryTimeTracked.compareAndSet(current, deliverAt));
309312
}
310313

311314
/**

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class InMemoryTopicDelayedDeliveryTrackerView implements DelayedDeliveryT
3333

3434
private final InMemoryTopicDelayedDeliveryTrackerManager manager;
3535
private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext;
36-
private boolean closed = false;
36+
private volatile boolean closed = false;
3737

3838
public InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager manager,
3939
InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) {

0 commit comments

Comments
 (0)