Skip to content

Commit 76e5479

Browse files
nodecelhotari
authored andcommitted
[fix][broker] Fix potential deadlock when creating partitioned topic (#24313)
### Motivation There is still a call to the synchronous method in `internalCreatePartitionedTopic`, which can lead to a deadlock under certain conditions. ### Modifications - Replaced synchronous call to `getNamespacePolicies()` with `getNamespacePoliciesAsync()`. - Replaced synchronous call to `getTopicPartitionList()` with `getTopicPartitionListAsync()`. - Updated `internalCreatePartitionedTopic` to propagate the async flow properly. (cherry picked from commit 7ca8c0e)
1 parent 8596e6a commit 76e5479

File tree

1 file changed

+34
-19
lines changed

1 file changed

+34
-19
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ protected WorkerService validateAndGetWorkerService() {
305305
}
306306
}
307307

308+
/**
309+
* @deprecated Use {@link #getNamespacePoliciesAsync(NamespaceName)} instead.
310+
*/
311+
@Deprecated
308312
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
309313
try {
310314
Policies policies = namespaceResources().getPolicies(namespaceName)
@@ -541,6 +545,10 @@ protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDoma
541545
.listPartitionedTopicsAsync(namespaceName, topicDomain);
542546
}
543547

548+
/**
549+
* @deprecated Use {@link #getTopicPartitionListAsync()} instead.
550+
*/
551+
@Deprecated
544552
protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
545553
try {
546554
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
@@ -552,6 +560,10 @@ protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
552560
}
553561
}
554562

563+
protected CompletableFuture<List<String>> getTopicPartitionListAsync() {
564+
return getPulsarResources().getTopicResources().getExistingPartitions(topicName);
565+
}
566+
555567
protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
556568
boolean createLocalTopicOnly) {
557569
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly, null);
@@ -571,33 +583,36 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
571583
return;
572584
}
573585
validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC)
574-
.thenRun(() -> {
575-
Policies policies = null;
576-
try {
577-
policies = getNamespacePolicies(namespaceName);
578-
} catch (RestException e) {
579-
if (e.getResponse().getStatus() != Status.NOT_FOUND.getStatusCode()) {
580-
throw e;
586+
.thenCompose((__) -> getNamespacePoliciesAsync(namespaceName).exceptionally(ex -> {
587+
Throwable unwrapped = FutureUtil.unwrapCompletionException(ex);
588+
if (unwrapped instanceof RestException re) {
589+
if (re.getResponse().getStatus() == Status.NOT_FOUND.getStatusCode()) {
590+
return null;
581591
}
582592
}
583-
593+
throw FutureUtil.wrapToCompletionException(ex);
594+
}))
595+
.thenCompose(policies -> {
584596
int maxTopicsPerNamespace = policies != null && policies.max_topics_per_namespace != null
585597
? policies.max_topics_per_namespace : pulsar().getConfig().getMaxTopicsPerNamespace();
586598

587599
// new create check
588600
if (maxTopicsPerNamespace > 0 && !pulsar().getBrokerService().isSystemTopic(topicName)) {
589-
List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);
590-
// exclude created system topic
591-
long topicsCount =
592-
partitionedTopics.stream().filter(t ->
593-
!pulsar().getBrokerService().isSystemTopic(TopicName.get(t))).count();
594-
if (topicsCount + numPartitions > maxTopicsPerNamespace) {
595-
log.error("[{}] Failed to create partitioned topic {}, "
596-
+ "exceed maximum number of topics in namespace", clientAppId(), topicName);
597-
throw new RestException(Status.PRECONDITION_FAILED,
598-
"Exceed maximum number of topics in namespace.");
599-
}
601+
return getTopicPartitionListAsync().thenAccept(partitionedTopics -> {
602+
// exclude created system topic
603+
long topicsCount = partitionedTopics.stream()
604+
.filter(t -> !pulsar().getBrokerService().isSystemTopic(TopicName.get(t)))
605+
.count();
606+
if (topicsCount + numPartitions > maxTopicsPerNamespace) {
607+
log.error("[{}] Failed to create partitioned topic {}, "
608+
+ "exceed maximum number of topics in namespace", clientAppId(),
609+
topicName);
610+
throw new RestException(Status.PRECONDITION_FAILED,
611+
"Exceed maximum number of topics in namespace.");
612+
}
613+
});
600614
}
615+
return CompletableFuture.completedFuture(null);
601616
})
602617
.thenCompose(__ -> checkTopicExistsAsync(topicName))
603618
.thenAccept(exists -> {

0 commit comments

Comments
 (0)