Skip to content

Commit 62b1336

Browse files
committed
KRPC-171 Non-suspending Ktor client creation
1 parent ac827a1 commit 62b1336

File tree

11 files changed

+142
-66
lines changed

11 files changed

+142
-66
lines changed

krpc/krpc-client/api/krpc-client.api

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1-
public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
1+
public abstract class kotlinx/rpc/krpc/client/InitializedKrpcClient : kotlinx/rpc/krpc/client/KrpcClient {
22
public fun <init> (Lkotlinx/rpc/krpc/KrpcConfig$Client;Lkotlinx/rpc/krpc/KrpcTransport;)V
3+
public final fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
4+
public final fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5+
}
6+
7+
public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
8+
public fun <init> ()V
39
public final fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
410
public final fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
11+
public abstract fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
12+
public abstract fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
513
}
614

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import kotlinx.serialization.modules.SerializersModule
3434
import kotlin.collections.first
3535
import kotlin.concurrent.Volatile
3636
import kotlin.coroutines.cancellation.CancellationException
37+
import kotlin.properties.Delegates
3738

3839
@Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR)
3940
public typealias KRPCClient = KrpcClient
@@ -56,15 +57,58 @@ public typealias KRPCClient = KrpcClient
5657
* IMPORTANT: Must be exclusive to this client, otherwise unexpected behavior may occur.
5758
*/
5859
@OptIn(InternalCoroutinesApi::class)
59-
public abstract class KrpcClient(
60-
private val config: KrpcConfig.Client,
61-
transport: KrpcTransport,
62-
) : RpcClient, KrpcEndpoint {
60+
public abstract class KrpcClient : RpcClient, KrpcEndpoint {
61+
/**
62+
* Called once to provide [KrpcTransport] for this client
63+
*/
64+
public abstract suspend fun initializeTransport(): KrpcTransport
65+
66+
private var isTransportReady: Boolean = false
67+
private var transport: KrpcTransport by Delegates.notNull()
68+
69+
/**
70+
* Called once to provide [KrpcConfig.Client] for this client
71+
*/
72+
public abstract fun initializeConfig(): KrpcConfig.Client
73+
74+
private val config: KrpcConfig.Client by lazy {
75+
initializeConfig()
76+
}
77+
78+
@Volatile
79+
private var clientCancelled = false
80+
81+
private fun checkTransportReadiness() {
82+
if (!isTransportReady) {
83+
error(
84+
"Internal error, please contact developers for the support. " +
85+
"Transport is not initialized, first scope access must come from an RPC request."
86+
)
87+
}
88+
}
89+
6390
@InternalRpcApi
64-
public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job))
91+
public val internalScope: CoroutineScope by lazy {
92+
checkTransportReadiness()
93+
94+
val context = SupervisorJob(transport.coroutineContext.job)
95+
96+
context.job.invokeOnCompletion(onCancelling = true) {
97+
clientCancelled = true
98+
99+
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
100+
101+
requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
102+
requestChannels.clear()
103+
}
104+
105+
CoroutineScope(context)
106+
}
65107

66108
// we make a child here, so we can send cancellation messages before closing the connection
67109
private val connector by lazy {
110+
checkTransportReadiness()
111+
68112
KrpcClientConnector(config.serialFormatInitializer.build(), transport, config.waitForServices)
69113
}
70114

@@ -89,36 +133,19 @@ public abstract class KrpcClient(
89133
// callId to serviceTypeString
90134
private val cancellingRequests = RpcInternalConcurrentHashMap<String, String>()
91135

92-
@Volatile
93-
private var clientCancelled = false
94-
95-
init {
96-
internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
97-
clientCancelled = true
98-
99-
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
100-
101-
requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
102-
requestChannels.clear()
103-
}
104-
105-
internalScope.launch(CoroutineName("krpc-client-generic-messages")) {
106-
connector.subscribeToGenericMessages(::handleGenericMessage)
107-
}
108-
}
109-
110-
private val initHandshake: Job = internalScope.launch(CoroutineName("krpc-client-handshake")) {
111-
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
112-
113-
connector.subscribeToProtocolMessages(::handleProtocolMessage)
114-
}
115-
116136
/**
117137
* Starts the handshake process and awaits for completion.
118138
* If the handshake was completed before, nothing happens.
119139
*/
120-
private suspend fun awaitHandshakeCompletion() {
121-
initHandshake.join()
140+
private suspend fun initializeAndAwaitHandshakeCompletion() {
141+
transport = initializeTransport()
142+
isTransportReady = true
143+
144+
connector.subscribeToGenericMessages(::handleGenericMessage)
145+
connector.subscribeToProtocolMessages(::handleProtocolMessage)
146+
147+
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
148+
122149
serverSupportedPlugins.await()
123150
}
124151

@@ -150,7 +177,7 @@ public abstract class KrpcClient(
150177
@Suppress("detekt.CyclomaticComplexMethod")
151178
final override fun <T> callServerStreaming(call: RpcCall): Flow<T> {
152179
return flow {
153-
awaitHandshakeCompletion()
180+
initializeAndAwaitHandshakeCompletion()
154181

155182
val id = callCounter.incrementAndGet()
156183
val callable = call.descriptor.getCallable(call.callableName)
@@ -446,3 +473,19 @@ public abstract class KrpcClient(
446473
}
447474
}
448475
}
476+
477+
/**
478+
* Represents an initialized RPC client that wraps a predefined configuration and transport.
479+
*/
480+
public abstract class InitializedKrpcClient(
481+
private val config: KrpcConfig.Client,
482+
private val transport: KrpcTransport,
483+
): KrpcClient() {
484+
final override suspend fun initializeTransport(): KrpcTransport {
485+
return transport
486+
}
487+
488+
final override fun initializeConfig(): KrpcConfig.Client {
489+
return config
490+
}
491+
}

krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ public final class kotlinx/rpc/krpc/ktor/client/KrpcKt {
88
}
99

1010
public final class kotlinx/rpc/krpc/ktor/client/KtorClientDslKt {
11-
public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12-
public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13-
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
14-
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
11+
public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
12+
public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
13+
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
14+
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
1515
public static final fun rpcConfig (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;)V
1616
public static synthetic fun rpcConfig$default (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
1717
}
1818

1919
public abstract interface class kotlinx/rpc/krpc/ktor/client/KtorRpcClient : kotlinx/rpc/RpcClient {
20-
public abstract fun getWebSocketSession ()Lio/ktor/websocket/WebSocketSession;
20+
public abstract fun getWebSocketSession ()Lkotlinx/coroutines/Deferred;
2121
}
2222

krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import io.ktor.client.request.*
1111
import io.ktor.util.*
1212
import kotlinx.rpc.RpcClient
1313
import kotlinx.rpc.krpc.KrpcConfigBuilder
14-
import kotlinx.rpc.krpc.rpcClientConfig
1514

1615
private val KrpcRequestConfigAttributeKey = AttributeKey<KrpcConfigBuilder.Client.() -> Unit>(
1716
name = "KrpcRequestConfigAttributeKey"
@@ -36,7 +35,7 @@ public fun HttpRequestBuilder.rpcConfig(configBuilder: KrpcConfigBuilder.Client.
3635
* @param block Optional configuration for the
3736
* @return An instance of [RpcClient] that is configured to send messages to the server.
3837
*/
39-
public suspend fun HttpClient.rpc(
38+
public fun HttpClient.rpc(
4039
urlString: String,
4140
block: HttpRequestBuilder.() -> Unit = {},
4241
): KtorRpcClient {
@@ -53,24 +52,19 @@ public suspend fun HttpClient.rpc(
5352
* @param block Optional configuration for the
5453
* @return An instance of [RpcClient] that is configured to send messages to the server.
5554
*/
56-
public suspend fun HttpClient.rpc(
55+
public fun HttpClient.rpc(
5756
block: HttpRequestBuilder.() -> Unit = {},
5857
): KtorRpcClient {
5958
pluginOrNull(WebSockets)
6059
?: error("RPC for client requires $WebSockets plugin to be installed firstly")
6160

62-
var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {}
63-
val session = webSocketSession {
64-
block()
61+
val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey)
62+
63+
return KtorKrpcClientImpl(pluginConfigBuilder) { configSetter ->
64+
webSocketSession {
65+
block()
6566

66-
attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let {
67-
requestConfigBuilder = it
67+
attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let { configSetter(it) }
6868
}
6969
}
70-
71-
val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey)
72-
val rpcConfig = pluginConfigBuilder?.apply(requestConfigBuilder)?.build()
73-
?: rpcClientConfig(requestConfigBuilder)
74-
75-
return KtorKrpcClientImpl(session, rpcConfig)
7670
}

krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorRpcClient.kt

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,46 @@
55
package kotlinx.rpc.krpc.ktor.client
66

77
import io.ktor.websocket.*
8+
import kotlinx.coroutines.CompletableDeferred
9+
import kotlinx.coroutines.Deferred
810
import kotlinx.rpc.RpcClient
911
import kotlinx.rpc.krpc.KrpcConfig
12+
import kotlinx.rpc.krpc.KrpcConfigBuilder
13+
import kotlinx.rpc.krpc.KrpcTransport
1014
import kotlinx.rpc.krpc.client.KrpcClient
1115
import kotlinx.rpc.krpc.ktor.KtorTransport
16+
import kotlinx.rpc.krpc.rpcClientConfig
1217

1318
/**
1419
* [RpcClient] implementation for Ktor, containing [webSocketSession] object,
1520
* that is used to maintain connection.
1621
*/
1722
public interface KtorRpcClient : RpcClient {
18-
public val webSocketSession: WebSocketSession
23+
public val webSocketSession: Deferred<WebSocketSession>
1924
}
2025

2126
internal class KtorKrpcClientImpl(
22-
override val webSocketSession: WebSocketSession,
23-
config: KrpcConfig.Client,
24-
): KrpcClient(config, KtorTransport(webSocketSession)), KtorRpcClient
27+
private val pluginConfigBuilder: KrpcConfigBuilder.Client?,
28+
private val webSocketSessionFactory: suspend (
29+
configSetter: (KrpcConfigBuilder.Client.() -> Unit) -> Unit,
30+
) -> WebSocketSession,
31+
): KrpcClient(), KtorRpcClient {
32+
private var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {}
2533

34+
private val _webSocketSession = CompletableDeferred<WebSocketSession>()
35+
override val webSocketSession: Deferred<WebSocketSession> = _webSocketSession
36+
37+
override suspend fun initializeTransport(): KrpcTransport {
38+
val session = webSocketSessionFactory {
39+
requestConfigBuilder = it
40+
}
41+
42+
_webSocketSession.complete(session)
43+
return KtorTransport(session)
44+
}
45+
46+
override fun initializeConfig(): KrpcConfig.Client {
47+
return pluginConfigBuilder?.apply(requestConfigBuilder)?.build()
48+
?: rpcClientConfig(requestConfigBuilder)
49+
}
50+
}

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestClient.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.rpc.krpc.test
66

77
import kotlinx.rpc.krpc.KrpcConfig
88
import kotlinx.rpc.krpc.KrpcTransport
9+
import kotlinx.rpc.krpc.client.InitializedKrpcClient
910
import kotlinx.rpc.krpc.client.KrpcClient
1011

1112
/**
@@ -17,4 +18,4 @@ import kotlinx.rpc.krpc.client.KrpcClient
1718
class KrpcTestClient(
1819
config: KrpcConfig.Client,
1920
transport: KrpcTransport,
20-
) : KrpcClient(config, transport)
21+
) : InitializedKrpcClient(config, transport)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransportTest.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlinx.rpc.krpc.serialization.cbor.cbor
1212
import kotlinx.rpc.krpc.serialization.json.json
1313
import kotlinx.rpc.krpc.serialization.protobuf.protobuf
1414
import kotlinx.serialization.ExperimentalSerializationApi
15+
import kotlin.test.Test
1516

1617
abstract class LocalTransportTest : KrpcTransportTestBase() {
1718
private val transport = LocalTransport()
@@ -50,11 +51,15 @@ class ProtoBufLocalTransportTest : LocalTransportTest() {
5051
}
5152

5253
// 'null' is not supported in ProtoBuf
54+
@Test
5355
override fun nullable(): TestResult = runTest { }
5456

57+
@Test
5558
override fun testByteArraySerialization(): TestResult = runTest { }
5659

60+
@Test
5761
override fun testNullables(): TestResult = runTest { }
5862

63+
@Test
5964
override fun testNullableLists(): TestResult = runTest { }
6065
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import kotlinx.coroutines.test.TestScope
1212
import kotlinx.rpc.annotations.Rpc
1313
import kotlinx.rpc.internal.utils.hex.rpcInternalHexToReadableBinary
1414
import kotlinx.rpc.krpc.KrpcConfig
15-
import kotlinx.rpc.krpc.client.KrpcClient
15+
import kotlinx.rpc.krpc.client.InitializedKrpcClient
1616
import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger
1717
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
1818
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
@@ -105,4 +105,4 @@ class ProtocolTestServer(
105105
class ProtocolTestClient(
106106
config: KrpcConfig.Client,
107107
transport: LocalTransport,
108-
) : KrpcClient(config, transport.client)
108+
) : InitializedKrpcClient(config, transport.client)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import kotlin.time.Duration.Companion.seconds
2525
fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResult {
2626
return runTest(timeout = 15.seconds) {
2727
debugCoroutines()
28-
CancellationToolkit(this).apply { body() }
28+
CancellationToolkit(this).apply {
29+
client.initializeTransport()
30+
31+
body()
32+
}
2933
}
3034
}
3135

tests/compiler-plugin-tests/src/main/kotlin/kotlinx/rpc/codegen/test/TestRpcClient.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,13 @@
44

55
package kotlinx.rpc.codegen.test
66

7-
import kotlinx.coroutines.*
87
import kotlinx.coroutines.flow.Flow
98
import kotlinx.coroutines.flow.flow
109
import kotlinx.rpc.RpcCall
1110
import kotlinx.rpc.RpcClient
12-
import kotlin.coroutines.CoroutineContext
1311

1412
@Suppress("UNCHECKED_CAST", "unused")
1513
object TestRpcClient : RpcClient {
16-
override val coroutineContext: CoroutineContext = Job()
17-
1814
override suspend fun <T> call(call: RpcCall): T {
1915
return "call_42" as T
2016
}

tests/krpc-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/compatibility/LocalTransport.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import kotlinx.coroutines.job
1212
import kotlinx.rpc.krpc.KrpcConfig
1313
import kotlinx.rpc.krpc.KrpcTransport
1414
import kotlinx.rpc.krpc.KrpcTransportMessage
15-
import kotlinx.rpc.krpc.client.KrpcClient
15+
import kotlinx.rpc.krpc.client.InitializedKrpcClient
1616
import kotlinx.rpc.krpc.server.KrpcServer
1717
import kotlin.coroutines.CoroutineContext
1818

@@ -24,7 +24,7 @@ class KrpcTestServer(
2424
class KrpcTestClient(
2525
config: KrpcConfig.Client,
2626
transport: KrpcTransport,
27-
) : KrpcClient(config, transport)
27+
) : InitializedKrpcClient(config, transport)
2828

2929
class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
3030
override val coroutineContext = parentScope?.run { SupervisorJob(coroutineContext.job) }

0 commit comments

Comments
 (0)