diff --git a/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt b/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt index 6b95b961dee..000efc1b311 100644 --- a/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt +++ b/libraries/apollo-runtime/src/commonMain/kotlin/com/apollographql/apollo/network/http/BatchingHttpInterceptor.kt @@ -103,22 +103,31 @@ class BatchingHttpInterceptor @JvmOverloads constructor( val sendNow = mutex.withLock { // if there was an error, the previous job was already canceled, ignore that error pendingRequests.add(pendingRequest) - pendingRequests.size >= maxBatchSize + val batchFull = pendingRequests.size >= maxBatchSize + if (batchFull) { + executePendingRequests(needLock = false) + } + batchFull } - if (sendNow) { - executePendingRequests() - } else { + + if (!sendNow) { scope.launch { delay(batchIntervalMillis - (startMark.elapsedNow().inWholeMilliseconds % batchIntervalMillis) - 1) - executePendingRequests() + executePendingRequests(needLock = true) } } return pendingRequest.deferred.await() } - private suspend fun executePendingRequests() { - val pending = mutex.withLock { + private suspend fun executePendingRequests(needLock: Boolean) { + val pending = if (needLock) { + mutex.withLock { + val copy = pendingRequests.toList() + pendingRequests.clear() + copy + } + } else { val copy = pendingRequests.toList() pendingRequests.clear() copy diff --git a/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt b/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt index 614804b49f9..3a541d6570f 100644 --- a/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt +++ b/tests/integration-tests/src/commonTest/kotlin/test/batching/QueryBatchingTest.kt @@ -4,15 +4,22 @@ import batching.GetLaunch2Query import batching.GetLaunchQuery import com.apollographql.apollo.ApolloClient import com.apollographql.apollo.api.AnyAdapter +import com.apollographql.apollo.api.ApolloResponse import com.apollographql.apollo.api.CustomScalarAdapters import com.apollographql.apollo.api.ExecutionOptions.Companion.CAN_BE_BATCHED import com.apollographql.apollo.api.http.HttpHeader import com.apollographql.apollo.api.json.jsonReader +import com.apollographql.apollo.testing.internal.runTest +import com.apollographql.mockserver.MockRequest +import com.apollographql.mockserver.MockRequestBase +import com.apollographql.mockserver.MockResponse import com.apollographql.mockserver.MockServer +import com.apollographql.mockserver.MockServerHandler import com.apollographql.mockserver.awaitRequest import com.apollographql.mockserver.enqueueString -import com.apollographql.apollo.testing.internal.runTest +import kotlinx.coroutines.Deferred import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.delay import okio.Buffer import kotlin.test.Ignore @@ -21,7 +28,9 @@ import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertFalse import kotlin.test.assertIs +import kotlin.test.assertNotNull import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds class QueryBatchingTest { private lateinit var mockServer: MockServer @@ -90,7 +99,7 @@ class QueryBatchingTest { // Only one request must have been sent assertFails { - mockServer.awaitRequest() + mockServer.awaitRequest(3.seconds) } } @@ -217,7 +226,7 @@ class QueryBatchingTest { // Only one request must have been sent assertFails { - mockServer.awaitRequest() + mockServer.awaitRequest(3.seconds) } } @@ -251,7 +260,7 @@ class QueryBatchingTest { val request = mockServer.awaitRequest() // Only one request must have been sent assertFails { - mockServer.awaitRequest() + mockServer.awaitRequest(3.seconds) } assertTrue(request.headers["client0"] == "0") assertTrue(request.headers["client1"] == "1") @@ -296,4 +305,45 @@ class QueryBatchingTest { assertFalse(request.headers.keys.contains("query1+query2-different-value")) assertFalse(request.headers.keys.contains(CAN_BE_BATCHED)) } + + @Test + fun batchSizeIsHonoredWithConcurrency() = runTest(before = { setUp() }, after = { tearDown() }) { + mockServer.close() + mockServer = MockServer.Builder().handler(object : MockServerHandler { + override fun handle(request: MockRequestBase): MockResponse { + val jsonReader = Buffer().write((request as MockRequest).body).jsonReader() + jsonReader.beginArray() + var arrayLength = 0 + while (jsonReader.hasNext()) { + jsonReader.skipValue() + arrayLength++ + } + + // Check we never receive more than maxBatchSize queries + assertTrue(arrayLength <= 10) + + return MockResponse.Builder() + .body("[" + List(arrayLength) { """{"data":{"launch":{"id":"83"}}}""" }.joinToString() + "]") + .build() + } + }).build() + + apolloClient = ApolloClient.Builder() + .serverUrl(mockServer.url()) + .httpBatching(maxBatchSize = 10, batchIntervalMillis = 10) + .build() + + val deferredResults = mutableListOf>>() + repeat(100) { + deferredResults += async { + delay(1) + apolloClient.query(GetLaunchQuery()) + .canBeBatched(true) + .execute() + } + } + for (apolloResponse in deferredResults.awaitAll()) { + assertNotNull(apolloResponse.data) + } + } }