Skip to content

Commit df7b1ce

Browse files
committed
Fixes for failed CI
1 parent 71fbe2a commit df7b1ce

File tree

13 files changed

+77
-18
lines changed

13 files changed

+77
-18
lines changed

compiler-plugin/compiler-plugin-k2/src/main/pre_2_0_10/kotlinx/rpc/codegen/checkers/FirRpcCheckersVS.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ class FirRpcServiceDeclarationCheckerVS(
6565
}
6666
}
6767

68-
class FirRpcStrictModeClassCheckerVS(private val ctx: FirCheckersContext) : FirRegularClassChecker(MppCheckerKind.Common) {
68+
class FirRpcStrictModeClassCheckerVS : FirRegularClassChecker(MppCheckerKind.Common) {
6969
override fun check(declaration: FirRegularClass, context: CheckerContext, reporter: DiagnosticReporter) {
70-
FirRpcStrictModeClassChecker.check(ctx, declaration, context, reporter)
70+
FirRpcStrictModeClassChecker.check(declaration, context, reporter)
7171
}
7272
}

compiler-plugin/compiler-plugin-k2/src/main/pre_2_0_21/kotlinx/rpc/codegen/checkers/FirRpcCheckersVS.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ class FirRpcServiceDeclarationCheckerVS(
6565
}
6666
}
6767

68-
class FirRpcStrictModeClassCheckerVS(private val ctx: FirCheckersContext) : FirRegularClassChecker(MppCheckerKind.Common) {
68+
class FirRpcStrictModeClassCheckerVS : FirRegularClassChecker(MppCheckerKind.Common) {
6969
override fun check(declaration: FirRegularClass, context: CheckerContext, reporter: DiagnosticReporter) {
70-
FirRpcStrictModeClassChecker.check(ctx, declaration, context, reporter)
70+
FirRpcStrictModeClassChecker.check(declaration, context, reporter)
7171
}
7272
}

compiler-plugin/compiler-plugin-k2/src/main/v_2_2/kotlinx/rpc/codegen/checkers/FirRpcCheckersVS.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ class FirRpcServiceDeclarationCheckerVS(
7171
}
7272
}
7373

74-
class FirRpcStrictModeClassCheckerVS(private val ctx: FirCheckersContext) : FirRegularClassChecker(MppCheckerKind.Common) {
74+
class FirRpcStrictModeClassCheckerVS : FirRegularClassChecker(MppCheckerKind.Common) {
7575
context(context: CheckerContext, reporter: DiagnosticReporter)
7676
override fun check(declaration: FirRegularClass) {
77-
FirRpcStrictModeClassChecker.check(ctx, declaration, context, reporter)
77+
FirRpcStrictModeClassChecker.check(declaration, context, reporter)
7878
}
7979
}

compiler-plugin/compiler-plugin-k2/src/main/v_2_2_2/kotlinx/rpc/codegen/checkers/FirRpcCheckersVS.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ class FirRpcServiceDeclarationCheckerVS(
7171
}
7272
}
7373

74-
class FirRpcStrictModeClassCheckerVS(private val ctx: FirCheckersContext) : FirRegularClassChecker(MppCheckerKind.Common) {
74+
class FirRpcStrictModeClassCheckerVS : FirRegularClassChecker(MppCheckerKind.Common) {
7575
context(context: CheckerContext, reporter: DiagnosticReporter)
7676
override fun check(declaration: FirRegularClass) {
77-
FirRpcStrictModeClassChecker.check(ctx, declaration, context, reporter)
77+
FirRpcStrictModeClassChecker.check(declaration, context, reporter)
7878
}
7979
}

krpc/krpc-client/api/krpc-client.api

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
public abstract class kotlinx/rpc/krpc/client/InitializedKrpcClient : kotlinx/rpc/krpc/client/KrpcClient {
22
public fun <init> (Lkotlinx/rpc/krpc/KrpcConfig$Client;Lkotlinx/rpc/krpc/KrpcTransport;)V
3-
public final fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
4-
public final fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
protected final fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
4+
protected final fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
55
}
66

77
public abstract class kotlinx/rpc/krpc/client/KrpcClient : kotlinx/rpc/RpcClient, kotlinx/rpc/krpc/internal/KrpcEndpoint {
88
public fun <init> ()V
9+
public final fun awaitCompletion (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
910
public final fun call (Lkotlinx/rpc/RpcCall;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1011
public final fun callServerStreaming (Lkotlinx/rpc/RpcCall;)Lkotlinx/coroutines/flow/Flow;
11-
public abstract fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
12-
public abstract fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
12+
public final fun close (Ljava/lang/String;)V
13+
public static synthetic fun close$default (Lkotlinx/rpc/krpc/client/KrpcClient;Ljava/lang/String;ILjava/lang/Object;)V
14+
protected abstract fun initializeConfig ()Lkotlinx/rpc/krpc/KrpcConfig$Client;
15+
protected abstract fun initializeTransport (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
1316
}
1417

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,21 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
8484
* Close this client, removing all the services and stopping accepting messages.
8585
*/
8686
public fun close(message: String? = null) {
87+
if (!isTransportReady) {
88+
return
89+
}
90+
8791
internalScope.cancel(message ?: "Client closed")
8892
}
8993

9094
/**
9195
* Waits until the client is closed.
9296
*/
9397
public suspend fun awaitCompletion() {
98+
if (!isTransportReady) {
99+
return
100+
}
101+
94102
internalScope.coroutineContext.job.join()
95103
}
96104

@@ -133,7 +141,8 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
133141
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
134142

135143
@OptIn(DelicateCoroutinesApi::class)
136-
GlobalScope.launch {
144+
@Suppress("detekt.GlobalCoroutineUsage")
145+
GlobalScope.launch(CoroutineName("client-request-channels-closing")) {
137146
requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
138147
requestChannels.clear()
139148
}

krpc/krpc-ktor/krpc-ktor-client/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/client/KtorClientDsl.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.ktor.client.plugins.*
99
import io.ktor.client.plugins.websocket.*
1010
import io.ktor.client.request.*
1111
import io.ktor.util.*
12-
import kotlinx.rpc.RpcClient
1312
import kotlinx.rpc.krpc.KrpcConfigBuilder
1413

1514
private val KrpcRequestConfigAttributeKey = AttributeKey<KrpcConfigBuilder.Client.() -> Unit>(

krpc/krpc-ktor/krpc-ktor-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/ktor/KtorTransportTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ class KtorTransportTest {
152152

153153
val service = SlowServiceImpl()
154154

155+
@Suppress("detekt.GlobalCoroutineUsage")
155156
val serverJob = GlobalScope.launch(CoroutineName("server")) {
156157
withContext(newPool) {
157158
val server = embeddedServer(

krpc/krpc-server/api/krpc-server.api

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
public abstract class kotlinx/rpc/krpc/server/KrpcServer : kotlinx/rpc/RpcServer, kotlinx/rpc/krpc/internal/KrpcEndpoint {
22
public fun <init> (Lkotlinx/rpc/krpc/KrpcConfig$Server;Lkotlinx/rpc/krpc/KrpcTransport;)V
3+
public final fun awaitCompletion (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4+
public final fun close (Ljava/lang/String;)V
5+
public static synthetic fun close$default (Lkotlinx/rpc/krpc/server/KrpcServer;Ljava/lang/String;ILjava/lang/Object;)V
36
public fun deregisterService (Lkotlin/reflect/KClass;)V
47
public final fun registerService (Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function0;)V
58
}

krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,9 @@ abstract class KrpcTransportTestBase {
391391
return@runTest
392392
}
393393

394-
val inContinuation = Semaphore(1)
394+
val inContinuation = Semaphore(permits = 1, acquiredPermits = 1)
395395
val running = AtomicTest()
396+
val gate = CompletableDeferred<Unit>()
396397

397398
// start a coroutine that block the thread after continuation
398399
val c1 = async(Dispatchers.Default) { // make a rpc call
@@ -405,13 +406,14 @@ abstract class KrpcTransportTestBase {
405406
inContinuation.release()
406407

407408
// let's block the thread
409+
gate.complete(Unit)
408410
while (running.atomic.value) {
409411
// do nothing
410412
}
411413
}
412414

413415
// wait, till the Rpc continuation thread is blocked
414-
delay(100)
416+
gate.await()
415417
assertTrue(inContinuation.tryAcquire())
416418

417419
val c2 = async { // make a call

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.rpc.krpc.test
66

7+
import kotlinx.atomicfu.atomic
78
import kotlinx.coroutines.CoroutineScope
89
import kotlinx.coroutines.Job
910
import kotlinx.coroutines.SupervisorJob
@@ -12,6 +13,8 @@ import kotlinx.coroutines.job
1213
import kotlinx.rpc.krpc.KrpcTransport
1314
import kotlinx.rpc.krpc.KrpcTransportMessage
1415
import kotlin.coroutines.CoroutineContext
16+
import kotlin.time.Clock
17+
import kotlin.time.ExperimentalTime
1518

1619
class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
1720
override val coroutineContext = parentScope?.run { SupervisorJob(coroutineContext.job) }
@@ -20,10 +23,15 @@ class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
2023
private val clientIncoming = Channel<KrpcTransportMessage>()
2124
private val serverIncoming = Channel<KrpcTransportMessage>()
2225

26+
val lastMessageSentOnClient = atomic(0L)
27+
val lastMessageSentOnServer = atomic(0L)
28+
2329
val client: KrpcTransport = object : KrpcTransport {
2430
override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job)
2531

32+
@OptIn(ExperimentalTime::class)
2633
override suspend fun send(message: KrpcTransportMessage) {
34+
lastMessageSentOnClient.getAndSet(Clock.System.now().toEpochMilliseconds())
2735
serverIncoming.send(message)
2836
}
2937

@@ -35,7 +43,9 @@ class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope {
3543
val server: KrpcTransport = object : KrpcTransport {
3644
override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext)
3745

46+
@OptIn(ExperimentalTime::class)
3847
override suspend fun send(message: KrpcTransportMessage) {
48+
lastMessageSentOnServer.getAndSet(Clock.System.now().toEpochMilliseconds())
3949
clientIncoming.send(message)
4050
}
4151

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class CancellationServiceImpl : CancellationService {
6161

6262
@OptIn(DelicateCoroutinesApi::class)
6363
override suspend fun outgoingStreamAsync(stream: Flow<Int>) {
64+
@Suppress("detekt.GlobalCoroutineUsage")
6465
GlobalScope.launch {
6566
consume(stream)
6667
}

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/WireSamplingTestScope.kt

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,13 @@ import org.jetbrains.krpc.test.api.util.SamplingServiceImpl
4343
import java.nio.file.Files
4444
import java.nio.file.Path
4545
import kotlin.io.path.name
46+
import kotlin.reflect.full.callSuspend
47+
import kotlin.reflect.full.memberFunctions
4648
import kotlin.reflect.full.memberProperties
4749
import kotlin.reflect.jvm.isAccessible
50+
import kotlin.time.Clock
4851
import kotlin.time.Duration.Companion.seconds
52+
import kotlin.time.ExperimentalTime
4953

5054
@Suppress("RedundantUnitReturnType")
5155
fun wireSamplingTest(name: String, sampling: suspend WireSamplingTestScope.() -> Unit): TestResult {
@@ -76,6 +80,8 @@ class WireSamplingTestScope(private val sampleName: String, scope: TestScope) :
7680
val fails = mutableListOf<String>()
7781
formats.forEach { format ->
7882
val finishedToolkit = WireToolkit(this, format).apply {
83+
initClient()
84+
7985
server // init server
8086

8187
service.block()
@@ -111,6 +117,13 @@ class WireSamplingTestScope(private val sampleName: String, scope: TestScope) :
111117
fails.failIfAnyCauses()
112118
}
113119

120+
private suspend fun WireToolkit.initClient() {
121+
KrpcClient::class.memberFunctions.single { it.name == "initializeTransport" }.apply {
122+
isAccessible = true
123+
callSuspend(client)
124+
}
125+
}
126+
114127
var skipOldServerTests: Boolean = true
115128

116129
suspend fun runSimulatorTests() {
@@ -147,6 +160,7 @@ class WireSamplingTestScope(private val sampleName: String, scope: TestScope) :
147160
val oldClientToolkit = WireToolkit(this, format, logger)
148161
logger.info { "Running wire test: old client (version: $version) with current server on $format format" }
149162

163+
oldClientToolkit.initClient()
150164
oldClientToolkit.server // init server
151165
for ((role, _, message) in dump.filter { it.phase == Phase.Send }) {
152166
when (role) {
@@ -255,8 +269,25 @@ private class WireToolkit(scope: CoroutineScope, format: SamplingFormat, val log
255269
}
256270

257271
suspend fun stop() {
258-
transport.coroutineContext.job.cancelAndJoin()
272+
@OptIn(ExperimentalTime::class)
273+
while (true) {
274+
val now = Clock.System.now().toEpochMilliseconds()
275+
if (now - transport.lastMessageSentOnClient.value > 400 &&
276+
now - transport.lastMessageSentOnServer.value > 400
277+
) {
278+
break
279+
}
280+
281+
delay(100)
282+
}
283+
259284
RpcInternalDumpLoggerContainer.set(null)
285+
286+
client.close()
287+
server.close()
288+
client.awaitCompletion()
289+
server.awaitCompletion()
290+
transport.coroutineContext.job.cancelAndJoin()
260291
}
261292

262293
init {
@@ -301,7 +332,7 @@ data class DumpLog(
301332
.map { it.trim() }
302333
.filter { !it.startsWith("//") && it.isNotBlank() }
303334
.map { line ->
304-
val (prefix, log) = line.split("\$", limit = 2).map { it.trim() }
335+
val (prefix, log) = line.split("$", limit = 2).map { it.trim() }
305336
val (role, phase) = prefix.split(" ")
306337

307338
DumpLog(Role.fromText(role), Phase.fromText(phase), log)

0 commit comments

Comments
 (0)