@@ -22,7 +22,7 @@ internal class OperationRepo(
22
22
private val _configModelStore : ConfigModelStore ,
23
23
private val _time : ITime ,
24
24
) : IOperationRepo, IStartableService {
25
- private class OperationQueueItem (
25
+ internal class OperationQueueItem (
26
26
val operation : Operation ,
27
27
val waiter : WaiterWithValue <Boolean >? = null ,
28
28
var retries : Int = 0 ,
@@ -97,61 +97,46 @@ internal class OperationRepo(
97
97
* dedicated thread.
98
98
*/
99
99
private suspend fun processQueueForever () {
100
- var lastSyncTime = _time .currentTimeMillis
101
- var force = false
102
-
103
- // This runs forever, until the application is destroyed.
100
+ waitForNewOperationAndExecutionInterval()
104
101
while (true ) {
105
102
if (paused) {
106
103
Logging .debug(" OperationRepo is paused" )
107
104
return
108
105
}
109
- try {
110
- var ops: List <OperationQueueItem >? = null
111
-
112
- synchronized(queue) {
113
- val startingOp = queue.firstOrNull { it.operation.canStartExecute }
114
-
115
- if (startingOp != null ) {
116
- queue.remove(startingOp)
117
- ops = getGroupableOperations(startingOp)
118
- }
119
- }
120
106
121
- // if the queue is empty at this point, we are no longer in force flush mode. We
122
- // check this now so if the execution is unsuccessful with retry, we don't find ourselves
123
- // continuously retrying without delaying.
124
- if (queue.isEmpty()) {
125
- force = false
126
- }
127
-
128
- if (ops != null ) {
129
- executeOperations(ops!! )
130
- }
107
+ val ops = getNextOps()
108
+ Logging .debug(" processQueueForever:ops:$ops " )
131
109
132
- if (! force) {
133
- // potentially delay to prevent this from constant IO if a bunch of
134
- // operations are set sequentially.
135
- val newTime = _time .currentTimeMillis
136
-
137
- val delay = (lastSyncTime - newTime) + _configModelStore .model.opRepoExecutionInterval
138
- lastSyncTime = newTime
139
- if (delay > 0 ) {
140
- withTimeoutOrNull(delay) {
141
- // wait to be woken up for the next pass
142
- force = waiter.waitForWake()
143
- }
144
-
145
- // This secondary delay allows for any subsequent operations (beyond the first one
146
- // that woke us) to be enqueued before we pull from the queue.
147
- delay(_configModelStore .model.opRepoPostWakeDelay)
110
+ if (ops != null ) {
111
+ executeOperations(ops)
112
+ // Allows for any subsequent operations (beyond the first one
113
+ // that woke us) to be enqueued before we pull from the queue.
114
+ delay(_configModelStore .model.opRepoPostWakeDelay)
115
+ } else {
116
+ waitForNewOperationAndExecutionInterval()
117
+ }
118
+ }
119
+ }
148
120
149
- lastSyncTime = _time .currentTimeMillis
150
- }
151
- }
152
- } catch (e: Throwable ) {
153
- Logging .log(LogLevel .ERROR , " Error occurred with Operation work loop" , e)
121
+ /* *
122
+ * Waits until a new operation is enqueued, then wait an additional
123
+ * amount of time afterwards, so operations can be grouped/batched.
124
+ */
125
+ private suspend fun waitForNewOperationAndExecutionInterval () {
126
+ // 1. Wait for an operation to be enqueued
127
+ var force = waiter.waitForWake()
128
+
129
+ // 2. Wait at least the time defined in opRepoExecutionInterval
130
+ // so operations can be grouped, unless one of them used
131
+ // flush=true (AKA force)
132
+ var lastTime = _time .currentTimeMillis
133
+ var remainingTime = _configModelStore .model.opRepoExecutionInterval
134
+ while (! force && remainingTime > 0 ) {
135
+ withTimeoutOrNull(remainingTime) {
136
+ force = waiter.waitForWake()
154
137
}
138
+ remainingTime - = _time .currentTimeMillis - lastTime
139
+ lastTime = _time .currentTimeMillis
155
140
}
156
141
}
157
142
@@ -249,10 +234,23 @@ internal class OperationRepo(
249
234
suspend fun delayBeforeRetry (retries : Int ) {
250
235
val delayFor = retries * 15_000L
251
236
if (delayFor < 1 ) return
252
- Logging .error(" Operations being delay for: $delayFor " )
237
+ Logging .error(" Operations being delay for: $delayFor ms " )
253
238
delay(delayFor)
254
239
}
255
240
241
+ internal fun getNextOps (): List <OperationQueueItem >? {
242
+ return synchronized(queue) {
243
+ val startingOp = queue.firstOrNull { it.operation.canStartExecute }
244
+
245
+ if (startingOp != null ) {
246
+ queue.remove(startingOp)
247
+ getGroupableOperations(startingOp)
248
+ } else {
249
+ null
250
+ }
251
+ }
252
+ }
253
+
256
254
/* *
257
255
* Given a starting operation, find and remove from the queue all other operations that
258
256
* can be executed along with the starting operation. The full list of operations, with
0 commit comments