From e35e132c3aa9974a615b90b89978bc3675ee31fd Mon Sep 17 00:00:00 2001 From: Dandan Meng Date: Fri, 16 May 2025 15:12:08 +0800 Subject: [PATCH 1/3] fix the batch size not respected issue --- .../network/http/BatchingHttpInterceptor.kt | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt b/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt index 6b95b961dee..000efc1b311 100644 --- a/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt +++ b/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt @@ -103,22 +103,31 @@ class BatchingHttpInterceptor @JvmOverloads constructor( val sendNow = mutex.withLock { // if there was an error, the previous job was already canceled, ignore that error pendingRequests.add(pendingRequest) - pendingRequests.size >= maxBatchSize + val batchFull = pendingRequests.size >= maxBatchSize + if (batchFull) { + executePendingRequests(needLock = false) + } + batchFull } - if (sendNow) { - executePendingRequests() - } else { + + if (!sendNow) { scope.launch { delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1) - executePendingRequests() + executePendingRequests(needLock = true) } } return pendingRequest.deferred.await() } - private suspend fun executePendingRequests() { - val pending = mutex.withLock { + private suspend fun executePendingRequests(needLock: Boolean) { + val pending = if (needLock) { + mutex.withLock { + val copy = pendingRequests.toList() + pendingRequests.clear() + copy + } + } else { val copy = pendingRequests.toList() pendingRequests.clear() copy From a7d7c5880981464c1759ae0f049a1eff6bd76a4b Mon Sep 17 00:00:00 2001 From: BoD Date: Fri, 16 May 2025 12:12:16 +0200 Subject: [PATCH 2/3] Add a unit test --- .../kotlin/test/batching/QueryBatchingTest.kt | 57 +++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt b/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt index 614804b49f9..314b3e53777 100644 --- a/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt +++ b/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt @@ -4,15 +4,22 @@ import batching.GetLaunch2Query import batching.GetLaunchQuery import com.apollographql.apollo.ApolloClient import com.apollographql.apollo.api.AnyAdapter +import com.apollographql.apollo.api.ApolloResponse import com.apollographql.apollo.api.CustomScalarAdapters import com.apollographql.apollo.api.ExecutionOptions.Companion.CAN_BE_BATCHED import com.apollographql.apollo.api.http.HttpHeader import com.apollographql.apollo.api.json.jsonReader +import com.apollographql.apollo.testing.internal.runTest +import com.apollographql.mockserver.MockRequest +import com.apollographql.mockserver.MockRequestBase +import com.apollographql.mockserver.MockResponse import com.apollographql.mockserver.MockServer +import com.apollographql.mockserver.MockServerHandler import com.apollographql.mockserver.awaitRequest import com.apollographql.mockserver.enqueueString -import com.apollographql.apollo.testing.internal.runTest +import kotlinx.coroutines.Deferred import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay import okio.Buffer import kotlin.test.Ignore @@ -21,7 +28,9 @@ import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertFalse import kotlin.test.assertIs +import kotlin.test.assertNotNull import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds class QueryBatchingTest { private lateinit var mockServer: MockServer @@ -90,7 +99,7 @@ class QueryBatchingTest { // Only one request must have been sent assertFails { - mockServer.awaitRequest() + mockServer.awaitRequest(3.seconds) } } @@ -217,7 +226,7 @@ class QueryBatchingTest { // Only one request must have been sent assertFails { - mockServer.awaitRequest() + mockServer.awaitRequest(3.seconds) } } @@ -251,7 +260,7 @@ class QueryBatchingTest { val request = mockServer.awaitRequest() // Only one request must have been sent assertFails { - mockServer.awaitRequest() + mockServer.awaitRequest(3.seconds) } assertTrue(request.headers["client0"] == "0") assertTrue(request.headers["client1"] == "1") @@ -296,4 +305,44 @@ class QueryBatchingTest { assertFalse(request.headers.keys.contains("query1+query2-different-value")) assertFalse(request.headers.keys.contains(CAN_BE_BATCHED)) } + + @Test + fun batchSizeIsHonoredWithConcurrency() = runTest(before = { setUp() }, after = { tearDown() }) { + mockServer = MockServer.Builder().handler(object : MockServerHandler { + override fun handle(request: MockRequestBase): MockResponse { + val jsonReader = Buffer().write((request as MockRequest).body).jsonReader() + jsonReader.beginArray() + var arrayLength = 0 + while (jsonReader.hasNext()) { + jsonReader.skipValue() + arrayLength++ + } + + // Check we never receive more than maxBatchSize queries + assertTrue(arrayLength <= 10) + + return MockResponse.Builder() + .body("[" + List(arrayLength) { """{"data":{"launch":{"id":"83"}}}""" }.joinToString() + "]") + .build() + } + }).build() + + apolloClient = ApolloClient.Builder() + .serverUrl(mockServer.url()) + .httpBatching(maxBatchSize = 10, batchIntervalMillis = 10) + .build() + + val deferredResults = mutableListOf>>() + repeat(100) { + deferredResults += async { + delay(1) + apolloClient.query(GetLaunchQuery()) + .canBeBatched(true) + .execute() + } + } + for (apolloResponse in deferredResults.awaitAll()) { + assertNotNull(apolloResponse.data) + } + } } From 666f752f305777cfe153a2d696f0ef02264ea200 Mon Sep 17 00:00:00 2001 From: BoD Date: Fri, 16 May 2025 17:51:00 +0200 Subject: [PATCH 3/3] Fix node tests hanging --- .../src/commonTest/kotlin/test/batching/QueryBatchingTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt b/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt index 314b3e53777..3a541d6570f 100644 --- a/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt +++ b/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt @@ -308,6 +308,7 @@ class QueryBatchingTest { @Test fun batchSizeIsHonoredWithConcurrency() = runTest(before = { setUp() }, after = { tearDown() }) { + mockServer.close() mockServer = MockServer.Builder().handler(object : MockServerHandler { override fun handle(request: MockRequestBase): MockResponse { val jsonReader = Buffer().write((request as MockRequest).body).jsonReader()