Skip to content

Commit 5d20fa8

Browse files
authored
Merge pull request #2053 from OneSignal/fix/op-repo-add-bucketing-to-limit-network-calls-further
[Fix] Prevent operations added to queue while it's processing from being executed without delay
2 parents 8898cb8 + 8001c6e commit 5d20fa8

File tree

2 files changed

+179
-25
lines changed

2 files changed

+179
-25
lines changed

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ internal class OperationRepo(
2626
internal class OperationQueueItem(
2727
val operation: Operation,
2828
val waiter: WaiterWithValue<Boolean>? = null,
29+
val bucket: Int,
2930
var retries: Int = 0,
3031
) {
3132
override fun toString(): String {
32-
return Pair(operation.toString(), retries).toString() + "\n"
33+
return "bucket:$bucket, retries:$retries, operation:$operation\n"
3334
}
3435
}
3536

@@ -38,6 +39,27 @@ internal class OperationRepo(
3839
private val waiter = WaiterWithValue<Boolean>()
3940
private var paused = false
4041

42+
/** *** Buckets ***
43+
* Purpose: Bucketing is a pattern we are using to help save network
44+
* calls. It works together with opRepoExecutionInterval to define
45+
* a time window operations can be added to the bucket.
46+
*
47+
* When enqueue() is called it creates a new OperationQueueItem with it's
48+
* bucket = enqueueIntoBucket. Just before we start processing a bucket we
49+
* enqueueIntoBucket++, this ensures anything new that comes in while
50+
* executing doesn't cause it to skip the opRepoExecutionInterval delay.
51+
*
52+
* NOTE: Bucketing only effects the starting operation we grab.
53+
* The reason is we still want getGroupableOperations() to find
54+
* other operations it can execute in one go (same network call).
55+
* It's more efficient overall, as it lowers the total number of
56+
* network calls.
57+
*/
58+
private var enqueueIntoBucket = 0
59+
private val executeBucket: Int get() {
60+
return if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1
61+
}
62+
4163
init {
4264
val executorsMap: MutableMap<String, IOperationExecutor> = mutableMapOf()
4365

@@ -49,7 +71,7 @@ internal class OperationRepo(
4971
this.executorsMap = executorsMap
5072

5173
for (operation in _operationModelStore.list()) {
52-
internalEnqueue(OperationQueueItem(operation), flush = false, addToStore = false)
74+
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush = false, addToStore = false)
5375
}
5476
}
5577

@@ -73,7 +95,7 @@ internal class OperationRepo(
7395
Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)")
7496

7597
operation.id = UUID.randomUUID().toString()
76-
internalEnqueue(OperationQueueItem(operation), flush, true)
98+
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true)
7799
}
78100

79101
override suspend fun enqueueAndWait(
@@ -84,7 +106,7 @@ internal class OperationRepo(
84106

85107
operation.id = UUID.randomUUID().toString()
86108
val waiter = WaiterWithValue<Boolean>()
87-
internalEnqueue(OperationQueueItem(operation, waiter), flush, true)
109+
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
88110
return waiter.waitForWake()
89111
}
90112

@@ -109,13 +131,14 @@ internal class OperationRepo(
109131
*/
110132
private suspend fun processQueueForever() {
111133
waitForNewOperationAndExecutionInterval()
134+
enqueueIntoBucket++
112135
while (true) {
113136
if (paused) {
114137
Logging.debug("OperationRepo is paused")
115138
return
116139
}
117140

118-
val ops = getNextOps()
141+
val ops = getNextOps(executeBucket)
119142
Logging.debug("processQueueForever:ops:\n$ops")
120143

121144
if (ops != null) {
@@ -125,6 +148,7 @@ internal class OperationRepo(
125148
delay(_configModelStore.model.opRepoPostWakeDelay)
126149
} else {
127150
waitForNewOperationAndExecutionInterval()
151+
enqueueIntoBucket++
128152
}
129153
}
130154
}
@@ -151,7 +175,7 @@ internal class OperationRepo(
151175
}
152176
}
153177

154-
private suspend fun executeOperations(ops: List<OperationQueueItem>) {
178+
internal suspend fun executeOperations(ops: List<OperationQueueItem>) {
155179
try {
156180
val startingOp = ops.first()
157181
val executor =
@@ -227,7 +251,7 @@ internal class OperationRepo(
227251
synchronized(queue) {
228252
for (op in response.operations.reversed()) {
229253
op.id = UUID.randomUUID().toString()
230-
val queueItem = OperationQueueItem(op)
254+
val queueItem = OperationQueueItem(op, bucket = 0)
231255
queue.add(0, queueItem)
232256
_operationModelStore.add(0, queueItem.operation)
233257
}
@@ -249,9 +273,12 @@ internal class OperationRepo(
249273
delay(delayFor)
250274
}
251275

252-
internal fun getNextOps(): List<OperationQueueItem>? {
276+
internal fun getNextOps(bucketFilter: Int): List<OperationQueueItem>? {
253277
return synchronized(queue) {
254-
val startingOp = queue.firstOrNull { it.operation.canStartExecute }
278+
val startingOp =
279+
queue.firstOrNull {
280+
it.operation.canStartExecute && it.bucket <= bucketFilter
281+
}
255282

256283
if (startingOp != null) {
257284
queue.remove(startingOp)

OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt

Lines changed: 143 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.onesignal.core.internal.operations
22

33
import com.onesignal.common.threading.Waiter
4+
import com.onesignal.common.threading.WaiterWithValue
45
import com.onesignal.core.internal.operations.impl.OperationModelStore
56
import com.onesignal.core.internal.operations.impl.OperationRepo
7+
import com.onesignal.core.internal.operations.impl.OperationRepo.OperationQueueItem
68
import com.onesignal.core.internal.time.impl.Time
79
import com.onesignal.debug.LogLevel
810
import com.onesignal.debug.internal.logging.Logging
@@ -11,6 +13,7 @@ import io.kotest.core.spec.style.FunSpec
1113
import io.kotest.matchers.shouldBe
1214
import io.mockk.CapturingSlot
1315
import io.mockk.coEvery
16+
import io.mockk.coVerify
1417
import io.mockk.coVerifyOrder
1518
import io.mockk.every
1619
import io.mockk.just
@@ -43,17 +46,16 @@ private class Mocks {
4346
mockExecutor
4447
}
4548

46-
val operationRepo: OperationRepo =
47-
run {
48-
spyk(
49-
OperationRepo(
50-
listOf(executor),
51-
operationModelStore,
52-
configModelStore,
53-
Time(),
54-
),
55-
)
56-
}
49+
val operationRepo: OperationRepo by lazy {
50+
spyk(
51+
OperationRepo(
52+
listOf(executor),
53+
operationModelStore,
54+
configModelStore,
55+
Time(),
56+
),
57+
)
58+
}
5759
}
5860

5961
class OperationRepoTests : FunSpec({
@@ -102,7 +104,7 @@ class OperationRepoTests : FunSpec({
102104
// 1st: gets the operation
103105
// 2nd: will be empty
104106
// 3rd: shouldn't be called, loop should be waiting on next operation
105-
mocks.operationRepo.getNextOps()
107+
mocks.operationRepo.getNextOps(withArg { Any() })
106108
}
107109
}
108110

@@ -135,7 +137,8 @@ class OperationRepoTests : FunSpec({
135137
test("enqueue operation executes and is removed when executed after retry") {
136138
// Given
137139
val mocks = Mocks()
138-
coEvery { mocks.operationRepo.delayBeforeRetry(any()) } just runs
140+
val opRepo = mocks.operationRepo
141+
coEvery { opRepo.delayBeforeRetry(any()) } just runs
139142
coEvery {
140143
mocks.executor.execute(any())
141144
} returns ExecutionResponse(ExecutionResult.FAIL_RETRY) andThen ExecutionResponse(ExecutionResult.SUCCESS)
@@ -144,8 +147,8 @@ class OperationRepoTests : FunSpec({
144147
val operation = mockOperation(operationIdSlot = operationIdSlot)
145148

146149
// When
147-
mocks.operationRepo.start()
148-
val response = mocks.operationRepo.enqueueAndWait(operation)
150+
opRepo.start()
151+
val response = opRepo.enqueueAndWait(operation)
149152

150153
// Then
151154
response shouldBe true
@@ -158,7 +161,7 @@ class OperationRepoTests : FunSpec({
158161
it[0] shouldBe operation
159162
},
160163
)
161-
mocks.operationRepo.delayBeforeRetry(1)
164+
opRepo.delayBeforeRetry(1)
162165
mocks.executor.execute(
163166
withArg {
164167
it.count() shouldBe 1
@@ -370,6 +373,118 @@ class OperationRepoTests : FunSpec({
370373
}
371374
response shouldBe true
372375
}
376+
377+
// This ensures a misbehaving app can't add operations (such as addTag())
378+
// in a tight loop and cause a number of back-to-back operations without delay.
379+
test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval") {
380+
// Given
381+
val mocks = Mocks()
382+
mocks.configModelStore.model.opRepoExecutionInterval = 100
383+
val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2
384+
val opRepo = mocks.operationRepo
385+
386+
val executeOperationsCall = mockExecuteOperations(opRepo)
387+
388+
// When
389+
opRepo.start()
390+
opRepo.enqueue(mockOperationNonGroupable())
391+
executeOperationsCall.waitForWake()
392+
val secondEnqueueResult =
393+
withTimeoutOrNull(enqueueAndWaitMaxTime) {
394+
opRepo.enqueueAndWait(mockOperationNonGroupable())
395+
}
396+
397+
// Then
398+
secondEnqueueResult shouldBe null
399+
coVerify(exactly = 1) {
400+
opRepo.executeOperations(any())
401+
}
402+
}
403+
404+
// This ensures there are no off-by-one errors with the same scenario as above, but on a 2nd
405+
// pass of OperationRepo
406+
test("operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval, 2nd pass") {
407+
// Given
408+
val mocks = Mocks()
409+
mocks.configModelStore.model.opRepoExecutionInterval = 100
410+
val enqueueAndWaitMaxTime = mocks.configModelStore.model.opRepoExecutionInterval / 2
411+
val opRepo = mocks.operationRepo
412+
413+
val executeOperationsCall = mockExecuteOperations(opRepo)
414+
415+
// When
416+
opRepo.start()
417+
opRepo.enqueue(mockOperationNonGroupable())
418+
executeOperationsCall.waitForWake()
419+
opRepo.enqueueAndWait(mockOperationNonGroupable())
420+
val thirdEnqueueResult =
421+
withTimeoutOrNull(enqueueAndWaitMaxTime) {
422+
opRepo.enqueueAndWait(mockOperationNonGroupable())
423+
}
424+
425+
// Then
426+
thirdEnqueueResult shouldBe null
427+
coVerify(exactly = 2) {
428+
opRepo.executeOperations(any())
429+
}
430+
}
431+
432+
// Starting operations are operations we didn't process the last time the app was running.
433+
// We want to ensure we process them, but only after the standard batching delay to be as
434+
// optional as possible with network calls.
435+
test("starting OperationModelStore should be processed, following normal delay rules") {
436+
// Given
437+
val mocks = Mocks()
438+
mocks.configModelStore.model.opRepoExecutionInterval = 100
439+
every { mocks.operationModelStore.list() } returns listOf(mockOperation())
440+
val executeOperationsCall = mockExecuteOperations(mocks.operationRepo)
441+
442+
// When
443+
mocks.operationRepo.start()
444+
val immediateResult =
445+
withTimeoutOrNull(100) {
446+
executeOperationsCall.waitForWake()
447+
}
448+
val delayedResult =
449+
withTimeoutOrNull(200) {
450+
executeOperationsCall.waitForWake()
451+
}
452+
453+
// Then
454+
immediateResult shouldBe null
455+
delayedResult shouldBe true
456+
}
457+
458+
test("ensure results from executeOperations are added to beginning of the queue") {
459+
// Given
460+
val mocks = Mocks()
461+
val executor = mocks.executor
462+
val opWithResult = mockOperationNonGroupable()
463+
val opFromResult = mockOperationNonGroupable()
464+
coEvery {
465+
executor.execute(listOf(opWithResult))
466+
} coAnswers {
467+
ExecutionResponse(ExecutionResult.SUCCESS, operations = listOf(opFromResult))
468+
}
469+
val firstOp = mockOperationNonGroupable()
470+
val secondOp = mockOperationNonGroupable()
471+
472+
// When
473+
mocks.operationRepo.start()
474+
mocks.operationRepo.enqueue(firstOp)
475+
mocks.operationRepo.executeOperations(
476+
listOf(OperationQueueItem(opWithResult, bucket = 0)),
477+
)
478+
mocks.operationRepo.enqueueAndWait(secondOp)
479+
480+
// Then
481+
coVerifyOrder {
482+
executor.execute(withArg { it[0] shouldBe opWithResult })
483+
executor.execute(withArg { it[0] shouldBe opFromResult })
484+
executor.execute(withArg { it[0] shouldBe firstOp })
485+
executor.execute(withArg { it[0] shouldBe secondOp })
486+
}
487+
}
373488
}) {
374489
companion object {
375490
private fun mockOperation(
@@ -395,5 +510,17 @@ class OperationRepoTests : FunSpec({
395510

396511
return operation
397512
}
513+
514+
private fun mockOperationNonGroupable() = mockOperation(groupComparisonType = GroupComparisonType.NONE)
515+
516+
private fun mockExecuteOperations(opRepo: OperationRepo): WaiterWithValue<Boolean> {
517+
val executeWaiter = WaiterWithValue<Boolean>()
518+
coEvery { opRepo.executeOperations(any()) } coAnswers {
519+
executeWaiter.wake(true)
520+
delay(10)
521+
firstArg<List<OperationRepo.OperationQueueItem>>().forEach { it.waiter?.wake(true) }
522+
}
523+
return executeWaiter
524+
}
398525
}
399526
}

0 commit comments

Comments
 (0)