Skip to content

Commit bf2cfda

Browse files
committed
fix multiple enqueues skipping waitForWake
We renamed waitForWake to waitForNewOperationAndExecutionInterval to describe that it is doing two things. Fixed the logic where enqueuing two or more operations would skip our executionInterval logic. Added test coverage for this logic, as well as a 2nd test to ensure flush can still skip waiting for it. Lastly cleaned up the force state in the main processQueueForever loop, we were able to fully encapsulate it into waitForNewOperationAndExecutionInterval. The other part of the logic is once we start processing the queue we want to do so until it's empty again, which ops != null is the only checked needed in processQueueForever to cover that.
1 parent 1c2fd11 commit bf2cfda

File tree

2 files changed

+57
-11
lines changed

2 files changed

+57
-11
lines changed

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,37 +97,47 @@ internal class OperationRepo(
9797
* dedicated thread.
9898
*/
9999
private suspend fun processQueueForever() {
100-
var force = waitForWake()
100+
waitForNewOperationAndExecutionInterval()
101101
while (true) {
102102
if (paused) {
103103
Logging.debug("OperationRepo is paused")
104104
return
105105
}
106106

107107
val ops = getNextOps()
108-
Logging.debug("processQueueForever:force:$force, ops:$ops")
108+
Logging.debug("processQueueForever:ops:$ops")
109109

110110
if (ops != null) {
111111
executeOperations(ops)
112112
// Allows for any subsequent operations (beyond the first one
113113
// that woke us) to be enqueued before we pull from the queue.
114114
delay(_configModelStore.model.opRepoPostWakeDelay)
115-
} else if (!force) {
116-
force = waitForWake()
117115
} else {
118-
force = false
116+
waitForNewOperationAndExecutionInterval()
119117
}
120118
}
121119
}
122120

123-
private suspend fun waitForWake(): Boolean {
121+
/**
122+
* Waits until a new operation is enqueued, then wait an additional
123+
* amount of time afterwards, so operations can be grouped/batched.
124+
*/
125+
private suspend fun waitForNewOperationAndExecutionInterval() {
126+
// 1. Wait for an operation to be enqueued
124127
var force = waiter.waitForWake()
125-
if (!force) {
126-
withTimeoutOrNull(_configModelStore.model.opRepoExecutionInterval) {
128+
129+
// 2. Wait at least the time defined in opRepoExecutionInterval
130+
// so operations can be grouped, unless one of them used
131+
// flush=true (AKA force)
132+
var lastTime = _time.currentTimeMillis
133+
var remainingTime = _configModelStore.model.opRepoExecutionInterval
134+
while (!force && remainingTime > 0) {
135+
withTimeoutOrNull(remainingTime) {
127136
force = waiter.waitForWake()
128137
}
138+
remainingTime -= _time.currentTimeMillis - lastTime
139+
lastTime = _time.currentTimeMillis
129140
}
130-
return force
131141
}
132142

133143
private suspend fun executeOperations(ops: List<OperationQueueItem>) {

OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.onesignal.core.internal.operations
33
import com.onesignal.common.threading.Waiter
44
import com.onesignal.core.internal.operations.impl.OperationModelStore
55
import com.onesignal.core.internal.operations.impl.OperationRepo
6+
import com.onesignal.core.internal.time.impl.Time
67
import com.onesignal.debug.LogLevel
78
import com.onesignal.debug.internal.logging.Logging
89
import com.onesignal.mocks.MockHelper
@@ -19,9 +20,12 @@ import io.mockk.slot
1920
import io.mockk.spyk
2021
import io.mockk.verify
2122
import kotlinx.coroutines.delay
23+
import kotlinx.coroutines.withTimeoutOrNull
2224

2325
// Mocks used by every test in this file
2426
private class Mocks {
27+
val configModelStore = MockHelper.configModelStore()
28+
2529
val operationModelStore: OperationModelStore =
2630
run {
2731
val mockOperationModelStore = mockk<OperationModelStore>()
@@ -45,8 +49,8 @@ private class Mocks {
4549
OperationRepo(
4650
listOf(executor),
4751
operationModelStore,
48-
MockHelper.configModelStore(),
49-
MockHelper.time(1000),
52+
configModelStore,
53+
Time(),
5054
),
5155
)
5256
}
@@ -312,6 +316,38 @@ class OperationRepoTests : FunSpec({
312316
mocks.operationModelStore.remove("operationId2")
313317
}
314318
}
319+
320+
test("enqueuing normal operations should not skip minimum wait time") {
321+
// Given
322+
val mocks = Mocks()
323+
mocks.configModelStore.model.opRepoExecutionInterval = 1_000
324+
325+
// When
326+
mocks.operationRepo.start()
327+
mocks.operationRepo.enqueue(mockOperation())
328+
val response =
329+
withTimeoutOrNull(100) {
330+
val value = mocks.operationRepo.enqueueAndWait(mockOperation())
331+
value
332+
}
333+
response shouldBe null
334+
}
335+
336+
test("enqueuing with flush = true should skip minimum wait time") {
337+
// Given
338+
val mocks = Mocks()
339+
mocks.configModelStore.model.opRepoExecutionInterval = 1_000
340+
341+
// When
342+
mocks.operationRepo.start()
343+
mocks.operationRepo.enqueue(mockOperation())
344+
val response =
345+
withTimeoutOrNull(100) {
346+
val value = mocks.operationRepo.enqueueAndWait(mockOperation(), flush = true)
347+
value
348+
}
349+
response shouldBe true
350+
}
315351
}) {
316352
companion object {
317353
private fun mockOperation(

0 commit comments

Comments
 (0)