@@ -64,7 +64,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
6464
6565 protected boolean isFirstRead = true ;
6666 private static final int CONSUMER_CONSISTENT_HASH_REPLICAS = 100 ;
67- private volatile int addConsumerCount = 0 ;
67+ private int addConsumerFailedAttemptCount = 0 ;
6868
6969 public AbstractDispatcherSingleActiveConsumer (SubType subscriptionType , int partitionIndex ,
7070 String topicName , Subscription subscription ,
@@ -233,11 +233,12 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
233233 * @return
234234 */
235235 private synchronized CompletableFuture <Void > internalAddConsumer (Consumer actConsumer , Consumer consumer ) {
236- addConsumerCount ++;
237- if (addConsumerCount >= 5 ) {
238- log .warn ("Added consumer failed, consumers {}, active consumer {}, active state : {}" , consumers ,
239- actConsumer , actConsumer .cnx ().isActive ());
240- addConsumerCount = 0 ;
236+ addConsumerFailedAttemptCount ++;
237+ if (addConsumerFailedAttemptCount >= 5 ) {
238+ log .warn ("Added consumer failed with attempt count {}, consumers: {}, active consumer: {},"
239+ + "active state : {}" ,
240+ addConsumerFailedAttemptCount , consumers , actConsumer , actConsumer .cnx ().isActive ());
241+ addConsumerFailedAttemptCount = 0 ;
241242 return FutureUtil .failedFuture (new ConsumerBusyException ("Exclusive consumer is already"
242243 + " connected" ));
243244 }
0 commit comments