RejectedExecutionException when message processing fails after topic reader session shutdown #513

@Eistern

Description

We have encountered an edge case that results in ERROR logs, as shown below:

2025-06-30T11:30:33,605+07:00 [pool-7-thread-1] ERROR t.y.t.r.i.PartitionSessionImpl:   : [fMYZBI.1/1-p0] DataReceivedEvent callback with 1 message(s) (offsets 0-0) finished with error: 
java.util.concurrent.CompletionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@7585eae rejected from java.util.concurrent.ThreadPoolExecutor@7fef0b40[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 4]
	at java.base/java.util.concurrent.CompletableFuture.wrapInCompletionException(CompletableFuture.java:323)
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:359)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:364)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1851)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1840)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1460)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2036)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:189)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncRun@7585eae rejected from java.util.concurrent.ThreadPoolExecutor@7fef0b40[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 4]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2082)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1377)
	at java.base/java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1862)
	at java.base/java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:2077)
	at tech.ydb.topic.read.impl.AsyncReaderImpl.handleReaderClosed(AsyncReaderImpl.java:169)
	at tech.ydb.topic.read.impl.AsyncReaderImpl.onShutdown(AsyncReaderImpl.java:182)
	at tech.ydb.topic.impl.GrpcStreamRetrier.lambda$shutdownImpl$1(GrpcStreamRetrier.java:120)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1848)
	... 5 more
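
The innermost frames of the trace (CompletableFuture.runAsync → ThreadPoolExecutor.execute → AbortPolicy.rejectedExecution) can be reproduced with plain JDK classes, without the SDK. The sketch below is only an illustration of that JDK behavior; the class and method names are hypothetical and it does not claim to mirror what AsyncReaderImpl does internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, not part of the YDB SDK.
public class RejectedAfterShutdownDemo {

    // Returns true if scheduling a task on an already shut-down executor is rejected.
    public static boolean isRejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown(); // from this point on the executor accepts no new tasks
        try {
            // runAsync calls executor.execute(...), which throws synchronously
            CompletableFuture.runAsync(() -> { }, executor);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + isRejectedAfterShutdown());
    }
}
```

Because the exception is thrown by runAsync itself rather than inside the submitted task, any caller that schedules work on a user-supplied executor after the user has shut it down will see this rejection.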

Impact:

These errors can be misleading, particularly when user logic attempts to reopen reader sessions: users may confuse these logs with ones from subsequent sessions. It is also unexpected to see error logs after all created resources have been properly closed.

Minimal reproduction example:

  • YDB_CONTAINER is started with testcontainers outside of the test; however, you may use any suitable GrpcTransport
  • The topic at topicPath is also initialized outside of the test scope
    @Test
    void testConstantKeepAlive() throws InterruptedException {
        // initialize test topic with some data (each string will be sent as a separate message)
        writeIntoTopic(List.of("first", "second"));

        try (GrpcTransport transport = YDB_CONTAINER.getTransport();
             TopicClient topicClient = TopicClient.newClient(transport).build()
        ) {
            // start new executorService only for handling data events
            ExecutorService executorService = Executors.newCachedThreadPool();

            // use the semaphore to guarantee runtime exception after reader shutdown
            Semaphore semaphore = new Semaphore(0);

            AsyncReader asyncReader = topicClient.createAsyncReader(
                    ReaderSettings.newBuilder()
                            .addTopic(TopicReadSettings.newBuilder()
                                    .setPath(topicPath)
                                    .build())
                            .setConsumerName(consumerName)
                            .build(),
                    ReadEventHandlersSettings.newBuilder()
                            .setEventHandler(_ -> {
                                semaphore.acquireUninterruptibly();
                                throw new RuntimeException();
                            })
                            .setExecutor(executorService)
                            .build()
            );

            asyncReader.init().join();

            // wait for some time to allow read session to be properly initialized
            Thread.sleep(1000);

            asyncReader.shutdown().join();

            // shutdown the executor service after the user code finished shutting down the actual reader
            executorService.shutdown();

            // release the semaphore and trigger exceptions inside of the hanging executorService thread
            semaphore.release();

            // wait for some time for the global FJP to run onShutdown() callback from GrpcStreamRetrier.java:124
            Thread.sleep(1000);
        }
    }

Proposed solution:

It appears that AsyncReaderImpl should keep track of whether it has already been closed, so that a double-close scenario is handled properly.
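
A minimal sketch of what such tracking could look like, assuming a simple AtomicBoolean guard. CloseOnceReader, handleReaderClosed and notificationCount are hypothetical stand-ins used for illustration only; this is not the SDK's actual AsyncReaderImpl code, and the real fix may need to look different.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reader; not the SDK's AsyncReaderImpl.
public class CloseOnceReader {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger notifications = new AtomicInteger();
    private final Executor handlerExecutor;

    public CloseOnceReader(Executor handlerExecutor) {
        this.handlerExecutor = handlerExecutor;
    }

    // May be reached both from an explicit shutdown and from a late internal callback.
    public void handleReaderClosed() {
        // compareAndSet succeeds only for the first caller; later calls return immediately
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        // Only the first close schedules work on the (possibly user-owned) executor
        CompletableFuture.runAsync(notifications::incrementAndGet, handlerExecutor);
    }

    public int notificationCount() {
        return notifications.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CloseOnceReader reader = new CloseOnceReader(executor);

        reader.handleReaderClosed();   // first close: the notification is scheduled
        executor.shutdown();           // user shuts the executor down afterwards
        executor.awaitTermination(5, TimeUnit.SECONDS);

        reader.handleReaderClosed();   // second close: skipped, so nothing is rejected
        System.out.println("notifications: " + reader.notificationCount());
    }
}
```

With the guard in place, the second call never reaches the shut-down executor, so no RejectedExecutionException is raised on the double-close path.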
