
Heap memory leak because the unbounded ConcurrentLinkedQueue in Batcher fills up #181

@masterOcean

Description


Describe the bug

[Three screenshots of the batcher metrics were attached here.]

The `batcher_call_sched` metric looks normal, but the `batcher_call_time` metric is zero: the dist requests are pushed into `callTaskBuffers` but are never polled in `batchAndEmit()`. I suspect the gRPC server did not respond for a long time, so `pipelineDepth.getAndDecrement()` was never executed. As a result, the `pipelineDepth.get() < maxPipelineDepth.get()` condition in `trigger()` can never be satisfied, dist requests keep being pushed but are never polled, and they eventually fill the heap.
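The suspected mechanism can be reproduced with a stripped-down, single-threaded model of the quoted `submit()`/`trigger()`/`batchAndEmit()` logic. This is a sketch under assumptions: the class name, the constants, and the plain `avgLatencyNanos` field are hypothetical stand-ins, and the RPC is simulated by a `CompletableFuture` that is never completed. It also suggests why the back-pressure check in `submit()` does not help: in the quoted code the latency estimate is only observed inside `whenComplete`, so while no batch completes, the estimate never rises and every request is accepted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down model of Batcher's submit()/trigger()/batchAndEmit().
// Requests are plain Integers; the RPC is a future that the "server" never completes.
public class StalledBatcherDemo {
    static final int MAX_PIPELINE_DEPTH = 1;                 // assumed value
    static final int MAX_BATCH_SIZE = 4;                     // assumed value
    static final long BURST_LATENCY_NANOS = 1_000_000_000L;  // assumed threshold

    final ConcurrentLinkedQueue<Integer> callTaskBuffers = new ConcurrentLinkedQueue<>();
    final AtomicInteger pipelineDepth = new AtomicInteger();
    final AtomicBoolean triggering = new AtomicBoolean();
    final CompletableFuture<Void> stuckRpc = new CompletableFuture<>(); // never completed
    volatile long avgLatencyNanos = 0; // simplified estimate, only written on completion
    int dropped = 0;

    void submit(int request) {
        if (avgLatencyNanos >= BURST_LATENCY_NANOS) { // back-pressure check; never fires here
            dropped++;
            return;
        }
        callTaskBuffers.offer(request); // unbounded queue: always accepted
        trigger();
    }

    void trigger() {
        if (triggering.compareAndSet(false, true)) {
            try {
                if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < MAX_PIPELINE_DEPTH) {
                    batchAndEmit();
                }
            } finally {
                triggering.set(false);
            }
        }
    }

    void batchAndEmit() {
        pipelineDepth.incrementAndGet();
        for (int i = 0; i < MAX_BATCH_SIZE && callTaskBuffers.poll() != null; i++) {
            // drain up to MAX_BATCH_SIZE requests into the (simulated) batch
        }
        long start = System.nanoTime();
        stuckRpc.whenComplete((v, e) -> {
            avgLatencyNanos = System.nanoTime() - start; // never reached
            pipelineDepth.getAndDecrement();            // never reached
            trigger();
        });
    }

    static StalledBatcherDemo run(int requests) {
        StalledBatcherDemo batcher = new StalledBatcherDemo();
        for (int i = 0; i < requests; i++) {
            batcher.submit(i);
        }
        return batcher;
    }

    public static void main(String[] args) {
        StalledBatcherDemo b = run(10_000);
        // prints: pipelineDepth=1 buffered=9999 dropped=0
        System.out.println("pipelineDepth=" + b.pipelineDepth.get()
            + " buffered=" + b.callTaskBuffers.size() + " dropped=" + b.dropped);
    }
}
```

With a single stuck call, the pipeline stays at its maximum depth, nothing is rejected, and every request after the first one stays in the queue.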
```java
public final CompletableFuture<CallResultT> submit(BatcherKeyT batcherKey, CallT request) {
    // Admission control: requests are only rejected once the average latency estimate is too high.
    if (avgLatencyNanos.estimate() < burstLatencyNanos) {
        ICallTask<CallT, CallResultT, BatcherKeyT> callTask = new CallTask<>(batcherKey, request);
        boolean offered = callTaskBuffers.offer(callTask); // unbounded queue: offer() never returns false
        assert offered;
        trigger();
        return callTask.resultPromise();
    } else {
        dropCounter.increment();
        return CompletableFuture.failedFuture(new BackPressureException("Too high average latency"));
    }
}

private void trigger() {
    if (triggering.compareAndSet(false, true)) {
        try {
            // No new batch is emitted while the pipeline is at its maximum depth.
            if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                batchAndEmit();
            }
        } catch (Throwable e) {
            log.error("Unexpected exception", e);
        } finally {
            triggering.set(false);
            if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                trigger();
            }
        }
    }
}

private void batchAndEmit() {
    pipelineDepth.incrementAndGet();
    long buildStart = System.nanoTime();
    IBatchCall<CallT, CallResultT, BatcherKeyT> batchCall = batchPool.poll();
    assert batchCall != null;
    int batchSize = 0;
    LinkedList<ICallTask<CallT, CallResultT, BatcherKeyT>> batchedTasks = new LinkedList<>();
    ICallTask<CallT, CallResultT, BatcherKeyT> callTask;
    while (batchSize < maxBatchSize && (callTask = callTaskBuffers.poll()) != null) {
        batchCall.add(callTask);
        batchedTasks.add(callTask);
        batchSize++;
        queueingTimeSummary.record(System.nanoTime() - callTask.ts());
    }
    batchSizeSummary.record(batchSize);
    long execStart = System.nanoTime();
    batchBuildTimeSummary.record((execStart - buildStart));
    final int finalBatchSize = batchSize;
    batchCall.execute()
        .whenComplete((v, e) -> {
            // Runs only after execute() completes. If the call never completes,
            // none of the bookkeeping below (including the depth decrement) happens.
            long execEnd = System.nanoTime();
            if (e != null) {
                log.error("Unexpected exception during handling batchcall result", e);
                // reset max batch size
                maxBatchSize = 1;
            } else {
                long thisLatency = execEnd - execStart;
                if (thisLatency > 0) {
                    updateMaxBatchSize(finalBatchSize, thisLatency);
                }
                batchExecTimer.record(thisLatency, TimeUnit.NANOSECONDS);
            }
            batchedTasks.forEach(t -> {
                long callLatency = execEnd - t.ts();
                // submit()'s back-pressure estimate is updated here, once a batch completes.
                avgLatencyNanos.observe(callLatency);
                batchCallTimer.record(callLatency, TimeUnit.NANOSECONDS);
            });
            batchCall.reset();
            batchPool.offer(batchCall);
            pipelineDepth.getAndDecrement(); // releases the pipeline slot taken above
            if (!callTaskBuffers.isEmpty()) {
                trigger();
            }
        });
}
```
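If this diagnosis is right, any batch call that never completes permanently holds one pipeline slot. One possible guard (a sketch, not the project's fix) is to bound the future returned by `batchCall.execute()` with `CompletableFuture.orTimeout`, available since Java 9 and therefore on the OpenJDK 17 reported here, so that `whenComplete` always runs and `pipelineDepth` is decremented. The class name and the `BATCH_CALL_TIMEOUT_MS` value below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical guard: bound how long a batch call may hold a pipeline slot.
public class TimeoutGuardDemo {
    static final long BATCH_CALL_TIMEOUT_MS = 100; // assumed value, not from Batcher

    /** Returns the pipeline depth after a batch call that the server never answers. */
    static int depthAfterStuckCall() {
        AtomicInteger pipelineDepth = new AtomicInteger();
        CompletableFuture<Void> rpc = new CompletableFuture<>(); // never completed by the "server"

        pipelineDepth.incrementAndGet(); // as in batchAndEmit()
        CompletableFuture<Void> done = rpc
            // Completes the future exceptionally with a TimeoutException after the delay.
            .orTimeout(BATCH_CALL_TIMEOUT_MS, TimeUnit.MILLISECONDS)
            .whenComplete((v, e) -> {
                // e is non-null here (the timeout), but the slot is still released.
                pipelineDepth.getAndDecrement();
            });
        try {
            done.join();
        } catch (CompletionException expected) {
            // the timeout surfaces here; ignored in this demo
        }
        return pipelineDepth.get();
    }

    public static void main(String[] args) {
        System.out.println("pipelineDepth=" + depthAfterStuckCall()); // prints: pipelineDepth=0
    }
}
```

In the quoted `batchAndEmit()`, a timeout would take the `e != null` branch, which logs the error and resets `maxBatchSize` to 1. Note that `orTimeout` only completes the Java future; it does not by itself cancel the underlying gRPC call, so a deadline on the RPC itself may still be needed to release transport and server-side resources.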

Environment

  • Version: 3.2.1
  • JVM Version: OpenJDK 17
  • Hardware Spec: 32c64g
  • OS: CentOS 7
