diff --git a/docs/pages/kotlinx-rpc/topics/0-8-0.topic b/docs/pages/kotlinx-rpc/topics/0-8-0.topic
index 3ae6ff98..b61dfafd 100644
--- a/docs/pages/kotlinx-rpc/topics/0-8-0.topic
+++ b/docs/pages/kotlinx-rpc/topics/0-8-0.topic
@@ -262,12 +262,9 @@
class MyClient(
config: KrpcConfig,
transport: KrpcTransport,
- ) : InitializedKrpcClient(transport, config)
+ ) : InitializedKrpcClient(config, transport)
-
- Notice that the parameter order is reversed in new InitializedKrpcClient
.
-
diff --git a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt
index 6fa3eeba..01b652c6 100644
--- a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt
+++ b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt
@@ -45,8 +45,8 @@ import kotlin.properties.Delegates
* See [KrpcClient.initializeTransport].
*/
public abstract class InitializedKrpcClient(
- private val transport: KrpcTransport,
private val config: KrpcConfig.Client,
+ private val transport: KrpcTransport,
): KrpcClient() {
final override suspend fun initializeTransport(): KrpcTransport {
return transport
@@ -80,6 +80,20 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
*/
protected abstract fun initializeConfig(): KrpcConfig.Client
+ /**
+ * Close this client, removing all the services and stopping accepting messages.
+ */
+ public fun close(message: String? = null) {
+ internalScope.cancel(message ?: "Client closed")
+ }
+
+ /**
+ * Waits until the client is closed.
+ */
+ public suspend fun awaitCompletion() {
+ internalScope.coroutineContext.job.join()
+ }
+
/*
* #####################################################################
* # #
diff --git a/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts b/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts
index 89bf807c..6e27be3e 100644
--- a/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts
+++ b/krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts
@@ -26,10 +26,17 @@ kotlin {
implementation(projects.krpc.krpcSerialization.krpcSerializationJson)
implementation(projects.krpc.krpcKtor.krpcKtorServer)
implementation(projects.krpc.krpcKtor.krpcKtorClient)
+ implementation(projects.krpc.krpcLogging)
implementation(libs.kotlin.test)
implementation(libs.ktor.server.netty)
implementation(libs.ktor.server.test.host)
+ implementation(libs.ktor.server.websockets)
+ implementation(libs.ktor.client.core)
+ implementation(libs.ktor.client.websockets)
+ implementation(libs.ktor.client.cio)
+ implementation(libs.logback.classic)
+ implementation(libs.coroutines.debug)
}
}
}
diff --git a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt
index 0f4fff9e..ff17f5bb 100644
--- a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt
+++ b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt
@@ -6,10 +6,23 @@
package kotlinx.rpc.krpc.ktor
+import io.ktor.client.*
+import io.ktor.client.engine.cio.*
+import io.ktor.client.request.*
+import io.ktor.client.statement.*
import io.ktor.server.application.*
+import io.ktor.server.engine.*
+import io.ktor.server.netty.*
+import io.ktor.server.response.*
+import io.ktor.server.routing.*
import io.ktor.server.testing.*
-import kotlinx.coroutines.cancel
+import kotlinx.coroutines.*
+import kotlinx.coroutines.debug.DebugProbes
+import kotlinx.coroutines.test.runTest
import kotlinx.rpc.annotations.Rpc
+import kotlinx.rpc.krpc.client.KrpcClient
+import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
+import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
import kotlinx.rpc.krpc.ktor.client.installKrpc
import kotlinx.rpc.krpc.ktor.client.rpc
import kotlinx.rpc.krpc.ktor.client.rpcConfig
@@ -18,7 +31,15 @@ import kotlinx.rpc.krpc.ktor.server.rpc
import kotlinx.rpc.krpc.serialization.json.json
import kotlinx.rpc.withService
import org.junit.Assert.assertEquals
+import org.junit.platform.commons.logging.Logger
+import org.junit.platform.commons.logging.LoggerFactory
+import java.net.ServerSocket
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.cancellation.CancellationException
+import kotlin.test.Ignore
import kotlin.test.Test
+import kotlin.time.Duration.Companion.seconds
@Rpc
interface NewService {
@@ -35,6 +56,23 @@ class NewServiceImpl(
}
}
+@Rpc
+interface SlowService {
+ suspend fun verySlow(): String
+}
+
+class SlowServiceImpl : SlowService {
+ val received = CompletableDeferred()
+
+ override suspend fun verySlow(): String {
+ received.complete(Unit)
+
+ delay(Int.MAX_VALUE.toLong())
+
+ error("Must not be called")
+ }
+}
+
class KtorTransportTest {
@Test
fun testEcho() = testApplication {
@@ -96,4 +134,130 @@ class KtorTransportTest {
clientWithNoConfig.cancel()
}
+
+ @OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
+ @Test
+ @Ignore("Wait for Ktor fix (https://github.com/ktorio/ktor/pull/4927) or apply workaround if rejected")
+ fun testEndpointsTerminateWhenWsDoes() = runTest(timeout = 15.seconds) {
+ DebugProbes.install()
+
+ val logger = setupLogger()
+
+ val port: Int = findFreePort()
+
+ val newPool = Executors.newCachedThreadPool().asCoroutineDispatcher()
+
+ val serverReady = CompletableDeferred()
+ val dropServer = CompletableDeferred()
+
+ val service = SlowServiceImpl()
+
+ val serverJob = GlobalScope.launch(CoroutineName("server")) {
+ withContext(newPool) {
+ val server = embeddedServer(
+ factory = Netty,
+ port = port,
+ parentCoroutineContext = newPool,
+ ) {
+ install(Krpc)
+
+ routing {
+ get {
+ call.respondText("hello")
+ }
+
+ rpc("/rpc") {
+ rpcConfig {
+ serialization {
+ json()
+ }
+ }
+
+ registerService { service }
+ }
+ }
+ }.start(wait = false)
+
+ serverReady.complete(Unit)
+
+ dropServer.await()
+
+ server.stop(shutdownGracePeriod = 100L, shutdownTimeout = 100L, timeUnit = TimeUnit.MILLISECONDS)
+ }
+
+ logger.info { "Server stopped" }
+ }
+
+ val ktorClient = HttpClient(CIO) {
+ installKrpc {
+ serialization {
+ json()
+ }
+ }
+ }
+
+ serverReady.await()
+
+ assertEquals("hello", ktorClient.get("http://0.0.0.0:$port").bodyAsText())
+
+ val rpcClient = ktorClient.rpc("ws://0.0.0.0:$port/rpc")
+
+ launch {
+ try {
+ rpcClient.withService().verySlow()
+ error("Must not be called")
+ } catch (_: CancellationException) {
+ logger.info { "Cancellation exception caught for RPC request" }
+ ensureActive()
+ }
+ }
+
+ service.received.await()
+
+ logger.info { "Received RPC request" }
+
+ dropServer.complete(Unit)
+
+ logger.info { "Waiting for RPC client to complete" }
+
+ (rpcClient as KrpcClient).awaitCompletion()
+
+ logger.info { "RPC client completed" }
+
+ ktorClient.close()
+ newPool.close()
+
+ serverJob.cancel()
+ }
+
+ private fun findFreePort(): Int {
+ val port: Int
+ while (true) {
+ val socket = try {
+ ServerSocket(0)
+ } catch (_: Throwable) {
+ continue
+ }
+
+ port = socket.localPort
+ socket.close()
+ break
+ }
+ return port
+ }
+
+ private fun setupLogger(): Logger {
+ val logger = LoggerFactory.getLogger(KtorTransportTest::class.java)
+
+ RpcInternalDumpLoggerContainer.set(object : RpcInternalDumpLogger {
+
+ override val isEnabled: Boolean = true
+
+ override fun dump(vararg tags: String, message: () -> String) {
+ logger.info { "[${tags.joinToString()}] ${message()}" }
+ }
+ })
+
+ return logger
+ }
}
diff --git a/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml
new file mode 100644
index 00000000..5ccb239b
--- /dev/null
+++ b/krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt
index 9b00e88a..898f3a26 100644
--- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt
+++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt
@@ -37,6 +37,20 @@ public abstract class KrpcServer(
transport: KrpcTransport,
) : RpcServer, KrpcEndpoint {
+ /**
+ * Close this server, removing all the services and stopping accepting messages.
+ */
+ public fun close(message: String? = null) {
+ internalScope.cancel(message ?: "Server closed")
+ }
+
+ /**
+ * Waits until the server is closed.
+ */
+ public suspend fun awaitCompletion() {
+ internalScope.coroutineContext.job.join()
+ }
+
/*
* #####################################################################
* # #
@@ -119,6 +133,7 @@ public abstract class KrpcServer(
override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass) {
connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName)
+ rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)
}
private fun <@Rpc Service : Any> createNewServiceInstance(
diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt
index d266e211..6ac3397b 100644
--- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt
+++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt
@@ -21,6 +21,8 @@ interface CancellationService {
suspend fun outgoingStream(stream: Flow)
+ suspend fun outgoingStreamAsync(stream: Flow)
+
suspend fun outgoingStreamWithDelayedResponse(stream: Flow)
suspend fun outgoingStreamWithException(stream: Flow)
@@ -57,6 +59,14 @@ class CancellationServiceImpl : CancellationService {
consume(stream)
}
+ @OptIn(DelicateCoroutinesApi::class)
+ override suspend fun outgoingStreamAsync(stream: Flow) {
+ GlobalScope.launch {
+ consume(stream)
+ }
+ firstIncomingConsumed.await()
+ }
+
override suspend fun outgoingStreamWithDelayedResponse(stream: Flow) {
consume(stream)
diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt
index 17d15d78..dcf35615 100644
--- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt
+++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt
@@ -76,7 +76,7 @@ class CancellationTest {
}
unskippableDelay(150) // wait for requests to reach server
- client.internalScope.cancel()
+ client.close()
firstRequestJob.join()
secondRequestJob.join()
@@ -85,8 +85,8 @@ class CancellationTest {
assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed")
- client.internalScope.join()
- server.internalScope.join()
+ client.awaitCompletion()
+ server.awaitCompletion()
checkAlive(clientAlive = false, serverAlive = false)
stopAllAndJoin()
@@ -105,7 +105,7 @@ class CancellationTest {
}
unskippableDelay(150) // wait for requests to reach server
- server.internalScope.cancel()
+ server.close()
firstRequestJob.join()
secondRequestJob.join()
@@ -114,8 +114,8 @@ class CancellationTest {
assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed")
- client.internalScope.join()
- server.internalScope.join()
+ client.awaitCompletion()
+ server.awaitCompletion()
checkAlive(clientAlive = false, serverAlive = false)
stopAllAndJoin()
@@ -141,6 +141,18 @@ class CancellationTest {
stopAllAndJoin()
}
+ @Test
+ fun testOutgoingFlowLifetime() = runCancellationTest {
+ val fence = CompletableDeferred()
+
+ service.outgoingStreamAsync(resumableFlow(fence))
+
+ serverInstance().consumedAll.await()
+ assertContentEquals(listOf(0), serverInstance().consumedIncomingValues)
+
+ stopAllAndJoin()
+ }
+
@Test
fun testStreamIncoming() = runCancellationTest {
var first: Int = -1
@@ -220,8 +232,8 @@ class CancellationTest {
serverInstance().firstIncomingConsumed.await()
- client.internalScope.cancel("Test request cancelled")
- client.internalScope.join()
+ client.close("Test request cancelled")
+ client.awaitCompletion()
serverInstance().consumedAll.await()
@@ -239,7 +251,7 @@ class CancellationTest {
caught = it
}.collect {
if (it == 0) {
- client.internalScope.cancel()
+ client.close()
} else {
fail("Expected the request to fail with cancellation of the client")
}
@@ -254,7 +266,7 @@ class CancellationTest {
fun testCancelledClientCancelsRequest() = runCancellationTest {
launch {
serverInstance().firstIncomingConsumed.await()
- client.internalScope.cancel("Cancelled by test")
+ client.close("Cancelled by test")
}
try {
@@ -347,8 +359,5 @@ class CancellationTest {
private val CoroutineScope.isCompleted get() = coroutineContext.job.isCompleted
private val CoroutineScope.isCancelled get() = coroutineContext.job.isCancelled
- @Suppress("SuspendFunctionOnCoroutineScope")
- private suspend fun CoroutineScope.join() = apply { coroutineContext.job.join() }
-
private suspend fun CancellationToolkit.stopAllAndJoin() = transport.coroutineContext.job.cancelAndJoin()
}
diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt
index 244d4c47..187df520 100644
--- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt
+++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt
@@ -26,8 +26,6 @@ fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResul
return runTest(timeout = 15.seconds) {
debugCoroutines()
CancellationToolkit(this).apply {
- client.initializeTransport()
-
body()
}
}