Skip to content

Commit 0f718b7

Browse files
committed
save network calls on OpRepo by adding bucketing
Buckets Purpose: Bucketing is a pattern we are using to help save network calls. It works together with opRepoExecutionInterval to define a time window operations can be added to the bucket. When enqueue() is called it creates a new OperationQueueItem with it's bucket = enqueueIntoBucket. Just before we start processing a bucket we enqueueIntoBucket++, this ensures anything new that comes in while executing doesn't cause it to skip the opRepoExecutionInterval delay. NOTE: Bucketing only effects the starting operation we grab. The reason is we still want getGroupableOperations() to find other operations it can execute in one go (same network call). It's more efficient overall, as it lowers the total number of network calls. This address the failing test "operations enqueued while repo is executing should be executed only after the next opRepoExecutionInterval" we added in commit 0988977
1 parent 0988977 commit 0f718b7

File tree

2 files changed

+36
-9
lines changed

2 files changed

+36
-9
lines changed

OneSignalSDK/onesignal/core/src/main/java/com/onesignal/core/internal/operations/impl/OperationRepo.kt

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ internal class OperationRepo(
2626
internal class OperationQueueItem(
2727
val operation: Operation,
2828
val waiter: WaiterWithValue<Boolean>? = null,
29+
val bucket: Int,
2930
var retries: Int = 0,
3031
) {
3132
override fun toString(): String {
32-
return Pair(operation.toString(), retries).toString() + "\n"
33+
return "bucket:$bucket, retries:$retries, operation:$operation\n"
3334
}
3435
}
3536

@@ -38,6 +39,27 @@ internal class OperationRepo(
3839
private val waiter = WaiterWithValue<Boolean>()
3940
private var paused = false
4041

42+
/** *** Buckets ***
43+
* Purpose: Bucketing is a pattern we are using to help save network
44+
* calls. It works together with opRepoExecutionInterval to define
45+
* a time window operations can be added to the bucket.
46+
*
47+
* When enqueue() is called it creates a new OperationQueueItem with it's
48+
* bucket = enqueueIntoBucket. Just before we start processing a bucket we
49+
* enqueueIntoBucket++, this ensures anything new that comes in while
50+
* executing doesn't cause it to skip the opRepoExecutionInterval delay.
51+
*
52+
* NOTE: Bucketing only effects the starting operation we grab.
53+
* The reason is we still want getGroupableOperations() to find
54+
* other operations it can execute in one go (same network call).
55+
* It's more efficient overall, as it lowers the total number of
56+
* network calls.
57+
*/
58+
private var enqueueIntoBucket = 0
59+
private val executeBucket: Int get() {
60+
return if (enqueueIntoBucket == 0) 0 else enqueueIntoBucket - 1
61+
}
62+
4163
init {
4264
val executorsMap: MutableMap<String, IOperationExecutor> = mutableMapOf()
4365

@@ -49,7 +71,7 @@ internal class OperationRepo(
4971
this.executorsMap = executorsMap
5072

5173
for (operation in _operationModelStore.list()) {
52-
internalEnqueue(OperationQueueItem(operation), flush = false, addToStore = false)
74+
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush = false, addToStore = false)
5375
}
5476
}
5577

@@ -73,7 +95,7 @@ internal class OperationRepo(
7395
Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)")
7496

7597
operation.id = UUID.randomUUID().toString()
76-
internalEnqueue(OperationQueueItem(operation), flush, true)
98+
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true)
7799
}
78100

79101
override suspend fun enqueueAndWait(
@@ -84,7 +106,7 @@ internal class OperationRepo(
84106

85107
operation.id = UUID.randomUUID().toString()
86108
val waiter = WaiterWithValue<Boolean>()
87-
internalEnqueue(OperationQueueItem(operation, waiter), flush, true)
109+
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
88110
return waiter.waitForWake()
89111
}
90112

@@ -109,13 +131,14 @@ internal class OperationRepo(
109131
*/
110132
private suspend fun processQueueForever() {
111133
waitForNewOperationAndExecutionInterval()
134+
enqueueIntoBucket++
112135
while (true) {
113136
if (paused) {
114137
Logging.debug("OperationRepo is paused")
115138
return
116139
}
117140

118-
val ops = getNextOps()
141+
val ops = getNextOps(executeBucket)
119142
Logging.debug("processQueueForever:ops:\n$ops")
120143

121144
if (ops != null) {
@@ -125,6 +148,7 @@ internal class OperationRepo(
125148
delay(_configModelStore.model.opRepoPostWakeDelay)
126149
} else {
127150
waitForNewOperationAndExecutionInterval()
151+
enqueueIntoBucket++
128152
}
129153
}
130154
}
@@ -227,7 +251,7 @@ internal class OperationRepo(
227251
synchronized(queue) {
228252
for (op in response.operations.reversed()) {
229253
op.id = UUID.randomUUID().toString()
230-
val queueItem = OperationQueueItem(op)
254+
val queueItem = OperationQueueItem(op, bucket = 0)
231255
queue.add(0, queueItem)
232256
_operationModelStore.add(0, queueItem.operation)
233257
}
@@ -249,9 +273,12 @@ internal class OperationRepo(
249273
delay(delayFor)
250274
}
251275

252-
internal fun getNextOps(): List<OperationQueueItem>? {
276+
internal fun getNextOps(bucketFilter: Int): List<OperationQueueItem>? {
253277
return synchronized(queue) {
254-
val startingOp = queue.firstOrNull { it.operation.canStartExecute }
278+
val startingOp =
279+
queue.firstOrNull {
280+
it.operation.canStartExecute && it.bucket <= bucketFilter
281+
}
255282

256283
if (startingOp != null) {
257284
queue.remove(startingOp)

OneSignalSDK/onesignal/core/src/test/java/com/onesignal/core/internal/operations/OperationRepoTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class OperationRepoTests : FunSpec({
103103
// 1st: gets the operation
104104
// 2nd: will be empty
105105
// 3rd: shouldn't be called, loop should be waiting on next operation
106-
mocks.operationRepo.getNextOps()
106+
mocks.operationRepo.getNextOps(withArg { Any() })
107107
}
108108
}
109109

0 commit comments

Comments
 (0)