@@ -97,53 +97,37 @@ internal class OperationRepo(
97
97
* dedicated thread.
98
98
*/
99
99
private suspend fun processQueueForever () {
100
- var lastSyncTime = _time .currentTimeMillis
101
- var force = false
102
-
103
- // This runs forever, until the application is destroyed.
100
+ var force = waitForWake()
104
101
while (true ) {
105
102
if (paused) {
106
103
Logging .debug(" OperationRepo is paused" )
107
104
return
108
105
}
109
- try {
110
- val ops = getNextOps()
111
-
112
- // if the queue is empty at this point, we are no longer in force flush mode. We
113
- // check this now so if the execution is unsuccessful with retry, we don't find ourselves
114
- // continuously retrying without delaying.
115
- if (queue.isEmpty()) {
116
- force = false
117
- }
118
-
119
- if (ops != null ) {
120
- executeOperations(ops!! )
121
- }
122
106
123
- if (! force) {
124
- // potentially delay to prevent this from constant IO if a bunch of
125
- // operations are set sequentially.
126
- val newTime = _time .currentTimeMillis
127
-
128
- val delay = (lastSyncTime - newTime) + _configModelStore .model.opRepoExecutionInterval
129
- lastSyncTime = newTime
130
- if (delay > 0 ) {
131
- withTimeoutOrNull(delay) {
132
- // wait to be woken up for the next pass
133
- force = waiter.waitForWake()
134
- }
107
+ val ops = getNextOps()
108
+ Logging .debug(" processQueueForever:force:$force , ops:$ops " )
135
109
136
- // This secondary delay allows for any subsequent operations (beyond the first one
137
- // that woke us) to be enqueued before we pull from the queue.
138
- delay(_configModelStore .model.opRepoPostWakeDelay)
110
+ if (ops != null ) {
111
+ executeOperations(ops)
112
+ // Allows for any subsequent operations (beyond the first one
113
+ // that woke us) to be enqueued before we pull from the queue.
114
+ delay(_configModelStore .model.opRepoPostWakeDelay)
115
+ } else if (! force) {
116
+ force = waitForWake()
117
+ } else {
118
+ force = false
119
+ }
120
+ }
121
+ }
139
122
140
- lastSyncTime = _time .currentTimeMillis
141
- }
142
- }
143
- } catch (e : Throwable ) {
144
- Logging .log( LogLevel . ERROR , " Error occurred with Operation work loop " , e )
123
+ private suspend fun waitForWake (): Boolean {
124
+ var force = waiter.waitForWake()
125
+ if ( ! force) {
126
+ withTimeoutOrNull( _configModelStore .model.opRepoExecutionInterval ) {
127
+ force = waiter.waitForWake( )
145
128
}
146
129
}
130
+ return force
147
131
}
148
132
149
133
private suspend fun executeOperations (ops : List <OperationQueueItem >) {
@@ -240,7 +224,7 @@ internal class OperationRepo(
240
224
suspend fun delayBeforeRetry (retries : Int ) {
241
225
val delayFor = retries * 15_000L
242
226
if (delayFor < 1 ) return
243
- Logging .error(" Operations being delay for: $delayFor " )
227
+ Logging .error(" Operations being delay for: $delayFor ms " )
244
228
delay(delayFor)
245
229
}
246
230
0 commit comments