Skip to content

Commit fce1b52

Browse files
committed
KRPC-171 Non-suspending Ktor client creation
1 parent ac827a1 commit fce1b52

File tree

12 files changed

+167
-66
lines changed

12 files changed

+167
-66
lines changed

krpc/krpc-client/api/krpc-client.api

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1-
public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
1+
public abstract class kotlinx/rpc/krpc/client/InitializedKrpcClient : kotlinx/rpc/krpc/client/KrpcClient {
22
public fun <init> (Lkotlinx/rpc/krpc/KrpcConfig$Client;Lkotlinx/rpc/krpc/KrpcTransport;)V
3+
public final fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
4+
public final fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
5+
}
6+
7+
public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
8+
public fun <init> ()V
39
public final fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
410
public final fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
11+
public abstract fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
12+
public abstract fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
513
}
614

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 82 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import kotlinx.serialization.modules.SerializersModule
3434
import kotlin.collections.first
3535
import kotlin.concurrent.Volatile
3636
import kotlin.coroutines.cancellation.CancellationException
37+
import kotlin.properties.Delegates
3738

3839
@Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR)
3940
public typealias KRPCClient = KrpcClient
@@ -56,15 +57,61 @@ public typealias KRPCClient = KrpcClient
5657
* IMPORTANT: Must be exclusive to this client, otherwise unexpected behavior may occur.
5758
*/
5859
@OptIn(InternalCoroutinesApi::class)
59-
public abstract class KrpcClient(
60-
private val config: KrpcConfig.Client,
61-
transport: KrpcTransport,
62-
) : RpcClient, KrpcEndpoint {
60+
public abstract class KrpcClient : RpcClient, KrpcEndpoint {
61+
/**
62+
* Called once to provide [KrpcTransport] for this client
63+
*/
64+
public abstract suspend fun initializeTransport(): KrpcTransport
65+
66+
private var isTransportReady: Boolean = false
67+
private var transport: KrpcTransport by Delegates.notNull()
68+
69+
/**
70+
* Called once to provide [KrpcConfig.Client] for this client
71+
*/
72+
public abstract fun initializeConfig(): KrpcConfig.Client
73+
74+
private val config: KrpcConfig.Client by lazy {
75+
initializeConfig()
76+
}
77+
78+
@Volatile
79+
private var clientCancelled = false
80+
81+
private fun checkTransportReadiness() {
82+
if (!isTransportReady) {
83+
error(
84+
"Internal error, please contact developers for the support. " +
85+
"Transport is not initialized, first scope access must come from an RPC request."
86+
)
87+
}
88+
}
89+
6390
@InternalRpcApi
64-
public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job))
91+
public val internalScope: CoroutineScope by lazy {
92+
checkTransportReadiness()
93+
94+
val context = SupervisorJob(transport.coroutineContext.job)
95+
96+
context.job.invokeOnCompletion(onCancelling = true) {
97+
clientCancelled = true
98+
99+
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
100+
101+
@OptIn(DelicateCoroutinesApi::class)
102+
GlobalScope.launch {
103+
requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
104+
requestChannels.clear()
105+
}
106+
}
107+
108+
CoroutineScope(context)
109+
}
65110

66111
// we make a child here, so we can send cancellation messages before closing the connection
67112
private val connector by lazy {
113+
checkTransportReadiness()
114+
68115
KrpcClientConnector(config.serialFormatInitializer.build(), transport, config.waitForServices)
69116
}
70117

@@ -89,36 +136,19 @@ public abstract class KrpcClient(
89136
// callId to serviceTypeString
90137
private val cancellingRequests = RpcInternalConcurrentHashMap<String, String>()
91138

92-
@Volatile
93-
private var clientCancelled = false
94-
95-
init {
96-
internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
97-
clientCancelled = true
98-
99-
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
100-
101-
requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
102-
requestChannels.clear()
103-
}
104-
105-
internalScope.launch(CoroutineName("krpc-client-generic-messages")) {
106-
connector.subscribeToGenericMessages(::handleGenericMessage)
107-
}
108-
}
109-
110-
private val initHandshake: Job = internalScope.launch(CoroutineName("krpc-client-handshake")) {
111-
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
112-
113-
connector.subscribeToProtocolMessages(::handleProtocolMessage)
114-
}
115-
116139
/**
117140
* Starts the handshake process and awaits for completion.
118141
* If the handshake was completed before, nothing happens.
119142
*/
120-
private suspend fun awaitHandshakeCompletion() {
121-
initHandshake.join()
143+
private suspend fun initializeAndAwaitHandshakeCompletion() {
144+
transport = initializeTransport()
145+
isTransportReady = true
146+
147+
connector.subscribeToGenericMessages(::handleGenericMessage)
148+
connector.subscribeToProtocolMessages(::handleProtocolMessage)
149+
150+
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
151+
122152
serverSupportedPlugins.await()
123153
}
124154

@@ -150,7 +180,11 @@ public abstract class KrpcClient(
150180
@Suppress("detekt.CyclomaticComplexMethod")
151181
final override fun <T> callServerStreaming(call: RpcCall): Flow<T> {
152182
return flow {
153-
awaitHandshakeCompletion()
183+
if (clientCancelled) {
184+
error("Client cancelled")
185+
}
186+
187+
initializeAndAwaitHandshakeCompletion()
154188

155189
val id = callCounter.incrementAndGet()
156190
val callable = call.descriptor.getCallable(call.callableName)
@@ -446,3 +480,19 @@ public abstract class KrpcClient(
446480
}
447481
}
448482
}
483+
484+
/**
485+
* Represents an initialized RPC client that wraps a predefined configuration and transport.
486+
*/
487+
public abstract class InitializedKrpcClient(
488+
private val config: KrpcConfig.Client,
489+
private val transport: KrpcTransport,
490+
): KrpcClient() {
491+
final override suspend fun initializeTransport(): KrpcTransport {
492+
return transport
493+
}
494+
495+
final override fun initializeConfig(): KrpcConfig.Client {
496+
return config
497+
}
498+
}

krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ public final class kotlinx/rpc/krpc/ktor/client/KrpcKt {
88
}
99

1010
public final class kotlinx/rpc/krpc/ktor/client/KtorClientDslKt {
11-
public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12-
public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
13-
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
14-
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
11+
public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
12+
public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
13+
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
14+
public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient;
1515
public static final fun rpcConfig (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;)V
1616
public static synthetic fun rpcConfig$default (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
1717
}
1818

1919
public abstract interface class kotlinx/rpc/krpc/ktor/client/KtorRpcClient : kotlinx/rpc/RpcClient {
20-
public abstract fun getWebSocketSession ()Lio/ktor/websocket/WebSocketSession;
20+
public abstract fun getWebSocketSession ()Lkotlinx/coroutines/Deferred;
2121
}
2222

krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import io.ktor.client.request.*
1111
import io.ktor.util.*
1212
import kotlinx.rpc.RpcClient
1313
import kotlinx.rpc.krpc.KrpcConfigBuilder
14-
import kotlinx.rpc.krpc.rpcClientConfig
1514

1615
private val KrpcRequestConfigAttributeKey = AttributeKey<KrpcConfigBuilder.Client.() -> Unit>(
1716
name = "KrpcRequestConfigAttributeKey"
@@ -36,7 +35,7 @@ public fun HttpRequestBuilder.rpcConfig(configBuilder: KrpcConfigBuilder.Client.
3635
* @param block Optional configuration for the
3736
* @return An instance of [RpcClient] that is configured to send messages to the server.
3837
*/
39-
public suspend fun HttpClient.rpc(
38+
public fun HttpClient.rpc(
4039
urlString: String,
4140
block: HttpRequestBuilder.() -> Unit = {},
4241
): KtorRpcClient {
@@ -53,24 +52,19 @@ public suspend fun HttpClient.rpc(
5352
* @param block Optional configuration for the
5453
* @return An instance of [RpcClient] that is configured to send messages to the server.
5554
*/
56-
public suspend fun HttpClient.rpc(
55+
public fun HttpClient.rpc(
5756
block: HttpRequestBuilder.() -> Unit = {},
5857
): KtorRpcClient {
5958
pluginOrNull(WebSockets)
6059
?: error("RPC for client requires $WebSockets plugin to be installed firstly")
6160

62-
var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {}
63-
val session = webSocketSession {
64-
block()
61+
val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey)
62+
63+
return KtorKrpcClientImpl(pluginConfigBuilder) { configSetter ->
64+
webSocketSession {
65+
block()
6566

66-
attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let {
67-
requestConfigBuilder = it
67+
attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let { configSetter(it) }
6868
}
6969
}
70-
71-
val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey)
72-
val rpcConfig = pluginConfigBuilder?.apply(requestConfigBuilder)?.build()
73-
?: rpcClientConfig(requestConfigBuilder)
74-
75-
return KtorKrpcClientImpl(session, rpcConfig)
7670
}

krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorRpcClient.kt

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,46 @@
55
package kotlinx.rpc.krpc.ktor.client
66

77
import io.ktor.websocket.*
8+
import kotlinx.coroutines.CompletableDeferred
9+
import kotlinx.coroutines.Deferred
810
import kotlinx.rpc.RpcClient
911
import kotlinx.rpc.krpc.KrpcConfig
12+
import kotlinx.rpc.krpc.KrpcConfigBuilder
13+
import kotlinx.rpc.krpc.KrpcTransport
1014
import kotlinx.rpc.krpc.client.KrpcClient
1115
import kotlinx.rpc.krpc.ktor.KtorTransport
16+
import kotlinx.rpc.krpc.rpcClientConfig
1217

1318
/**
1419
* [RpcClient] implementation for Ktor, containing [webSocketSession] object,
1520
* that is used to maintain connection.
1621
*/
1722
public interface KtorRpcClient : RpcClient {
18-
public val webSocketSession: WebSocketSession
23+
public val webSocketSession: Deferred<WebSocketSession>
1924
}
2025

2126
internal class KtorKrpcClientImpl(
22-
override val webSocketSession: WebSocketSession,
23-
config: KrpcConfig.Client,
24-
): KrpcClient(config, KtorTransport(webSocketSession)), KtorRpcClient
27+
private val pluginConfigBuilder: KrpcConfigBuilder.Client?,
28+
private val webSocketSessionFactory: suspend (
29+
configSetter: (KrpcConfigBuilder.Client.() -> Unit) -> Unit,
30+
) -> WebSocketSession,
31+
): KrpcClient(), KtorRpcClient {
32+
private var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {}
2533

34+
private val _webSocketSession = CompletableDeferred<WebSocketSession>()
35+
override val webSocketSession: Deferred<WebSocketSession> = _webSocketSession
36+
37+
override suspend fun initializeTransport(): KrpcTransport {
38+
val session = webSocketSessionFactory {
39+
requestConfigBuilder = it
40+
}
41+
42+
_webSocketSession.complete(session)
43+
return KtorTransport(session)
44+
}
45+
46+
override fun initializeConfig(): KrpcConfig.Client {
47+
return pluginConfigBuilder?.apply(requestConfigBuilder)?.build()
48+
?: rpcClientConfig(requestConfigBuilder)
49+
}
50+
}

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestClient.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.rpc.krpc.test
66

77
import kotlinx.rpc.krpc.KrpcConfig
88
import kotlinx.rpc.krpc.KrpcTransport
9+
import kotlinx.rpc.krpc.client.InitializedKrpcClient
910
import kotlinx.rpc.krpc.client.KrpcClient
1011

1112
/**
@@ -17,4 +18,4 @@ import kotlinx.rpc.krpc.client.KrpcClient
1718
class KrpcTestClient(
1819
config: KrpcConfig.Client,
1920
transport: KrpcTransport,
20-
) : KrpcClient(config, transport)
21+
) : InitializedKrpcClient(config, transport)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransportTest.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import kotlinx.rpc.krpc.serialization.cbor.cbor
1212
import kotlinx.rpc.krpc.serialization.json.json
1313
import kotlinx.rpc.krpc.serialization.protobuf.protobuf
1414
import kotlinx.serialization.ExperimentalSerializationApi
15+
import kotlin.test.Test
1516

1617
abstract class LocalTransportTest : KrpcTransportTestBase() {
1718
private val transport = LocalTransport()
@@ -50,11 +51,15 @@ class ProtoBufLocalTransportTest : LocalTransportTest() {
5051
}
5152

5253
// 'null' is not supported in ProtoBuf
54+
@Test
5355
override fun nullable(): TestResult = runTest { }
5456

57+
@Test
5558
override fun testByteArraySerialization(): TestResult = runTest { }
5659

60+
@Test
5761
override fun testNullables(): TestResult = runTest { }
5862

63+
@Test
5964
override fun testNullableLists(): TestResult = runTest { }
6065
}

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import kotlinx.coroutines.test.TestScope
1212
import kotlinx.rpc.annotations.Rpc
1313
import kotlinx.rpc.internal.utils.hex.rpcInternalHexToReadableBinary
1414
import kotlinx.rpc.krpc.KrpcConfig
15-
import kotlinx.rpc.krpc.client.KrpcClient
15+
import kotlinx.rpc.krpc.client.InitializedKrpcClient
1616
import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger
1717
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
1818
import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
@@ -105,4 +105,4 @@ class ProtocolTestServer(
105105
class ProtocolTestClient(
106106
config: KrpcConfig.Client,
107107
transport: LocalTransport,
108-
) : KrpcClient(config, transport.client)
108+
) : InitializedKrpcClient(config, transport.client)

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import kotlin.time.Duration.Companion.seconds
2525
fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResult {
2626
return runTest(timeout = 15.seconds) {
2727
debugCoroutines()
28-
CancellationToolkit(this).apply { body() }
28+
CancellationToolkit(this).apply {
29+
client.initializeTransport()
30+
31+
body()
32+
}
2933
}
3034
}
3135

0 commit comments

Comments
 (0)