From bcb763baa670544ea90bda67243870d5ef085551 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 30 Oct 2025 13:09:15 +0800 Subject: [PATCH 1/3] Remove inactive exclusive consumer after checking liveness --- .../service/AbstractDispatcherSingleActiveConsumer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index baca6bf078cf0..bc49309a4de2c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -176,6 +176,11 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already" + " connected")); } else { + try { + removeConsumer(actConsumer); + } catch (BrokerServiceException e) { + log.warn("[{}] Remove inactive exclusive consumer {}", this.topicName, consumer); + } return addConsumer(consumer); } }); From 00921e0768fe19a7ff89c2692b4da9ff156ee487 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 3 Nov 2025 16:23:28 +0800 Subject: [PATCH 2/3] update --- ...bstractDispatcherSingleActiveConsumer.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index bc49309a4de2c..668c35c8ee35a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -64,6 +64,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas protected boolean isFirstRead = true; private static final int CONSUMER_CONSISTENT_HASH_REPLICAS = 100; + private volatile int addConsumerCount = 0; public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, String topicName, Subscription subscription, @@ -176,12 +177,7 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already" + " connected")); } else { - try { - removeConsumer(actConsumer); - } catch (BrokerServiceException e) { - log.warn("[{}] Remove inactive exclusive consumer {}", this.topicName, consumer); - } - return addConsumer(consumer); + return internalAddConsumer(actConsumer, consumer); } }); } else { @@ -230,6 +226,24 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { return CompletableFuture.completedFuture(null); } + /** + * This method is used to help debugging addConsumer failed for exclusive subscription. + * @param actConsumer + * @param consumer + * @return + */ + private synchronized CompletableFuture internalAddConsumer(Consumer actConsumer, Consumer consumer) { + addConsumerCount++; + if (addConsumerCount >= 5) { + log.warn("Added consumer failed, consumers {}, active consumer {}, active state : {}", consumers, + actConsumer, actConsumer.cnx().isActive()); + addConsumerCount = 0; + return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already" + + " connected")); + } + return addConsumer(consumer); + } + public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { log.info("Removing consumer {}", consumer); if (!consumers.remove(consumer)) { From 3db463d02037663328ced22c8ef16d1fdeb8c8cc Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Mon, 3 Nov 2025 18:58:38 +0800 Subject: [PATCH 3/3] address comment --- .../AbstractDispatcherSingleActiveConsumer.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 668c35c8ee35a..ec4d400d715eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -64,7 +64,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas protected boolean isFirstRead = true; private static final int CONSUMER_CONSISTENT_HASH_REPLICAS = 100; - private volatile int addConsumerCount = 0; + private int addConsumerFailedAttemptCount = 0; public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, String topicName, Subscription subscription, @@ -233,11 +233,12 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { * @return */ private synchronized CompletableFuture internalAddConsumer(Consumer actConsumer, Consumer consumer) { - addConsumerCount++; - if (addConsumerCount >= 5) { - log.warn("Added consumer failed, consumers {}, active consumer {}, active state : {}", consumers, - actConsumer, actConsumer.cnx().isActive()); - addConsumerCount = 0; + addConsumerFailedAttemptCount++; + if (addConsumerFailedAttemptCount >= 5) { + log.warn("Added consumer failed with attempt count {}, consumers: {}, active consumer: {}," + + "active state : {}", + addConsumerFailedAttemptCount, consumers, actConsumer, actConsumer.cnx().isActive()); + addConsumerFailedAttemptCount = 0; return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already" + " connected")); }