Skip to content

Commit ee33c99

Browse files
thetumbled and fengwenzhi authored
[improve][broker] PIP-444: Rate limit for deleting ledger to alleviate the zk pressure. (#24760)
Co-authored-by: fengwenzhi <fengwenzhi.max@bigo.sg>
1 parent 9737d03 commit ee33c99

File tree

4 files changed

+100
-5
lines changed

4 files changed

+100
-5
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.time.Clock;
2424
import java.util.Arrays;
2525
import java.util.Map;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Semaphore;
2628
import java.util.concurrent.TimeUnit;
2729
import lombok.Getter;
2830
import lombok.Setter;
@@ -61,6 +63,8 @@ public class ManagedLedgerConfig {
6163
private int metadataMaxEntriesPerLedger = 50000;
6264
private int ledgerRolloverTimeout = 4 * 3600;
6365
private double throttleMarkDelete = 0;
66+
private Semaphore ledgerDeletionSemaphore;
67+
private ExecutorService ledgerDeleteExecutor;
6468
private long retentionTimeMs = 0;
6569
private long retentionSizeInMB = 0;
6670
private boolean autoSkipNonRecoverableData;
@@ -410,6 +414,30 @@ public ManagedLedgerConfig setThrottleMarkDelete(double throttleMarkDelete) {
410414
return this;
411415
}
412416

417+
/**
418+
* @return the semaphore used to limit concurrent ledger deletions
419+
*/
420+
public Semaphore getLedgerDeletionSemaphore() {
421+
return ledgerDeletionSemaphore;
422+
}
423+
424+
public ManagedLedgerConfig setLedgerDeletionSemaphore(Semaphore semaphore) {
425+
this.ledgerDeletionSemaphore = semaphore;
426+
return this;
427+
}
428+
429+
/**
430+
* @return the executor service to be used for deleting ledgers
431+
*/
432+
public ExecutorService getLedgerDeleteExecutor() {
433+
return ledgerDeleteExecutor;
434+
}
435+
436+
public ManagedLedgerConfig setLedgerDeleteExecutor(ExecutorService executor) {
437+
this.ledgerDeleteExecutor = executor;
438+
return this;
439+
}
440+
413441
/**
414442
* Set the retention time for the ManagedLedger.
415443
* <p>

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.concurrent.ExecutionException;
5858
import java.util.concurrent.ExecutorService;
5959
import java.util.concurrent.ScheduledFuture;
60+
import java.util.concurrent.Semaphore;
6061
import java.util.concurrent.TimeUnit;
6162
import java.util.concurrent.TimeoutException;
6263
import java.util.concurrent.atomic.AtomicBoolean;
@@ -354,6 +355,11 @@ public boolean isFenced() {
354355
private long lastEvictOffloadedLedgers;
355356
private static final int MINIMUM_EVICTION_INTERVAL_DIVIDER = 10;
356357

358+
// Semaphore to limit concurrent ledger deletion
359+
private Semaphore deleteLedgerSemaphore = null;
360+
// Executor service for executing ledger deletion tasks
361+
private ExecutorService deleteLedgerExecutor = null;
362+
357363
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
358364
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
359365
final String name) {
@@ -402,6 +408,10 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
402408
this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching();
403409
this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
404410
this.managedLedgerAttributes = new ManagedLedgerAttributes(this);
411+
if (config.getLedgerDeletionSemaphore() != null) {
412+
this.deleteLedgerSemaphore = config.getLedgerDeletionSemaphore();
413+
this.deleteLedgerExecutor = config.getLedgerDeleteExecutor();
414+
}
405415
}
406416

407417
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
@@ -569,7 +579,7 @@ protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLe
569579
public void operationComplete(Void v, Stat stat) {
570580
ledgersStat = stat;
571581
emptyLedgersToBeDeleted.forEach(ledgerId -> {
572-
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
582+
asyncDeleteLedgerWithConcurrencyLimit(ledgerId, (rc, ctx) -> {
573583
log.info("[{}] Deleted empty ledger ledgerId={} rc={}", name, ledgerId, rc);
574584
}, null);
575585
});
@@ -1763,7 +1773,7 @@ public void operationFailed(MetaStoreException e) {
17631773
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
17641774
handleBadVersion(e);
17651775
mbean.startDataLedgerDeleteOp();
1766-
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
1776+
asyncDeleteLedgerWithConcurrencyLimit(lh.getId(), (rc1, ctx1) -> {
17671777
mbean.endDataLedgerDeleteOp();
17681778
if (rc1 != BKException.Code.OK) {
17691779
log.warn("[{}] Failed to delete ledger {}: {}", name, lh.getId(),
@@ -1846,7 +1856,7 @@ protected synchronized void updateLedgersIdsComplete(@Nullable LedgerHandle orig
18461856
STATE_UPDATER.set(this, State.LedgerOpened);
18471857
// Delete original "currentLedger" if it has been removed from "ledgers".
18481858
if (originalCurrentLedger != null && !ledgers.containsKey(originalCurrentLedger.getId())){
1849-
bookKeeper.asyncDeleteLedger(originalCurrentLedger.getId(), (rc, ctx) -> {
1859+
asyncDeleteLedgerWithConcurrencyLimit(originalCurrentLedger.getId(), (rc, ctx) -> {
18501860
mbean.endDataLedgerDeleteOp();
18511861
log.info("[{}] Delete complete for empty ledger {}. rc={}", name, originalCurrentLedger.getId(), rc);
18521862
}, null);
@@ -3385,7 +3395,7 @@ private CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long retry) {
33853395
}
33863396

33873397
private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, long ledgerId, long retry) {
3388-
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
3398+
asyncDeleteLedgerWithConcurrencyLimit(ledgerId, (rc, ctx) -> {
33893399
if (isNoSuchLedgerExistsException(rc)) {
33903400
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
33913401
future.complete(null);
@@ -3408,6 +3418,33 @@ private void asyncDeleteLedgerWithRetry(CompletableFuture<Void> future, long led
34083418
}, null);
34093419
}
34103420

3421+
/**
3422+
* Delete a ledger asynchronously, applying a concurrency limit if configured.
3423+
* @param ledgerId
3424+
* @param cb
3425+
* @param ctx
3426+
*/
3427+
private void asyncDeleteLedgerWithConcurrencyLimit(long ledgerId,
3428+
org.apache.bookkeeper.client.AsyncCallback.DeleteCallback cb,
3429+
Object ctx) {
3430+
if (deleteLedgerSemaphore != null) {
3431+
AsyncCallback.DeleteCallback cbWrapper = (rc, ctx1) -> {
3432+
deleteLedgerSemaphore.release();
3433+
cb.deleteComplete(rc, ctx1);
3434+
};
3435+
deleteLedgerExecutor.execute(() -> {
3436+
try {
3437+
deleteLedgerSemaphore.acquire();
3438+
bookKeeper.asyncDeleteLedger(ledgerId, cbWrapper, ctx);
3439+
} catch (InterruptedException e) {
3440+
log.error("[{}] Interrupted while waiting to delete ledger {}", name, ledgerId);
3441+
}
3442+
});
3443+
} else {
3444+
bookKeeper.asyncDeleteLedger(ledgerId, cb, ctx);
3445+
}
3446+
}
3447+
34113448
@SuppressWarnings("checkstyle:fallthrough")
34123449
private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
34133450
List<LedgerInfo> ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
@@ -3422,7 +3459,7 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
34223459
if (log.isDebugEnabled()) {
34233460
log.debug("[{}] Deleting ledger {}", name, ls);
34243461
}
3425-
bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> {
3462+
asyncDeleteLedgerWithConcurrencyLimit(ls.getLedgerId(), (rc, ctx1) -> {
34263463
switch (rc) {
34273464
case Code.NoSuchLedgerExistsException:
34283465
case Code.NoSuchLedgerExistsOnMetadataServerException:

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2262,6 +2262,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
22622262
doc = "Rate limit the amount of writes per second generated by consumer acking the messages"
22632263
)
22642264
private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
2265+
@FieldContext(
2266+
category = CATEGORY_STORAGE_ML,
2267+
doc = "Max number of concurrent requests for deleting ledgers at broker level"
2268+
)
2269+
private int managedLedgerDeleteMaxConcurrentRequests = 1000;
22652270
@FieldContext(
22662271
category = CATEGORY_STORAGE_ML,
22672272
dynamic = true,

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,12 @@ public class BrokerService implements Closeable {
329329
private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher();
330330
private volatile boolean unloaded = false;
331331

332+
// semaphore for limiting the concurrency of ledger deletion at broker level,
333+
// so all managed ledgers share the same semaphore
334+
private final Semaphore ledgerDeletionSemaphore;
335+
336+
private final ExecutorProvider ledgerDeletionExecutorProvider;
337+
332338
public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
333339
this.pulsar = pulsar;
334340
this.clock = pulsar.getClock();
@@ -451,6 +457,16 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
451457
.getBrokerEntryPayloadProcessors(), BrokerService.class.getClassLoader());
452458

453459
this.bundlesQuotas = new BundlesQuotas(pulsar);
460+
if (pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests() > 0) {
461+
log.info("Setting managed ledger deletion max concurrent requests to {}",
462+
pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests());
463+
this.ledgerDeletionSemaphore = new Semaphore(
464+
pulsar.getConfiguration().getManagedLedgerDeleteMaxConcurrentRequests());
465+
this.ledgerDeletionExecutorProvider = new ExecutorProvider(1, "pulsar-ledger-deletion");
466+
} else {
467+
this.ledgerDeletionSemaphore = null;
468+
this.ledgerDeletionExecutorProvider = null;
469+
}
454470
}
455471

456472
protected DispatchRateLimiterFactory createDispatchRateLimiterFactory(ServiceConfiguration config)
@@ -802,6 +818,12 @@ public CompletableFuture<Void> closeAsync() {
802818
try {
803819
log.info("Shutting down Pulsar Broker service");
804820

821+
// shutdown executor for ledger deletion
822+
if (ledgerDeletionExecutorProvider != null) {
823+
log.info("Shutting down executor for ledger deletion...");
824+
ledgerDeletionExecutorProvider.shutdownNow();
825+
}
826+
805827
// unregister non-static metrics collectors
806828
pendingTopicLoadRequests.unregister();
807829
pendingLookupRequests.unregister();
@@ -2057,6 +2079,9 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull To
20572079
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate() >= 0
20582080
? persistencePolicies.getManagedLedgerMaxMarkDeleteRate()
20592081
: serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit());
2082+
managedLedgerConfig.setLedgerDeletionSemaphore(this.ledgerDeletionSemaphore);
2083+
managedLedgerConfig.setLedgerDeleteExecutor(this.ledgerDeletionExecutorProvider != null
2084+
? this.ledgerDeletionExecutorProvider.getExecutor() : null);
20602085
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
20612086
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
20622087

0 commit comments

Comments
 (0)