Skip to content

Commit 03dd2d3

Browse files
committed
use dedicated bookkeeper-ml-offload-scheduler to avoid any potential blocking of core services
1 parent ee33c99 commit 03dd2d3

File tree

6 files changed

+44
-9
lines changed

6 files changed

+44
-9
lines changed

conf/broker.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,6 +1249,9 @@ managedLedgerDigestType=CRC32C
12491249
# Number of threads to be used for managed ledger scheduled tasks
12501250
managedLedgerNumSchedulerThreads=
12511251

1252+
# Number of threads to be use for managed ledger scheduled offload operations.
1253+
managedLedgerNumOffloadSchedulerThreads=
1254+
12521255
# Amount of memory to use for caching data payload in managed ledger. This memory
12531256
# is allocated from JVM direct memory and it's shared across all the topics
12541257
# running in the same broker. By default, uses 1/5th of available direct memory

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ public class ManagedLedgerFactoryConfig {
147147
*/
148148
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
149149

150+
/**
151+
* Number of threads to use for ML offload operations.
152+
*/
153+
private int numManagedLedgerOffloadSchedulerThreads = Runtime.getRuntime().availableProcessors();
154+
150155
/**
151156
* ManagedCursorInfo compression threshold. If the origin metadata size below configuration.
152157
* compression will not apply.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
122122
@Getter
123123
private final ScheduledExecutorService cacheEvictionExecutor;
124124

125+
// Dedicated thread pool for offload operations to isolate from core services
126+
@Getter
127+
private final OrderedScheduler offloadScheduler;
128+
125129
@Getter
126130
protected final ManagedLedgerFactoryMBeanImpl mbean;
127131

@@ -234,6 +238,17 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
234238
.build();
235239
cacheEvictionExecutor = Executors
236240
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
241+
242+
// Create dedicated scheduler for offload operations to prevent blocking core services
243+
// Use a conservative thread count to minimize resource overhead while ensuring adequate capacity
244+
int offloadThreads = config.getNumManagedLedgerOffloadSchedulerThreads();
245+
offloadScheduler = OrderedScheduler.newSchedulerBuilder()
246+
.numThreads(offloadThreads)
247+
.statsLogger(statsLogger)
248+
.traceTaskExecution(config.isTraceTaskExecution())
249+
.name("bookkeeper-ml-offload-scheduler")
250+
.build();
251+
log.info("Created offload scheduler with {} threads for ML operations isolation", offloadThreads);
237252
this.metadataServiceAvailable = true;
238253
this.bookkeeperFactory = bookKeeperGroupFactory;
239254
this.isBookkeeperManaged = isBookkeeperManaged;
@@ -647,6 +662,11 @@ public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {
647662
flushCursorsTask.cancel(true);
648663
cacheEvictionExecutor.shutdownNow();
649664

665+
// Shutdown offload scheduler
666+
if (offloadScheduler != null) {
667+
offloadScheduler.shutdownNow();
668+
}
669+
650670
List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet());
651671
List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size());
652672
int numLedgers = ledgerNames.size();
@@ -1037,7 +1057,7 @@ public void operationFailed(MetaStoreException e) {
10371057
return OffloadUtils.cleanupOffloaded(li.ledgerId, uuid, mlConfig,
10381058
OffloadUtils.getOffloadDriverMetadata(ls,
10391059
mlConfig.getLedgerOffloader().getOffloadDriverMetadata()),
1040-
"Deletion", managedLedgerName, scheduledExecutor);
1060+
"Deletion", managedLedgerName, offloadScheduler);
10411061
}
10421062

10431063
return CompletableFuture.completedFuture(null);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2771,7 +2771,9 @@ public void maybeOffloadInBackground(CompletableFuture<Position> promise) {
27712771
final long offloadThresholdInSeconds =
27722772
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
27732773
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
2774-
executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise));
2774+
// Use dedicated offload scheduler to avoid any potential blocking of core services
2775+
factory.getOffloadScheduler()
2776+
.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise));
27752777
}
27762778
}
27772779

@@ -2791,7 +2793,8 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS
27912793
}
27922794

27932795
if (!offloadMutex.tryLock()) {
2794-
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise),
2796+
// Use dedicated offload scheduler to avoid blocking core services
2797+
factory.getOffloadScheduler().schedule(() -> maybeOffloadInBackground(finalPromise),
27952798
100, TimeUnit.MILLISECONDS);
27962799
return;
27972800
}
@@ -3384,7 +3387,7 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
33843387
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
33853388
OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
33863389
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
3387-
"Trimming", name, scheduledExecutor);
3390+
"Trimming", name, factory.getOffloadScheduler());
33883391
}
33893392
}
33903393

@@ -3657,7 +3660,7 @@ void offloadLoop(CompletableFuture<Position> promise, Queue<LedgerInfo> ledgersT
36573660
TimeUnit.HOURS.toMillis(1)).limit(10),
36583661
FAIL_ON_CONFLICT,
36593662
() -> completeLedgerInfoForOffloaded(ledgerId, uuid),
3660-
scheduledExecutor, name)
3663+
factory.getOffloadScheduler(), name)
36613664
.whenComplete((ignore2, exception) -> {
36623665
if (exception != null) {
36633666
Throwable e = FutureUtil.unwrapCompletionException(exception);
@@ -3678,7 +3681,7 @@ void offloadLoop(CompletableFuture<Position> promise, Queue<LedgerInfo> ledgersT
36783681
OffloadUtils.cleanupOffloaded(
36793682
ledgerId, uuid, config,
36803683
driverMetadata,
3681-
"Metastore failure", name, scheduledExecutor);
3684+
"Metastore failure", name, factory.getOffloadScheduler());
36823685
}
36833686
});
36843687
})
@@ -3740,8 +3743,8 @@ private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation tran
37403743
CompletableFuture<Void> finalPromise) {
37413744
synchronized (this) {
37423745
if (!metadataMutex.tryLock()) {
3743-
// retry in 100 milliseconds
3744-
scheduledExecutor.schedule(
3746+
// retry in 100 milliseconds using offload scheduler to avoid blocking core services
3747+
factory.getOffloadScheduler().schedule(
37453748
() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise), 100,
37463749
TimeUnit.MILLISECONDS);
37473750
} else { // lock acquired
@@ -3805,7 +3808,7 @@ private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUI
38053808
config.getLedgerOffloader().getOffloadDriverMetadata()),
38063809
"Previous failed offload",
38073810
name,
3808-
scheduledExecutor);
3811+
factory.getOffloadScheduler());
38093812
}
38103813
LedgerInfo.Builder builder = oldInfo.toBuilder();
38113814
builder.getOffloadContextBuilder()

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2313,6 +2313,8 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
23132313
)
23142314
private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();
23152315

2316+
private int managedLedgerNumOffloadSchedulerThreads = Runtime.getRuntime().availableProcessors();
2317+
23162318
@FieldContext(
23172319
category = CATEGORY_STORAGE_ML,
23182320
doc = "Max number of entries to append to a ledger before triggering a rollover.\n\n"

pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
113113
conf.getManagedLedgerInfoCompressionThresholdInBytes());
114114
managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds());
115115
managedLedgerFactoryConfig.setManagedCursorInfoCompressionType(conf.getManagedCursorInfoCompressionType());
116+
managedLedgerFactoryConfig.setNumManagedLedgerOffloadSchedulerThreads(
117+
conf.getManagedLedgerNumOffloadSchedulerThreads());
116118
managedLedgerFactoryConfig.setManagedCursorInfoCompressionThresholdInBytes(
117119
conf.getManagedCursorInfoCompressionThresholdInBytes());
118120

0 commit comments

Comments
 (0)