6
6
7
7
package kotlinx.rpc.krpc.ktor
8
8
9
+ import io.ktor.client.*
10
+ import io.ktor.client.engine.cio.*
11
+ import io.ktor.client.request.*
12
+ import io.ktor.client.statement.*
9
13
import io.ktor.server.application.*
14
+ import io.ktor.server.engine.*
15
+ import io.ktor.server.netty.*
16
+ import io.ktor.server.response.*
17
+ import io.ktor.server.routing.*
10
18
import io.ktor.server.testing.*
11
- import kotlinx.coroutines.cancel
19
+ import kotlinx.coroutines.*
20
+ import kotlinx.coroutines.debug.DebugProbes
21
+ import kotlinx.coroutines.test.runTest
12
22
import kotlinx.rpc.annotations.Rpc
23
+ import kotlinx.rpc.krpc.client.KrpcClient
24
+ import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLogger
25
+ import kotlinx.rpc.krpc.internal.logging.RpcInternalDumpLoggerContainer
13
26
import kotlinx.rpc.krpc.ktor.client.installKrpc
14
27
import kotlinx.rpc.krpc.ktor.client.rpc
15
28
import kotlinx.rpc.krpc.ktor.client.rpcConfig
@@ -18,7 +31,15 @@ import kotlinx.rpc.krpc.ktor.server.rpc
18
31
import kotlinx.rpc.krpc.serialization.json.json
19
32
import kotlinx.rpc.withService
20
33
import org.junit.Assert.assertEquals
34
+ import org.junit.platform.commons.logging.Logger
35
+ import org.junit.platform.commons.logging.LoggerFactory
36
+ import java.net.ServerSocket
37
+ import java.util.concurrent.Executors
38
+ import java.util.concurrent.TimeUnit
39
+ import kotlin.coroutines.cancellation.CancellationException
40
+ import kotlin.test.Ignore
21
41
import kotlin.test.Test
42
+ import kotlin.time.Duration.Companion.seconds
22
43
23
44
@Rpc
24
45
interface NewService {
@@ -35,6 +56,23 @@ class NewServiceImpl(
35
56
}
36
57
}
37
58
59
+ @Rpc
60
+ interface SlowService {
61
+ suspend fun verySlow (): String
62
+ }
63
+
64
+ class SlowServiceImpl : SlowService {
65
+ val received = CompletableDeferred <Unit >()
66
+
67
+ override suspend fun verySlow (): String {
68
+ received.complete(Unit )
69
+
70
+ delay(Int .MAX_VALUE .toLong())
71
+
72
+ error(" Must not be called" )
73
+ }
74
+ }
75
+
38
76
class KtorTransportTest {
39
77
@Test
40
78
fun testEcho () = testApplication {
@@ -96,4 +134,130 @@ class KtorTransportTest {
96
134
97
135
clientWithNoConfig.cancel()
98
136
}
137
+
138
+ @OptIn(DelicateCoroutinesApi ::class , ExperimentalCoroutinesApi ::class )
139
+ @Test
140
+ @Ignore(" Wait for Ktor fix (https://github.com/ktorio/ktor/pull/4927) or apply workaround if rejected" )
141
+ fun testEndpointsTerminateWhenWsDoes () = runTest(timeout = 15 .seconds) {
142
+ DebugProbes .install()
143
+
144
+ val logger = setupLogger()
145
+
146
+ val port: Int = findFreePort()
147
+
148
+ val newPool = Executors .newCachedThreadPool().asCoroutineDispatcher()
149
+
150
+ val serverReady = CompletableDeferred <Unit >()
151
+ val dropServer = CompletableDeferred <Unit >()
152
+
153
+ val service = SlowServiceImpl ()
154
+
155
+ val serverJob = GlobalScope .launch(CoroutineName (" server" )) {
156
+ withContext(newPool) {
157
+ val server = embeddedServer(
158
+ factory = Netty ,
159
+ port = port,
160
+ parentCoroutineContext = newPool,
161
+ ) {
162
+ install(Krpc )
163
+
164
+ routing {
165
+ get {
166
+ call.respondText(" hello" )
167
+ }
168
+
169
+ rpc(" /rpc" ) {
170
+ rpcConfig {
171
+ serialization {
172
+ json()
173
+ }
174
+ }
175
+
176
+ registerService<SlowService > { service }
177
+ }
178
+ }
179
+ }.start(wait = false )
180
+
181
+ serverReady.complete(Unit )
182
+
183
+ dropServer.await()
184
+
185
+ server.stop(shutdownGracePeriod = 100L , shutdownTimeout = 100L , timeUnit = TimeUnit .MILLISECONDS )
186
+ }
187
+
188
+ logger.info { " Server stopped" }
189
+ }
190
+
191
+ val ktorClient = HttpClient (CIO ) {
192
+ installKrpc {
193
+ serialization {
194
+ json()
195
+ }
196
+ }
197
+ }
198
+
199
+ serverReady.await()
200
+
201
+ assertEquals(" hello" , ktorClient.get(" http://0.0.0.0:$port " ).bodyAsText())
202
+
203
+ val rpcClient = ktorClient.rpc(" ws://0.0.0.0:$port /rpc" )
204
+
205
+ launch {
206
+ try {
207
+ rpcClient.withService<SlowService >().verySlow()
208
+ error(" Must not be called" )
209
+ } catch (_: CancellationException ) {
210
+ logger.info { " Cancellation exception caught for RPC request" }
211
+ ensureActive()
212
+ }
213
+ }
214
+
215
+ service.received.await()
216
+
217
+ logger.info { " Received RPC request" }
218
+
219
+ dropServer.complete(Unit )
220
+
221
+ logger.info { " Waiting for RPC client to complete" }
222
+
223
+ (rpcClient as KrpcClient ).awaitCompletion()
224
+
225
+ logger.info { " RPC client completed" }
226
+
227
+ ktorClient.close()
228
+ newPool.close()
229
+
230
+ serverJob.cancel()
231
+ }
232
+
233
+ private fun findFreePort (): Int {
234
+ val port: Int
235
+ while (true ) {
236
+ val socket = try {
237
+ ServerSocket (0 )
238
+ } catch (_: Throwable ) {
239
+ continue
240
+ }
241
+
242
+ port = socket.localPort
243
+ socket.close()
244
+ break
245
+ }
246
+ return port
247
+ }
248
+
249
+ private fun setupLogger (): Logger {
250
+ val logger = LoggerFactory .getLogger(KtorTransportTest ::class .java)
251
+
252
+ RpcInternalDumpLoggerContainer .set(object : RpcInternalDumpLogger {
253
+
254
+ override val isEnabled: Boolean = true
255
+
256
+ override fun dump (vararg tags : String , message : () -> String ) {
257
+ logger.info { " [${tags.joinToString()} ] ${message()} " }
258
+ }
259
+ })
260
+
261
+ return logger
262
+ }
99
263
}
0 commit comments