Skip to content

Cold Ktor client #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion krpc/krpc-client/api/krpc-client.api
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
public abstract class kotlinx/rpc/krpc/client/InitializedKrpcClient : kotlinx/rpc/krpc/client/KrpcClient {
public fun <init> (Lkotlinx/rpc/krpc/KrpcConfig$Client;Lkotlinx/rpc/krpc/KrpcTransport;)V
public final fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
public final fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
public fun <init> ()V
public final fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
public abstract fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
public abstract fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import kotlinx.serialization.modules.SerializersModule
import kotlin.collections.first
import kotlin.concurrent.Volatile
import kotlin.coroutines.cancellation.CancellationException
import kotlin.properties.Delegates

@Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR)
public typealias KRPCClient = KrpcClient
Expand All @@ -56,15 +57,61 @@ public typealias KRPCClient = KrpcClient
* IMPORTANT: Must be exclusive to this client, otherwise unexpected behavior may occur.
*/
@OptIn(InternalCoroutinesApi::class)
public abstract class KrpcClient(
private val config: KrpcConfig.Client,
transport: KrpcTransport,
) : RpcClient, KrpcEndpoint {
public abstract class KrpcClient : RpcClient, KrpcEndpoint {
/**
* Called once to provide [KrpcTransport] for this client
*/
public abstract suspend fun initializeTransport(): KrpcTransport

private var isTransportReady: Boolean = false
private var transport: KrpcTransport by Delegates.notNull()

/**
* Called once to provide [KrpcConfig.Client] for this client
*/
public abstract fun initializeConfig(): KrpcConfig.Client

private val config: KrpcConfig.Client by lazy {
initializeConfig()
}

@Volatile
private var clientCancelled = false

private fun checkTransportReadiness() {
if (!isTransportReady) {
error(
"Internal error, please contact developers for the support. " +
"Transport is not initialized, first scope access must come from an RPC request."
)
}
}

@InternalRpcApi
public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job))
public val internalScope: CoroutineScope by lazy {
checkTransportReadiness()

val context = SupervisorJob(transport.coroutineContext.job)

context.job.invokeOnCompletion(onCancelling = true) {
clientCancelled = true

sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)

@OptIn(DelicateCoroutinesApi::class)
GlobalScope.launch {
requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
requestChannels.clear()
}
}

CoroutineScope(context)
}

// we make a child here, so we can send cancellation messages before closing the connection
private val connector by lazy {
checkTransportReadiness()

KrpcClientConnector(config.serialFormatInitializer.build(), transport, config.waitForServices)
}

Expand All @@ -89,36 +136,19 @@ public abstract class KrpcClient(
// callId to serviceTypeString
private val cancellingRequests = RpcInternalConcurrentHashMap<String, String>()

@Volatile
private var clientCancelled = false

init {
internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
clientCancelled = true

sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)

requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
requestChannels.clear()
}

internalScope.launch(CoroutineName("krpc-client-generic-messages")) {
connector.subscribeToGenericMessages(::handleGenericMessage)
}
}

private val initHandshake: Job = internalScope.launch(CoroutineName("krpc-client-handshake")) {
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))

connector.subscribeToProtocolMessages(::handleProtocolMessage)
}

/**
* Starts the handshake process and awaits for completion.
* If the handshake was completed before, nothing happens.
*/
private suspend fun awaitHandshakeCompletion() {
initHandshake.join()
private suspend fun initializeAndAwaitHandshakeCompletion() {
transport = initializeTransport()
isTransportReady = true

connector.subscribeToGenericMessages(::handleGenericMessage)
connector.subscribeToProtocolMessages(::handleProtocolMessage)

connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))

serverSupportedPlugins.await()
}

Expand Down Expand Up @@ -150,7 +180,11 @@ public abstract class KrpcClient(
@Suppress("detekt.CyclomaticComplexMethod")
final override fun <T> callServerStreaming(call: RpcCall): Flow<T> {
return flow {
awaitHandshakeCompletion()
if (clientCancelled) {
error("Client cancelled")
}

initializeAndAwaitHandshakeCompletion()

val id = callCounter.incrementAndGet()
val callable = call.descriptor.getCallable(call.callableName)
Expand Down Expand Up @@ -446,3 +480,19 @@ public abstract class KrpcClient(
}
}
}

/**
* Represents an initialized RPC client that wraps a predefined configuration and transport.
*/
public abstract class InitializedKrpcClient(
private val config: KrpcConfig.Client,
private val transport: KrpcTransport,
): KrpcClient() {
final override suspend fun initializeTransport(): KrpcTransport {
return transport
}

final override fun initializeConfig(): KrpcConfig.Client {
return config
}
}
10 changes: 5 additions & 5 deletions krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ public final class kotlinx/rpc/krpc/ktor/client/KrpcKt {
}

public final class kotlinx/rpc/krpc/ktor/client/KtorClientDslKt {
public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
public static final fun rpcConfig (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun rpcConfig$default (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
}

public abstract interface class kotlinx/rpc/krpc/ktor/client/KtorRpcClient : kotlinx/rpc/RpcClient {
public abstract fun getWebSocketSession ()Lio/ktor/websocket/WebSocketSession;
public abstract fun getWebSocketSession ()Lkotlinx/coroutines/Deferred;
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import io.ktor.client.request.*
import io.ktor.util.*
import kotlinx.rpc.RpcClient
import kotlinx.rpc.krpc.KrpcConfigBuilder
import kotlinx.rpc.krpc.rpcClientConfig

private val KrpcRequestConfigAttributeKey = AttributeKey<KrpcConfigBuilder.Client.() -> Unit>(
name = "KrpcRequestConfigAttributeKey"
Expand All @@ -36,7 +35,7 @@ public fun HttpRequestBuilder.rpcConfig(configBuilder: KrpcConfigBuilder.Client.
* @param block Optional configuration for the
* @return An instance of [RpcClient] that is configured to send messages to the server.
*/
public suspend fun HttpClient.rpc(
public fun HttpClient.rpc(
urlString: String,
block: HttpRequestBuilder.() -> Unit = {},
): KtorRpcClient {
Expand All @@ -53,24 +52,19 @@ public suspend fun HttpClient.rpc(
* @param block Optional configuration for the
* @return An instance of [RpcClient] that is configured to send messages to the server.
*/
public suspend fun HttpClient.rpc(
public fun HttpClient.rpc(
block: HttpRequestBuilder.() -> Unit = {},
): KtorRpcClient {
pluginOrNull(WebSockets)
?: error("RPC for client requires $WebSockets plugin to be installed firstly")

var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {}
val session = webSocketSession {
block()
val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey)

return KtorKrpcClientImpl(pluginConfigBuilder) { configSetter ->
webSocketSession {
block()

attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let {
requestConfigBuilder = it
attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let { configSetter(it) }
}
}

val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey)
val rpcConfig = pluginConfigBuilder?.apply(requestConfigBuilder)?.build()
?: rpcClientConfig(requestConfigBuilder)

return KtorKrpcClientImpl(session, rpcConfig)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,46 @@
package kotlinx.rpc.krpc.ktor.client

import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.rpc.RpcClient
import kotlinx.rpc.krpc.KrpcConfig
import kotlinx.rpc.krpc.KrpcConfigBuilder
import kotlinx.rpc.krpc.KrpcTransport
import kotlinx.rpc.krpc.client.KrpcClient
import kotlinx.rpc.krpc.ktor.KtorTransport
import kotlinx.rpc.krpc.rpcClientConfig

/**
* [RpcClient] implementation for Ktor, containing [webSocketSession] object,
* that is used to maintain connection.
*/
public interface KtorRpcClient : RpcClient {
public val webSocketSession: WebSocketSession
public val webSocketSession: Deferred<WebSocketSession>
}

internal class KtorKrpcClientImpl(
override val webSocketSession: WebSocketSession,
config: KrpcConfig.Client,
): KrpcClient(config, KtorTransport(webSocketSession)), KtorRpcClient
private val pluginConfigBuilder: KrpcConfigBuilder.Client?,
private val webSocketSessionFactory: suspend (
configSetter: (KrpcConfigBuilder.Client.() -> Unit) -> Unit,
) -> WebSocketSession,
): KrpcClient(), KtorRpcClient {
private var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {}

private val _webSocketSession = CompletableDeferred<WebSocketSession>()
override val webSocketSession: Deferred<WebSocketSession> = _webSocketSession

override suspend fun initializeTransport(): KrpcTransport {
val session = webSocketSessionFactory {
requestConfigBuilder = it
}

_webSocketSession.complete(session)
return KtorTransport(session)
}

override fun initializeConfig(): KrpcConfig.Client {
return pluginConfigBuilder?.apply(requestConfigBuilder)?.build()
?: rpcClientConfig(requestConfigBuilder)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.rpc.krpc.test

import kotlinx.rpc.krpc.KrpcConfig
import kotlinx.rpc.krpc.KrpcTransport
import kotlinx.rpc.krpc.client.InitializedKrpcClient
import kotlinx.rpc.krpc.client.KrpcClient

/**
Expand All @@ -17,4 +18,4 @@ import kotlinx.rpc.krpc.client.KrpcClient
class KrpcTestClient(
config: KrpcConfig.Client,
transport: KrpcTransport,
) : KrpcClient(config, transport)
) : InitializedKrpcClient(config, transport)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.rpc.krpc.serialization.cbor.cbor
import kotlinx.rpc.krpc.serialization.json.json
import kotlinx.rpc.krpc.serialization.protobuf.protobuf
import kotlinx.serialization.ExperimentalSerializationApi
import kotlin.test.Test

abstract class LocalTransportTest : KrpcTransportTestBase() {
private val transport = LocalTransport()
Expand Down Expand Up @@ -50,11 +51,15 @@ class ProtoBufLocalTransportTest : LocalTransportTest() {
}

// 'null' is not supported in ProtoBuf
@Test
override fun nullable(): TestResult = runTest { }

@Test
override fun testByteArraySerialization(): TestResult = runTest { }

@Test
override fun testNullables(): TestResult = runTest { }

@Test
override fun testNullableLists(): TestResult = runTest { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlinx.coroutines.test.TestScope
import kotlinx.rpc.annotations.Rpc
import kotlinx.rpc.internal.utils.hex.rpcInternalHexToReadableBinary
import kotlinx.rpc.krpc.KrpcConfig
import kotlinx.rpc.krpc.client.KrpcClient
import kotlinx.rpc.krpc.client.InitializedKrpcClient
import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
Expand Down Expand Up @@ -105,4 +105,4 @@ class ProtocolTestServer(
class ProtocolTestClient(
config: KrpcConfig.Client,
transport: LocalTransport,
) : KrpcClient(config, transport.client)
) : InitializedKrpcClient(config, transport.client)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import kotlin.time.Duration.Companion.seconds
fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResult {
return runTest(timeout = 15.seconds) {
debugCoroutines()
CancellationToolkit(this).apply { body() }
CancellationToolkit(this).apply {
client.initializeTransport()

body()
}
}
}

Expand Down
Loading