Skip to content

Quick fixes and tests #352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions docs/pages/kotlinx-rpc/topics/0-8-0.topic
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,9 @@
class MyClient(
config: KrpcConfig,
transport: KrpcTransport,
) : InitializedKrpcClient(transport, config)
) : InitializedKrpcClient(config, transport)
</code-block>
</compare>
<note>
Notice that the parameter order is reversed in new <code>InitializedKrpcClient</code>.
</note>
</li>
</list>
</chapter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import kotlin.properties.Delegates
* See [KrpcClient.initializeTransport].
*/
public abstract class InitializedKrpcClient(
private val transport: KrpcTransport,
private val config: KrpcConfig.Client,
private val transport: KrpcTransport,
): KrpcClient() {
final override suspend fun initializeTransport(): KrpcTransport {
return transport
Expand Down Expand Up @@ -80,6 +80,20 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
*/
protected abstract fun initializeConfig(): KrpcConfig.Client

/**
* Close this client, removing all the services and stopping accepting messages.
*/
public fun close(message: String? = null) {
internalScope.cancel(message ?: "Client closed")
}

/**
* Waits until the client is closed.
*/
public suspend fun awaitCompletion() {
internalScope.coroutineContext.job.join()
}

/*
* #####################################################################
* # #
Expand Down
7 changes: 7 additions & 0 deletions krpc/krpc-ktor/krpc-ktor-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ kotlin {
implementation(projects.krpc.krpcSerialization.krpcSerializationJson)
implementation(projects.krpc.krpcKtor.krpcKtorServer)
implementation(projects.krpc.krpcKtor.krpcKtorClient)
implementation(projects.krpc.krpcLogging)

implementation(libs.kotlin.test)
implementation(libs.ktor.server.netty)
implementation(libs.ktor.server.test.host)
implementation(libs.ktor.server.websockets)
implementation(libs.ktor.client.core)
implementation(libs.ktor.client.websockets)
implementation(libs.ktor.client.cio)
implementation(libs.logback.classic)
implementation(libs.coroutines.debug)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,23 @@

package kotlinx.rpc.krpc.ktor

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.testing.*
import kotlinx.coroutines.cancel
import kotlinx.coroutines.*
import kotlinx.coroutines.debug.DebugProbes
import kotlinx.coroutines.test.runTest
import kotlinx.rpc.annotations.Rpc
import kotlinx.rpc.krpc.client.KrpcClient
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
import kotlinx.rpc.krpc.ktor.client.installKrpc
import kotlinx.rpc.krpc.ktor.client.rpc
import kotlinx.rpc.krpc.ktor.client.rpcConfig
Expand All @@ -18,7 +31,15 @@ import kotlinx.rpc.krpc.ktor.server.rpc
import kotlinx.rpc.krpc.serialization.json.json
import kotlinx.rpc.withService
import org.junit.Assert.assertEquals
import org.junit.platform.commons.logging.Logger
import org.junit.platform.commons.logging.LoggerFactory
import java.net.ServerSocket
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.cancellation.CancellationException
import kotlin.test.Ignore
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds

@Rpc
interface NewService {
Expand All @@ -35,6 +56,23 @@ class NewServiceImpl(
}
}

@Rpc
interface SlowService {
suspend fun verySlow(): String
}

class SlowServiceImpl : SlowService {
val received = CompletableDeferred<Unit>()

override suspend fun verySlow(): String {
received.complete(Unit)

delay(Int.MAX_VALUE.toLong())

error("Must not be called")
}
}

class KtorTransportTest {
@Test
fun testEcho() = testApplication {
Expand Down Expand Up @@ -96,4 +134,130 @@ class KtorTransportTest {

clientWithNoConfig.cancel()
}

@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
@Test
@Ignore("Wait for Ktor fix (https://github.com/ktorio/ktor/pull/4927) or apply workaround if rejected")
fun testEndpointsTerminateWhenWsDoes() = runTest(timeout = 15.seconds) {
DebugProbes.install()

val logger = setupLogger()

val port: Int = findFreePort()

val newPool = Executors.newCachedThreadPool().asCoroutineDispatcher()

val serverReady = CompletableDeferred<Unit>()
val dropServer = CompletableDeferred<Unit>()

val service = SlowServiceImpl()

val serverJob = GlobalScope.launch(CoroutineName("server")) {
withContext(newPool) {
val server = embeddedServer(
factory = Netty,
port = port,
parentCoroutineContext = newPool,
) {
install(Krpc)

routing {
get {
call.respondText("hello")
}

rpc("/rpc") {
rpcConfig {
serialization {
json()
}
}

registerService<SlowService> { service }
}
}
}.start(wait = false)

serverReady.complete(Unit)

dropServer.await()

server.stop(shutdownGracePeriod = 100L, shutdownTimeout = 100L, timeUnit = TimeUnit.MILLISECONDS)
}

logger.info { "Server stopped" }
}

val ktorClient = HttpClient(CIO) {
installKrpc {
serialization {
json()
}
}
}

serverReady.await()

assertEquals("hello", ktorClient.get("http://0.0.0.0:$port").bodyAsText())

val rpcClient = ktorClient.rpc("ws://0.0.0.0:$port/rpc")

launch {
try {
rpcClient.withService<SlowService>().verySlow()
error("Must not be called")
} catch (_: CancellationException) {
logger.info { "Cancellation exception caught for RPC request" }
ensureActive()
}
}

service.received.await()

logger.info { "Received RPC request" }

dropServer.complete(Unit)

logger.info { "Waiting for RPC client to complete" }

(rpcClient as KrpcClient).awaitCompletion()

logger.info { "RPC client completed" }

ktorClient.close()
newPool.close()

serverJob.cancel()
}

private fun findFreePort(): Int {
val port: Int
while (true) {
val socket = try {
ServerSocket(0)
} catch (_: Throwable) {
continue
}

port = socket.localPort
socket.close()
break
}
return port
}

private fun setupLogger(): Logger {
val logger = LoggerFactory.getLogger(KtorTransportTest::class.java)

RpcInternalDumpLoggerContainer.set(object : RpcInternalDumpLogger {

override val isEnabled: Boolean = true

override fun dump(vararg tags: String, message: () -> String) {
logger.info { "[${tags.joinToString()}] ${message()}" }
}
})

return logger
}
}
16 changes: 16 additions & 0 deletions krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<!--
~ Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
-->

<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="trace">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="io.netty" level="TRACE"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public abstract class KrpcServer(
transport: KrpcTransport,
) : RpcServer, KrpcEndpoint {

/**
* Close this server, removing all the services and stopping accepting messages.
*/
public fun close(message: String? = null) {
internalScope.cancel(message ?: "Server closed")
}

/**
* Waits until the server is closed.
*/
public suspend fun awaitCompletion() {
internalScope.coroutineContext.job.join()
}

/*
* #####################################################################
* # #
Expand Down Expand Up @@ -119,6 +133,7 @@ public abstract class KrpcServer(

override fun <@Rpc Service : Any> deregisterService(serviceKClass: KClass<Service>) {
connector.unsubscribeFromServiceMessages(serviceDescriptorOf(serviceKClass).fqName)
rpcServices.remove(serviceDescriptorOf(serviceKClass).fqName)
}

private fun <@Rpc Service : Any> createNewServiceInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ interface CancellationService {

suspend fun outgoingStream(stream: Flow<Int>)

suspend fun outgoingStreamAsync(stream: Flow<Int>)

suspend fun outgoingStreamWithDelayedResponse(stream: Flow<Int>)

suspend fun outgoingStreamWithException(stream: Flow<Int>)
Expand Down Expand Up @@ -57,6 +59,14 @@ class CancellationServiceImpl : CancellationService {
consume(stream)
}

@OptIn(DelicateCoroutinesApi::class)
override suspend fun outgoingStreamAsync(stream: Flow<Int>) {
GlobalScope.launch {
consume(stream)
}
firstIncomingConsumed.await()
}

override suspend fun outgoingStreamWithDelayedResponse(stream: Flow<Int>) {
consume(stream)

Expand Down
Loading