Open
Description
We have encountered an edge case that results in ERROR logs, as shown below:
2025-06-30T11:30:33,605+07:00 [pool-7-thread-1] ERROR t.y.t.r.i.PartitionSessionImpl: : [fMYZBI.1/1-p0] DataReceivedEvent callback with 1 message(s) (offsets 0-0) finished with error:
java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@7585eae rejected from java.util.concurrent.ThreadPoolExecutor@7fef0b40[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 4]
at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:359)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:364)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1851)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1840)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1460)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2036)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@7585eae rejected from java.util.concurrent.ThreadPoolExecutor@7fef0b40[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 4]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2082)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1377)
at java.base/java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1862)
at java.base/java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:2077)
at tech.ydb.topic.read.impl.AsyncReaderImpl.handleReaderClosed(AsyncReaderImpl.java:169)
at tech.ydb.topic.read.impl.AsyncReaderImpl.onShutdown(AsyncReaderImpl.java:182)
at tech.ydb.topic.impl.GrpcStreamRetrier.lambda$shutdownImpl$1(GrpcStreamRetrier.java:120)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1848)
... 5 more
Impact:
These errors can be misleading, particularly when user logic reopens reader sessions: users may mistake these logs for ones produced by a subsequent session. It is also unexpected to see error logs after all created resources have been properly closed.
Minimal reproduction example:
- YDB_CONTAINER is started with Testcontainers outside of the test; any suitable GrpcTransport can be used instead
- The topic at topicPath is also initialized outside of the test scope
@Test
void testConstantKeepAlive() throws InterruptedException {
    // initialize the test topic with some data (each string is sent as a separate message)
    writeIntoTopic(List.of("first", "second"));
    try (GrpcTransport transport = YDB_CONTAINER.getTransport();
         TopicClient topicClient = TopicClient.newClient(transport).build()
    ) {
        // start a new executorService used only for handling data events
        ExecutorService executorService = Executors.newCachedThreadPool();
        // use the semaphore to guarantee that the runtime exception is thrown after reader shutdown
        Semaphore semaphore = new Semaphore(0);
        AsyncReader asyncReader = topicClient.createAsyncReader(
                ReaderSettings.newBuilder()
                        .addTopic(TopicReadSettings.newBuilder()
                                .setPath(topicPath)
                                .build())
                        .setConsumerName(consumerName)
                        .build(),
                ReadEventHandlersSettings.newBuilder()
                        .setEventHandler(_ -> {
                            // block the handler thread until the test releases the semaphore
                            semaphore.acquireUninterruptibly();
                            throw new RuntimeException();
                        })
                        .setExecutor(executorService)
                        .build()
        );
        asyncReader.init().join();
        // wait for some time to allow the read session to be properly initialized
        Thread.sleep(1000);
        asyncReader.shutdown().join();
        // shut down the executor service after the user code has finished shutting down the actual reader
        executorService.shutdown();
        // release the semaphore to trigger the exception inside the blocked executorService thread
        semaphore.release();
        // wait for some time for the global FJP to run the onShutdown() callback from GrpcStreamRetrier.java:124
        Thread.sleep(1000);
    }
}
Proposed solution:
It appears that AsyncReaderImpl should keep track of whether it has already been closed in order to properly handle a double-close scenario.
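A minimal sketch of what such a guard could look like, using only the JDK. The class IdempotentCloseSketch and its methods are hypothetical stand-ins written for illustration; this is not the actual AsyncReaderImpl code, and the real fix may need to differ. The idea is that only the first onShutdown() call submits work to the user-provided executor, so a second call arriving after that executor has been shut down becomes a no-op instead of ending in a RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reader; this is NOT the real AsyncReaderImpl.
class IdempotentCloseSketch {
    private final ExecutorService handlerExecutor;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger closeNotifications = new AtomicInteger();

    IdempotentCloseSketch(ExecutorService handlerExecutor) {
        this.handlerExecutor = handlerExecutor;
    }

    // Returns true only for the call that actually performs the close.
    boolean onShutdown() {
        if (!closed.compareAndSet(false, true)) {
            // Already closed: do not touch the (possibly shut down) executor again.
            return false;
        }
        // Assumption: the close notification is delivered on the user executor.
        handlerExecutor.execute(closeNotifications::incrementAndGet);
        return true;
    }

    int closeNotifications() {
        return closeNotifications.get();
    }

    // Replays the sequence from the report: close, shut down the executor, close again.
    static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        IdempotentCloseSketch reader = new IdempotentCloseSketch(executor);
        reader.onShutdown();
        executor.shutdown();
        try {
            // Let the already submitted notification finish.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Without the flag this call would be rejected by the terminated executor.
        reader.onShutdown();
        return reader.closeNotifications();
    }
}
```

In this sketch compareAndSet makes the check and the state change a single atomic step, so concurrent shutdown paths cannot both pass the guard.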