diff --git a/krpc/krpc-client/api/krpc-client.api b/krpc/krpc-client/api/krpc-client.api index 3f308207f..a98f1637b 100644 --- a/krpc/krpc-client/api/krpc-client.api +++ b/krpc/krpc-client/api/krpc-client.api @@ -1,6 +1,14 @@ -public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint { +public abstract class kotlinx/rpc/krpc/client/InitializedKrpcClient : kotlinx/rpc/krpc/client/KrpcClient { public fun (Lkotlinx/rpc/krpc/KrpcConfig$Client;Lkotlinx/rpc/krpc/KrpcTransport;)V + public final fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client; + public final fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint { + public fun ()V public final fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public final fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow; + public abstract fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client; + public abstract fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt index e77de945c..e9b768b32 100644 --- a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt +++ b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt @@ -34,6 +34,7 @@ import kotlinx.serialization.modules.SerializersModule import kotlin.collections.first import kotlin.concurrent.Volatile import kotlin.coroutines.cancellation.CancellationException +import kotlin.properties.Delegates @Deprecated("Use KrpcClient instead", ReplaceWith("KrpcClient"), level = DeprecationLevel.ERROR) public typealias KRPCClient = KrpcClient @@ -56,15 +57,61 @@ public typealias KRPCClient = KrpcClient * IMPORTANT: Must be exclusive to this client, otherwise unexpected behavior may occur. */ @OptIn(InternalCoroutinesApi::class) -public abstract class KrpcClient( - private val config: KrpcConfig.Client, - transport: KrpcTransport, -) : RpcClient, KrpcEndpoint { +public abstract class KrpcClient : RpcClient, KrpcEndpoint { + /** + * Called once to provide [KrpcTransport] for this client + */ + public abstract suspend fun initializeTransport(): KrpcTransport + + private var isTransportReady: Boolean = false + private var transport: KrpcTransport by Delegates.notNull() + + /** + * Called once to provide [KrpcConfig.Client] for this client + */ + public abstract fun initializeConfig(): KrpcConfig.Client + + private val config: KrpcConfig.Client by lazy { + initializeConfig() + } + + @Volatile + private var clientCancelled = false + + private fun checkTransportReadiness() { + if (!isTransportReady) { + error( + "Internal error, please contact developers for the support. " + + "Transport is not initialized, first scope access must come from an RPC request." + ) + } + } + @InternalRpcApi - public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job)) + public val internalScope: CoroutineScope by lazy { + checkTransportReadiness() + + val context = SupervisorJob(transport.coroutineContext.job) + + context.job.invokeOnCompletion(onCancelling = true) { + clientCancelled = true + + sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true) + + @OptIn(DelicateCoroutinesApi::class) + GlobalScope.launch { + requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) } + requestChannels.clear() + } + } + + CoroutineScope(context) + } // we make a child here, so we can send cancellation messages before closing the connection private val connector by lazy { + checkTransportReadiness() + KrpcClientConnector(config.serialFormatInitializer.build(), transport, config.waitForServices) } @@ -89,36 +136,19 @@ public abstract class KrpcClient( // callId to serviceTypeString private val cancellingRequests = RpcInternalConcurrentHashMap() - @Volatile - private var clientCancelled = false - - init { - internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) { - clientCancelled = true - - sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true) - - requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) } - requestChannels.clear() - } - - internalScope.launch(CoroutineName("krpc-client-generic-messages")) { - connector.subscribeToGenericMessages(::handleGenericMessage) - } - } - - private val initHandshake: Job = internalScope.launch(CoroutineName("krpc-client-handshake")) { - connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL)) - - connector.subscribeToProtocolMessages(::handleProtocolMessage) - } - /** * Starts the handshake process and awaits for completion. * If the handshake was completed before, nothing happens. */ - private suspend fun awaitHandshakeCompletion() { - initHandshake.join() + private suspend fun initializeAndAwaitHandshakeCompletion() { + transport = initializeTransport() + isTransportReady = true + + connector.subscribeToGenericMessages(::handleGenericMessage) + connector.subscribeToProtocolMessages(::handleProtocolMessage) + + connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL)) + serverSupportedPlugins.await() } @@ -150,7 +180,11 @@ public abstract class KrpcClient( @Suppress("detekt.CyclomaticComplexMethod") final override fun callServerStreaming(call: RpcCall): Flow { return flow { - awaitHandshakeCompletion() + if (clientCancelled) { + error("Client cancelled") + } + + initializeAndAwaitHandshakeCompletion() val id = callCounter.incrementAndGet() val callable = call.descriptor.getCallable(call.callableName) @@ -446,3 +480,19 @@ public abstract class KrpcClient( } } } + +/** + * Represents an initialized RPC client that wraps a predefined configuration and transport. + */ +public abstract class InitializedKrpcClient( + private val config: KrpcConfig.Client, + private val transport: KrpcTransport, +): KrpcClient() { + final override suspend fun initializeTransport(): KrpcTransport { + return transport + } + + final override fun initializeConfig(): KrpcConfig.Client { + return config + } +} diff --git a/krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api b/krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api index 2cc3957bd..ddff8bb1e 100644 --- a/krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api +++ b/krpc/krpc-ktor/krpc-ktor-client/api/krpc-ktor-client.api @@ -8,15 +8,15 @@ public final class kotlinx/rpc/krpc/ktor/client/KrpcKt { } public final class kotlinx/rpc/krpc/ktor/client/KtorClientDslKt { - public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; - public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun rpc (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient; + public static final fun rpc (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient; + public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient; + public static synthetic fun rpc$default (Lio/ktor/client/HttpClient;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/rpc/krpc/ktor/client/KtorRpcClient; public static final fun rpcConfig (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;)V public static synthetic fun rpcConfig$default (Lio/ktor/client/request/HttpRequestBuilder;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V } public abstract interface class kotlinx/rpc/krpc/ktor/client/KtorRpcClient : kotlinx/rpc/RpcClient { - public abstract fun getWebSocketSession ()Lio/ktor/websocket/WebSocketSession; + public abstract fun getWebSocketSession ()Lkotlinx/coroutines/Deferred; } diff --git a/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt b/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt index f54ee0a3c..dcf3acc30 100644 --- a/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt +++ b/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt @@ -11,7 +11,6 @@ import io.ktor.client.request.* import io.ktor.util.* import kotlinx.rpc.RpcClient import kotlinx.rpc.krpc.KrpcConfigBuilder -import kotlinx.rpc.krpc.rpcClientConfig private val KrpcRequestConfigAttributeKey = AttributeKey Unit>( name = "KrpcRequestConfigAttributeKey" @@ -36,7 +35,7 @@ public fun HttpRequestBuilder.rpcConfig(configBuilder: KrpcConfigBuilder.Client. * @param block Optional configuration for the * @return An instance of [RpcClient] that is configured to send messages to the server. */ -public suspend fun HttpClient.rpc( +public fun HttpClient.rpc( urlString: String, block: HttpRequestBuilder.() -> Unit = {}, ): KtorRpcClient { @@ -53,24 +52,19 @@ public suspend fun HttpClient.rpc( * @param block Optional configuration for the * @return An instance of [RpcClient] that is configured to send messages to the server. */ -public suspend fun HttpClient.rpc( +public fun HttpClient.rpc( block: HttpRequestBuilder.() -> Unit = {}, ): KtorRpcClient { pluginOrNull(WebSockets) ?: error("RPC for client requires $WebSockets plugin to be installed firstly") - var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {} - val session = webSocketSession { - block() + val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey) + + return KtorKrpcClientImpl(pluginConfigBuilder) { configSetter -> + webSocketSession { + block() - attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let { - requestConfigBuilder = it + attributes.getOrNull(KrpcRequestConfigAttributeKey)?.let { configSetter(it) } } } - - val pluginConfigBuilder = attributes.getOrNull(KrpcClientPluginAttributesKey) - val rpcConfig = pluginConfigBuilder?.apply(requestConfigBuilder)?.build() - ?: rpcClientConfig(requestConfigBuilder) - - return KtorKrpcClientImpl(session, rpcConfig) } diff --git a/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorRpcClient.kt b/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorRpcClient.kt index 922f88441..169718dff 100644 --- a/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorRpcClient.kt +++ b/krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorRpcClient.kt @@ -5,21 +5,46 @@ package kotlinx.rpc.krpc.ktor.client import io.ktor.websocket.* +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Deferred import kotlinx.rpc.RpcClient import kotlinx.rpc.krpc.KrpcConfig +import kotlinx.rpc.krpc.KrpcConfigBuilder +import kotlinx.rpc.krpc.KrpcTransport import kotlinx.rpc.krpc.client.KrpcClient import kotlinx.rpc.krpc.ktor.KtorTransport +import kotlinx.rpc.krpc.rpcClientConfig /** * [RpcClient] implementation for Ktor, containing [webSocketSession] object, * that is used to maintain connection. */ public interface KtorRpcClient : RpcClient { - public val webSocketSession: WebSocketSession + public val webSocketSession: Deferred } internal class KtorKrpcClientImpl( - override val webSocketSession: WebSocketSession, - config: KrpcConfig.Client, -): KrpcClient(config, KtorTransport(webSocketSession)), KtorRpcClient + private val pluginConfigBuilder: KrpcConfigBuilder.Client?, + private val webSocketSessionFactory: suspend ( + configSetter: (KrpcConfigBuilder.Client.() -> Unit) -> Unit, + ) -> WebSocketSession, +): KrpcClient(), KtorRpcClient { + private var requestConfigBuilder: KrpcConfigBuilder.Client.() -> Unit = {} + private val _webSocketSession = CompletableDeferred() + override val webSocketSession: Deferred = _webSocketSession + + override suspend fun initializeTransport(): KrpcTransport { + val session = webSocketSessionFactory { + requestConfigBuilder = it + } + + _webSocketSession.complete(session) + return KtorTransport(session) + } + + override fun initializeConfig(): KrpcConfig.Client { + return pluginConfigBuilder?.apply(requestConfigBuilder)?.build() + ?: rpcClientConfig(requestConfigBuilder) + } +} diff --git a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestClient.kt b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestClient.kt index 5799f4328..8008a31be 100644 --- a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestClient.kt +++ b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTestClient.kt @@ -6,6 +6,7 @@ package kotlinx.rpc.krpc.test import kotlinx.rpc.krpc.KrpcConfig import kotlinx.rpc.krpc.KrpcTransport +import kotlinx.rpc.krpc.client.InitializedKrpcClient import kotlinx.rpc.krpc.client.KrpcClient /** @@ -17,4 +18,4 @@ import kotlinx.rpc.krpc.client.KrpcClient class KrpcTestClient( config: KrpcConfig.Client, transport: KrpcTransport, -) : KrpcClient(config, transport) +) : InitializedKrpcClient(config, transport) diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransportTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransportTest.kt index b8d033106..2cd704c01 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransportTest.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransportTest.kt @@ -12,6 +12,7 @@ import kotlinx.rpc.krpc.serialization.cbor.cbor import kotlinx.rpc.krpc.serialization.json.json import kotlinx.rpc.krpc.serialization.protobuf.protobuf import kotlinx.serialization.ExperimentalSerializationApi +import kotlin.test.Test abstract class LocalTransportTest : KrpcTransportTestBase() { private val transport = LocalTransport() @@ -50,11 +51,15 @@ class ProtoBufLocalTransportTest : LocalTransportTest() { } // 'null' is not supported in ProtoBuf + @Test override fun nullable(): TestResult = runTest { } + @Test override fun testByteArraySerialization(): TestResult = runTest { } + @Test override fun testNullables(): TestResult = runTest { } + @Test override fun testNullableLists(): TestResult = runTest { } } diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt index 9f83fcbcd..f714c20d7 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.test.TestScope import kotlinx.rpc.annotations.Rpc import kotlinx.rpc.internal.utils.hex.rpcInternalHexToReadableBinary import kotlinx.rpc.krpc.KrpcConfig -import kotlinx.rpc.krpc.client.KrpcClient +import kotlinx.rpc.krpc.client.InitializedKrpcClient import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer @@ -105,4 +105,4 @@ class ProtocolTestServer( class ProtocolTestClient( config: KrpcConfig.Client, transport: LocalTransport, -) : KrpcClient(config, transport.client) +) : InitializedKrpcClient(config, transport.client) diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt index d8b713553..244d4c473 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt @@ -25,7 +25,11 @@ import kotlin.time.Duration.Companion.seconds fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResult { return runTest(timeout = 15.seconds) { debugCoroutines() - CancellationToolkit(this).apply { body() } + CancellationToolkit(this).apply { + client.initializeTransport() + + body() + } } } diff --git a/krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/WireSamplingTestScope.kt b/krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/WireSamplingTestScope.kt index 575a3014c..016d87681 100644 --- a/krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/WireSamplingTestScope.kt +++ b/krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/WireSamplingTestScope.kt @@ -6,6 +6,8 @@ package kotlinx.rpc.krpc.test.api import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay import kotlinx.coroutines.job import kotlinx.coroutines.launch import kotlinx.coroutines.test.TestResult @@ -13,7 +15,9 @@ import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.rpc.annotations.Rpc import kotlinx.rpc.internal.utils.hex.rpcInternalHexToReadableBinary +import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap import kotlinx.rpc.krpc.KrpcTransportMessage +import kotlinx.rpc.krpc.client.KrpcClient import kotlinx.rpc.krpc.internal.logging.RpcInternalCommonLogger import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer @@ -39,6 +43,8 @@ import org.jetbrains.krpc.test.api.util.SamplingServiceImpl import java.nio.file.Files import java.nio.file.Path import kotlin.io.path.name +import kotlin.reflect.full.memberProperties +import kotlin.reflect.jvm.isAccessible import kotlin.time.Duration.Companion.seconds @Suppress("RedundantUnitReturnType") @@ -74,6 +80,18 @@ class WireSamplingTestScope(private val sampleName: String, scope: TestScope) : service.block() + @Suppress("UNCHECKED_CAST") + val requests = KrpcClient::class.memberProperties + .single { it.name == "requestChannels" } + .apply { + isAccessible = true + } + .get(client) as RpcInternalConcurrentHashMap> + + while (requests.values.isNotEmpty()) { + delay(100) + } + stop() } diff --git a/tests/compiler-plugin-tests/src/main/kotlin/kotlinx/rpc/codegen/test/TestRpcClient.kt b/tests/compiler-plugin-tests/src/main/kotlin/kotlinx/rpc/codegen/test/TestRpcClient.kt index 0fd640cd8..d52504bad 100644 --- a/tests/compiler-plugin-tests/src/main/kotlin/kotlinx/rpc/codegen/test/TestRpcClient.kt +++ b/tests/compiler-plugin-tests/src/main/kotlin/kotlinx/rpc/codegen/test/TestRpcClient.kt @@ -4,17 +4,13 @@ package kotlinx.rpc.codegen.test -import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.rpc.RpcCall import kotlinx.rpc.RpcClient -import kotlin.coroutines.CoroutineContext @Suppress("UNCHECKED_CAST", "unused") object TestRpcClient : RpcClient { - override val coroutineContext: CoroutineContext = Job() - override suspend fun call(call: RpcCall): T { return "call_42" as T } diff --git a/tests/krpc-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/compatibility/LocalTransport.kt b/tests/krpc-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/compatibility/LocalTransport.kt index 9739e2234..58aad8f0e 100644 --- a/tests/krpc-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/compatibility/LocalTransport.kt +++ b/tests/krpc-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/compatibility/LocalTransport.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.job import kotlinx.rpc.krpc.KrpcConfig import kotlinx.rpc.krpc.KrpcTransport import kotlinx.rpc.krpc.KrpcTransportMessage -import kotlinx.rpc.krpc.client.KrpcClient +import kotlinx.rpc.krpc.client.InitializedKrpcClient import kotlinx.rpc.krpc.server.KrpcServer import kotlin.coroutines.CoroutineContext @@ -24,7 +24,7 @@ class KrpcTestServer( class KrpcTestClient( config: KrpcConfig.Client, transport: KrpcTransport, -) : KrpcClient(config, transport) +) : InitializedKrpcClient(config, transport) class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope { override val coroutineContext = parentScope?.run { SupervisorJob(coroutineContext.job) }