Skip to content

Commit 8596e6a

Browse files
liangyepianzhoulhotari
authored andcommitted
[fix][broker] fix wrong method name checkTopicExists. (#24293)
The current method `checkTopicExists` is an asynchronous method but follows a synchronous naming convention (lacking the `Async` suffix). This naming inconsistency can mislead developers into assuming it's a blocking operation, potentially causing misuse in client code. Since this method is `public`, we cannot remove it directly without breaking backward compatibility. 1. **Introduce a new asynchronous method**: - Added `checkTopicExistsAsync()` with the correct asynchronous naming convention. - Internally delegates to the original `checkTopicExists()` method to retain existing logic. 2. **Deprecate the original method**: - Marked `checkTopicExists()` as `@Deprecated` with a note directing users to the new `checkTopicExistsAsync()`. 3. **Refactor internal usages**: - Updated all internal calls to use `checkTopicExistsAsync()` instead of the deprecated method. 4. **Documentation updates**: - Added Javadoc to `checkTopicExists()` clarifying its deprecated status and replacement. This approach maintains backward compatibility while aligning method names with their asynchronous behavior. (cherry picked from commit af24849)
1 parent 0ca37c3 commit 8596e6a

File tree

9 files changed

+27
-14
lines changed

9 files changed

+27
-14
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
808808
}
809809

810810
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
811-
return pulsar().getNamespaceService().checkTopicExists(topicName)
811+
return pulsar().getNamespaceService().checkTopicExistsAsync(topicName)
812812
.thenAccept(info -> {
813813
boolean exists = info.isExists();
814814
info.recycle();
@@ -5638,7 +5638,7 @@ protected CompletableFuture<Void> validateShadowTopics(List<String> shadowTopics
56385638
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
56395639
"Only persistent topic can be set as shadow topic"));
56405640
}
5641-
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
5641+
futures.add(pulsar().getNamespaceService().checkTopicExistsAsync(shadowTopicName)
56425642
.thenAccept(info -> {
56435643
boolean exists = info.isExists();
56445644
info.recycle();

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName
7575
return CompletableFuture.completedFuture(true);
7676
}
7777
// Case-2: Persistent topic.
78-
return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
78+
return pulsar().getNamespaceService().checkTopicExistsAsync(topicName).thenCompose(info -> {
7979
boolean exists = info.isExists();
8080
info.recycle();
8181
if (exists) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1368,8 +1368,21 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
13681368
}
13691369

13701370
/***
1371-
* Check topic exists( partitioned or non-partitioned ).
1371+
* Checks whether the topic exists( partitioned or non-partitioned ).
13721372
*/
1373+
public CompletableFuture<TopicExistsInfo> checkTopicExistsAsync(TopicName topic) {
1374+
return checkTopicExists(topic);
1375+
}
1376+
1377+
/**
1378+
* Checks whether the topic exists( partitioned or non-partitioned ).
1379+
*
1380+
* @deprecated This method uses a misleading synchronous name for an asynchronous operation.
1381+
* Use {@link #checkTopicExistsAsync(TopicName topic)} instead.
1382+
*
1383+
* @see #checkTopicExistsAsync(TopicName topic)
1384+
*/
1385+
@Deprecated
13731386
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
13741387
return pulsar.getBrokerService()
13751388
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3238,7 +3238,7 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
32383238
if (pulsar.getNamespaceService() == null) {
32393239
return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
32403240
}
3241-
return pulsar.getNamespaceService().checkTopicExists(topicName).thenComposeAsync(topicExistsInfo -> {
3241+
return pulsar.getNamespaceService().checkTopicExistsAsync(topicName).thenComposeAsync(topicExistsInfo -> {
32423242
final boolean topicExists = topicExistsInfo.isExists();
32433243
final TopicType topicType = topicExistsInfo.getTopicType();
32443244
final Integer partitions = topicExistsInfo.getPartitions();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
593593
&& brokerAllowAutoCreate;
594594
if (!autoCreateIfNotExist) {
595595
NamespaceService namespaceService = getBrokerService().getPulsar().getNamespaceService();
596-
namespaceService.checkTopicExists(topicName).thenAccept(topicExistsInfo -> {
596+
namespaceService.checkTopicExistsAsync(topicName).thenAccept(topicExistsInfo -> {
597597
lookupSemaphore.release();
598598
if (!topicExistsInfo.isExists()) {
599599
writeAndFlush(Commands.newPartitionMetadataResponse(

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ public void testLookUpWithException() throws Exception {
360360
CompletableFuture existFuture = new CompletableFuture();
361361
existFuture.complete(TopicExistsInfo.newNonPartitionedTopicExists());
362362
doReturn(future).when(nameSpaceService).getBrokerServiceUrlAsync(any(), any());
363-
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
363+
doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any());
364364
CompletableFuture existBooleanFuture = new CompletableFuture();
365365
existBooleanFuture.complete(false);
366366
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
@@ -385,7 +385,7 @@ public void testLookUpTopicNotExist() throws Exception {
385385
existFuture.complete(TopicExistsInfo.newTopicNotExists());
386386
CompletableFuture existBooleanFuture = new CompletableFuture();
387387
existBooleanFuture.complete(false);
388-
doReturn(existFuture).when(nameSpaceService).checkTopicExists(any());
388+
doReturn(existFuture).when(nameSpaceService).checkTopicExistsAsync(any());
389389
doReturn(existBooleanFuture).when(nameSpaceService).checkNonPartitionedTopicExists(any());
390390
doReturn(nameSpaceService).when(pulsar).getNamespaceService();
391391
AsyncResponse asyncResponse = mock(AsyncResponse.class);

pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void testLookupTopicNotExist() throws Exception {
152152
NamespaceService namespaceService = pulsar.getNamespaceService();
153153
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
154154
future.complete(TopicExistsInfo.newTopicNotExists());
155-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
155+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
156156
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
157157
booleanFuture.complete(false);
158158
doReturn(booleanFuture).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
@@ -266,7 +266,7 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception {
266266
NamespaceService namespaceService = pulsar.getNamespaceService();
267267
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
268268
future.complete(TopicExistsInfo.newTopicNotExists());
269-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
269+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
270270
CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
271271
booleanFuture.complete(false);
272272
doReturn(future).when(namespaceService).checkNonPartitionedTopicExists(any(TopicName.class));
@@ -303,7 +303,7 @@ public void topicNotFound() throws Exception {
303303
NamespaceService namespaceService = pulsar.getNamespaceService();
304304
CompletableFuture<TopicExistsInfo> future = new CompletableFuture<>();
305305
future.complete(TopicExistsInfo.newTopicNotExists());
306-
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
306+
doReturn(future).when(namespaceService).checkTopicExistsAsync(any(TopicName.class));
307307

308308
// Get the current semaphore first
309309
Integer state1 = pulsar.getBrokerService().getLookupRequestSemaphore().availablePermits();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public void testRemoveCluster() throws Exception {
220220
assertFalse(tps.containsKey(topicP1));
221221
assertFalse(tps.containsKey(topicChangeEvents));
222222
assertFalse(pulsar1.getNamespaceService()
223-
.checkTopicExists(TopicName.get(topicChangeEvents))
223+
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
224224
.get(5, TimeUnit.SECONDS).isExists());
225225
// Verify: schema will be removed in local cluster, and remote cluster will not.
226226
List<CompletableFuture<StoredSchema>> schemaList13

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ public void testRemoveCluster() throws Exception {
213213
= pulsar1.getBrokerService().getTopics();
214214
assertFalse(tps.containsKey(topic));
215215
assertFalse(tps.containsKey(topicChangeEvents));
216-
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
216+
assertFalse(pulsar1.getNamespaceService().checkTopicExistsAsync(TopicName.get(topic))
217217
.get(5, TimeUnit.SECONDS).isExists());
218218
assertFalse(pulsar1.getNamespaceService()
219-
.checkTopicExists(TopicName.get(topicChangeEvents))
219+
.checkTopicExistsAsync(TopicName.get(topicChangeEvents))
220220
.get(5, TimeUnit.SECONDS).isExists());
221221
});
222222

0 commit comments

Comments
 (0)