From 07887738d39d1229a97b485358508bac05e37b8c Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 11 Jun 2025 13:32:18 +0200 Subject: [PATCH 1/2] Various small fixes --- docs/pages/kotlinx-rpc/topics/0-8-0.topic | 5 +-- .../kotlinx/rpc/krpc/client/KrpcClient.kt | 16 ++++++++- .../kotlinx/rpc/krpc/server/KrpcServer.kt | 15 ++++++++ .../test/cancellation/CancellationService.kt | 10 ++++++ .../test/cancellation/CancellationTest.kt | 35 ++++++++++++------- .../test/cancellation/CancellationToolkit.kt | 2 -- 6 files changed, 63 insertions(+), 20 deletions(-) diff --git a/docs/pages/kotlinx-rpc/topics/0-8-0.topic b/docs/pages/kotlinx-rpc/topics/0-8-0.topic index 3ae6ff98..b61dfafd 100644 --- a/docs/pages/kotlinx-rpc/topics/0-8-0.topic +++ b/docs/pages/kotlinx-rpc/topics/0-8-0.topic @@ -262,12 +262,9 @@ class MyClient( config: KrpcConfig, transport: KrpcTransport, - ) : InitializedKrpcClient(transport, config) + ) : InitializedKrpcClient(config, transport) - - Notice that the parameter order is reversed in new InitializedKrpcClient. - diff --git a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt index 6fa3eeba..01b652c6 100644 --- a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt +++ b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt @@ -45,8 +45,8 @@ import kotlin.properties.Delegates * See [KrpcClient.initializeTransport]. */ public abstract class InitializedKrpcClient( - private val transport: KrpcTransport, private val config: KrpcConfig.Client, + private val transport: KrpcTransport, ): KrpcClient() { final override suspend fun initializeTransport(): KrpcTransport { return transport @@ -80,6 +80,20 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint { */ protected abstract fun initializeConfig(): KrpcConfig.Client + /** + * Close this client, removing all the services and stopping accepting messages. + */ + public fun close(message: String? = null) { + internalScope.cancel(message ?: "Client closed") + } + + /** + * Waits until the client is closed. + */ + public suspend fun awaitCompletion() { + internalScope.coroutineContext.job.join() + } + /* * ##################################################################### * # # diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt index 9b00e88a..898f3a26 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt @@ -37,6 +37,20 @@ public abstract class KrpcServer( transport: KrpcTransport, ) : RpcServer, KrpcEndpoint { + /** + * Close this server, removing all the services and stopping accepting messages. + */ + public fun close(message: String? = null) { + internalScope.cancel(message ?: "Server closed") + } + + /** + * Waits until the server is closed. + */ + public suspend fun awaitCompletion() { + internalScope.coroutineContext.job.join() + } + /* * ##################################################################### * # # @@ -119,6 +133,7 @@ public abstract class KrpcServer( override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass) { connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName) + rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName) } private fun <@Rpc Service : Any> createNewServiceInstance( diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt index d266e211..6ac3397b 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt @@ -21,6 +21,8 @@ interface CancellationService { suspend fun outgoingStream(stream: Flow) + suspend fun outgoingStreamAsync(stream: Flow) + suspend fun outgoingStreamWithDelayedResponse(stream: Flow) suspend fun outgoingStreamWithException(stream: Flow) @@ -57,6 +59,14 @@ class CancellationServiceImpl : CancellationService { consume(stream) } + @OptIn(DelicateCoroutinesApi::class) + override suspend fun outgoingStreamAsync(stream: Flow) { + GlobalScope.launch { + consume(stream) + } + firstIncomingConsumed.await() + } + override suspend fun outgoingStreamWithDelayedResponse(stream: Flow) { consume(stream) diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt index 17d15d78..dcf35615 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt @@ -76,7 +76,7 @@ class CancellationTest { } unskippableDelay(150) // wait for requests to reach server - client.internalScope.cancel() + client.close() firstRequestJob.join() secondRequestJob.join() @@ -85,8 +85,8 @@ class CancellationTest { assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed") - client.internalScope.join() - server.internalScope.join() + client.awaitCompletion() + server.awaitCompletion() checkAlive(clientAlive = false, serverAlive = false) stopAllAndJoin() @@ -105,7 +105,7 @@ class CancellationTest { } unskippableDelay(150) // wait for requests to reach server - server.internalScope.cancel() + server.close() firstRequestJob.join() secondRequestJob.join() @@ -114,8 +114,8 @@ class CancellationTest { assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed") - client.internalScope.join() - server.internalScope.join() + client.awaitCompletion() + server.awaitCompletion() checkAlive(clientAlive = false, serverAlive = false) stopAllAndJoin() @@ -141,6 +141,18 @@ class CancellationTest { stopAllAndJoin() } + @Test + fun testOutgoingFlowLifetime() = runCancellationTest { + val fence = CompletableDeferred() + + service.outgoingStreamAsync(resumableFlow(fence)) + + serverInstance().consumedAll.await() + assertContentEquals(listOf(0), serverInstance().consumedIncomingValues) + + stopAllAndJoin() + } + @Test fun testStreamIncoming() = runCancellationTest { var first: Int = -1 @@ -220,8 +232,8 @@ class CancellationTest { serverInstance().firstIncomingConsumed.await() - client.internalScope.cancel("Test request cancelled") - client.internalScope.join() + client.close("Test request cancelled") + client.awaitCompletion() serverInstance().consumedAll.await() @@ -239,7 +251,7 @@ class CancellationTest { caught = it }.collect { if (it == 0) { - client.internalScope.cancel() + client.close() } else { fail("Expected the request to fail with cancellation of the client") } @@ -254,7 +266,7 @@ class CancellationTest { fun testCancelledClientCancelsRequest() = runCancellationTest { launch { serverInstance().firstIncomingConsumed.await() - client.internalScope.cancel("Cancelled by test") + client.close("Cancelled by test") } try { @@ -347,8 +359,5 @@ class CancellationTest { private val CoroutineScope.isCompleted get() = coroutineContext.job.isCompleted private val CoroutineScope.isCancelled get() = coroutineContext.job.isCancelled - @Suppress("SuspendFunctionOnCoroutineScope") - private suspend fun CoroutineScope.join() = apply { coroutineContext.job.join() } - private suspend fun CancellationToolkit.stopAllAndJoin() = transport.coroutineContext.job.cancelAndJoin() } diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt index 244d4c47..187df520 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt @@ -26,8 +26,6 @@ fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResul return runTest(timeout = 15.seconds) { debugCoroutines() CancellationToolkit(this).apply { - client.initializeTransport() - body() } } From 5d1590e0b96228b4d6b7c2470d4995f6779fbdcd Mon Sep 17 00:00:00 2001 From: Alexander Sysoev Date: Wed, 11 Jun 2025 19:46:52 +0200 Subject: [PATCH 2/2] Added test for KTOR-7234 Fix WS session closure --- .../krpc-ktor/krpc-ktor-core/build.gradle.kts | 7 + .../rpc/krpc/ktor/KtorTransportTest.kt | 166 +++++++++++++++++- .../src/jvmTest/resources/logback.xml | 16 ++ 3 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml diff --git a/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts b/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts index 89bf807c..6e27be3e 100644 --- a/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts +++ b/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts @@ -26,10 +26,17 @@ kotlin { implementation(projects.krpc.krpcSerialization.krpcSerializationJson) implementation(projects.krpc.krpcKtor.krpcKtorServer) implementation(projects.krpc.krpcKtor.krpcKtorClient) + implementation(projects.krpc.krpcLogging) implementation(libs.kotlin.test) implementation(libs.ktor.server.netty) implementation(libs.ktor.server.test.host) + implementation(libs.ktor.server.websockets) + implementation(libs.ktor.client.core) + implementation(libs.ktor.client.websockets) + implementation(libs.ktor.client.cio) + implementation(libs.logback.classic) + implementation(libs.coroutines.debug) } } } diff --git a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt index 0f4fff9e..ff17f5bb 100644 --- a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt +++ b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt @@ -6,10 +6,23 @@ package kotlinx.rpc.krpc.ktor +import io.ktor.client.* +import io.ktor.client.engine.cio.* +import io.ktor.client.request.* +import io.ktor.client.statement.* import io.ktor.server.application.* +import io.ktor.server.engine.* +import io.ktor.server.netty.* +import io.ktor.server.response.* +import io.ktor.server.routing.* import io.ktor.server.testing.* -import kotlinx.coroutines.cancel +import kotlinx.coroutines.* +import kotlinx.coroutines.debug.DebugProbes +import kotlinx.coroutines.test.runTest import kotlinx.rpc.annotations.Rpc +import kotlinx.rpc.krpc.client.KrpcClient +import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger +import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer import kotlinx.rpc.krpc.ktor.client.installKrpc import kotlinx.rpc.krpc.ktor.client.rpc import kotlinx.rpc.krpc.ktor.client.rpcConfig @@ -18,7 +31,15 @@ import kotlinx.rpc.krpc.ktor.server.rpc import kotlinx.rpc.krpc.serialization.json.json import kotlinx.rpc.withService import org.junit.Assert.assertEquals +import org.junit.platform.commons.logging.Logger +import org.junit.platform.commons.logging.LoggerFactory +import java.net.ServerSocket +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import kotlin.coroutines.cancellation.CancellationException +import kotlin.test.Ignore import kotlin.test.Test +import kotlin.time.Duration.Companion.seconds @Rpc interface NewService { @@ -35,6 +56,23 @@ class NewServiceImpl( } } +@Rpc +interface SlowService { + suspend fun verySlow(): String +} + +class SlowServiceImpl : SlowService { + val received = CompletableDeferred() + + override suspend fun verySlow(): String { + received.complete(Unit) + + delay(Int.MAX_VALUE.toLong()) + + error("Must not be called") + } +} + class KtorTransportTest { @Test fun testEcho() = testApplication { @@ -96,4 +134,130 @@ class KtorTransportTest { clientWithNoConfig.cancel() } + + @OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class) + @Test + @Ignore("Wait for Ktor fix (https://github.com/ktorio/ktor/pull/4927) or apply workaround if rejected") + fun testEndpointsTerminateWhenWsDoes() = runTest(timeout = 15.seconds) { + DebugProbes.install() + + val logger = setupLogger() + + val port: Int = findFreePort() + + val newPool = Executors.newCachedThreadPool().asCoroutineDispatcher() + + val serverReady = CompletableDeferred() + val dropServer = CompletableDeferred() + + val service = SlowServiceImpl() + + val serverJob = GlobalScope.launch(CoroutineName("server")) { + withContext(newPool) { + val server = embeddedServer( + factory = Netty, + port = port, + parentCoroutineContext = newPool, + ) { + install(Krpc) + + routing { + get { + call.respondText("hello") + } + + rpc("/rpc") { + rpcConfig { + serialization { + json() + } + } + + registerService { service } + } + } + }.start(wait = false) + + serverReady.complete(Unit) + + dropServer.await() + + server.stop(shutdownGracePeriod = 100L, shutdownTimeout = 100L, timeUnit = TimeUnit.MILLISECONDS) + } + + logger.info { "Server stopped" } + } + + val ktorClient = HttpClient(CIO) { + installKrpc { + serialization { + json() + } + } + } + + serverReady.await() + + assertEquals("hello", ktorClient.get("http://0.0.0.0:$port").bodyAsText()) + + val rpcClient = ktorClient.rpc("ws://0.0.0.0:$port/rpc") + + launch { + try { + rpcClient.withService().verySlow() + error("Must not be called") + } catch (_: CancellationException) { + logger.info { "Cancellation exception caught for RPC request" } + ensureActive() + } + } + + service.received.await() + + logger.info { "Received RPC request" } + + dropServer.complete(Unit) + + logger.info { "Waiting for RPC client to complete" } + + (rpcClient as KrpcClient).awaitCompletion() + + logger.info { "RPC client completed" } + + ktorClient.close() + newPool.close() + + serverJob.cancel() + } + + private fun findFreePort(): Int { + val port: Int + while (true) { + val socket = try { + ServerSocket(0) + } catch (_: Throwable) { + continue + } + + port = socket.localPort + socket.close() + break + } + return port + } + + private fun setupLogger(): Logger { + val logger = LoggerFactory.getLogger(KtorTransportTest::class.java) + + RpcInternalDumpLoggerContainer.set(object : RpcInternalDumpLogger { + + override val isEnabled: Boolean = true + + override fun dump(vararg tags: String, message: () -> String) { + logger.info { "[${tags.joinToString()}] ${message()}" } + } + }) + + return logger + } } diff --git a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml new file mode 100644 index 00000000..5ccb239b --- /dev/null +++ b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + +