-
Notifications
You must be signed in to change notification settings - Fork 75
Open
Description
⚠️ heap memory leak because the ConcurrentLinkedQueue in Batcher is unbounded and filled
Describe the bug



public final CompletableFuture<CallResultT> submit(BatcherKeyT batcherKey, CallT request) {
if (avgLatencyNanos.estimate() < burstLatencyNanos) {
ICallTask<CallT, CallResultT, BatcherKeyT> callTask = new CallTask<>(batcherKey, request);
boolean offered = callTaskBuffers.offer(callTask);
assert offered;
trigger();
return callTask.resultPromise();
} else {
dropCounter.increment();
return CompletableFuture.failedFuture(new BackPressureException("Too high average latency"));
}
}
private void trigger() {
if (triggering.compareAndSet(false, true)) {
try {
if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
batchAndEmit();
}
} catch (Throwable e) {
log.error("Unexpected exception", e);
} finally {
triggering.set(false);
if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
trigger();
}
}
}
}
private void batchAndEmit() {
pipelineDepth.incrementAndGet();
long buildStart = System.nanoTime();
IBatchCall<CallT, CallResultT, BatcherKeyT> batchCall = batchPool.poll();
assert batchCall != null;
int batchSize = 0;
LinkedList<ICallTask<CallT, CallResultT, BatcherKeyT>> batchedTasks = new LinkedList<>();
ICallTask<CallT, CallResultT, BatcherKeyT> callTask;
while (batchSize < maxBatchSize && (callTask = callTaskBuffers.poll()) != null) {
batchCall.add(callTask);
batchedTasks.add(callTask);
batchSize++;
queueingTimeSummary.record(System.nanoTime() - callTask.ts());
}
batchSizeSummary.record(batchSize);
long execStart = System.nanoTime();
batchBuildTimeSummary.record((execStart - buildStart));
final int finalBatchSize = batchSize;
batchCall.execute()
.whenComplete((v, e) -> {
long execEnd = System.nanoTime();
if (e != null) {
log.error("Unexpected exception during handling batchcall result", e);
// reset max batch size
maxBatchSize = 1;
} else {
long thisLatency = execEnd - execStart;
if (thisLatency > 0) {
updateMaxBatchSize(finalBatchSize, thisLatency);
}
batchExecTimer.record(thisLatency, TimeUnit.NANOSECONDS);
}
batchedTasks.forEach(t -> {
long callLatency = execEnd - t.ts();
avgLatencyNanos.observe(callLatency);
batchCallTimer.record(callLatency, TimeUnit.NANOSECONDS);
});
batchCall.reset();
batchPool.offer(batchCall);
pipelineDepth.getAndDecrement();
if (!callTaskBuffers.isEmpty()) {
trigger();
}
});
}
Environment
- Version: [3.2.1]
- JVM Version: [OpenJDK17]
- Hardware Spec: [32c64g]
- OS: [CentOS 7]
Metadata
Metadata
Assignees
Labels
No labels