11package kaist.iclab.abclogger
22
33import android.app.ActivityManager
4- import android.app.IntentService
5- import android.app.PendingIntent
64import android.content.Context
7- import android.content.Intent
5+ import android.os.SystemClock
86import android.util.Log
97import androidx.core.app.NotificationCompat
108import androidx.work.*
9+ import com.google.common.util.concurrent.MoreExecutors
1110import io.grpc.ManagedChannel
1211import io.grpc.android.AndroidChannelBuilder
1312import kaist.iclab.abclogger.collector.Base
@@ -31,16 +30,14 @@ import kaist.iclab.abclogger.collector.survey.SurveyEntity
3130import kaist.iclab.abclogger.collector.traffic.DataTrafficEntity
3231import kaist.iclab.abclogger.collector.wifi.WifiEntity
3332import kaist.iclab.abclogger.commons.Notifications
34- import kaist.iclab.abclogger.grpc.DataOperationsCoroutineGrpc
33+ import kaist.iclab.abclogger.grpc.DataOperationsGrpc
3534import kaist.iclab.abclogger.grpc.DatumProto
36- import kotlinx.coroutines.*
35+ import kotlinx.coroutines.Dispatchers
36+ import kotlinx.coroutines.asExecutor
37+ import java.util.concurrent.Semaphore
3738import java.util.concurrent.TimeUnit
3839
39- class SyncWorker (context : Context , params : WorkerParameters ) : CoroutineWorker(context, params) {
40- private val cancelIntent: PendingIntent = PendingIntent .getService(
41- applicationContext, REQUEST_CODE_CANCEL_SYNC , Intent (ACTION_CANCEL_SYNC ), PendingIntent .FLAG_UPDATE_CURRENT
42- )
43-
40+ class SyncWorker (context : Context , params : WorkerParameters ) : Worker(context, params) {
4441 private val foregroundInfo = ForegroundInfo (
4542 Notifications .ID_SYNC_PROGRESS ,
4643 Notifications .build(
@@ -50,7 +47,6 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
5047 text = applicationContext.getString(R .string.ntf_text_sync),
5148 progress = 0 ,
5249 indeterminate = true ,
53- intent = cancelIntent,
5450 actions = listOf (
5551 NotificationCompat .Action .Builder (
5652 R .drawable.baseline_close_white_24,
@@ -61,9 +57,8 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
6157 )
6258 )
6359
64- override suspend fun doWork (): Result = withContext(Dispatchers .IO ) {
65- setForeground(foregroundInfo)
66-
60+ override fun doWork (): Result {
61+ setForegroundAsync(foregroundInfo)
6762
6863 val channel: ManagedChannel = AndroidChannelBuilder
6964 .forTarget(if (BuildConfig .IS_TEST_MODE ) BuildConfig .TEST_SERVER_ADDRESS else BuildConfig .SERVER_ADDRESS )
@@ -72,7 +67,7 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
7267 .executor(Dispatchers .IO .asExecutor())
7368 .build()
7469
75- val stub = DataOperationsCoroutineGrpc .newStubWithContext (channel)
70+ val stub = DataOperationsGrpc .newFutureStub (channel)
7671
7772 uploadAll(stub)
7873
@@ -84,37 +79,45 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
8479 e.printStackTrace()
8580 }
8681
87- return @withContext Result .success()
82+ return Result .success()
8883 }
8984
90- private suspend inline fun <reified T : Base > upload (stub : DataOperationsCoroutineGrpc .DataOperationsCoroutineStub ) = coroutineScope {
91- val ids = ObjBox .query<T >()?.build()?.findIds() ? : return @coroutineScope
92- val size = ids.size
9385
94- (0 until size step N_UPLOADS ).forEach { offset ->
95- while (isLowMemory()) {
96- Log .d(" SyncWorker" , " isLowMemory..." )
97- System .runFinalization()
98- System .gc()
99- delay(TimeUnit .SECONDS .toMillis(5 ))
86+ private inline fun <reified T : Base > upload (stub : DataOperationsGrpc .DataOperationsFutureStub ) {
87+ val limiter = Semaphore (N_UPLOADS )
88+ val ids = ObjBox .query<T >()?.build()?.findIds() ? : return
89+
90+ ids.forEach { id ->
91+ if (isStopped) {
92+ return
10093 }
10194
10295 val deadlineStub = stub.withDeadlineAfter(1 , TimeUnit .MINUTES )
96+ try {
97+ while (isLowMemory()) {
98+ System .runFinalization()
99+ System .gc()
100+ SystemClock .sleep(TimeUnit .SECONDS .toMillis(10 ))
101+ }
103102
104- (offset.. offset + N_UPLOADS ).map { index ->
105- async {
106- try {
107- val id = ids[index]
108- val entity = ObjBox .get<T >(id)
109- ? : throw Exception (" No corresponding entity." )
110- val proto = toProto(entity) ? : throw Exception (" No corresponding protobuf." )
103+ limiter.acquire()
111104
112- deadlineStub.createDatum(proto)
113- ObjBox .remove(entity)
114- } catch (e: Exception ) {
115- }
116- }
117- }.awaitAll()
105+ val entity = ObjBox .get<T >(id) ? : throw Exception (" No corresponding entity." )
106+ val proto = toProto(entity) ? : throw Exception (" No corresponding protobuf." )
107+
108+ deadlineStub.createDatum(proto).addListener({
109+ ObjBox .remove(entity)
110+ limiter.release()
111+ }, { runnable: Runnable ->
112+ MoreExecutors .directExecutor().execute(runnable)
113+ })
114+ } catch (e: Exception ) { }
115+ }
116+
117+ val startTime = SystemClock .elapsedRealtime()
118+
119+ while (limiter.availablePermits() < N_UPLOADS && SystemClock .elapsedRealtime() - startTime < WAIT_TIME_RELEASE ) {
120+ SystemClock .sleep(TimeUnit .SECONDS .toMillis(5 ))
118121 }
119122 }
120123
@@ -124,12 +127,11 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
124127 val runtime = Runtime .getRuntime()
125128 val usedMemory = runtime.totalMemory() - runtime.freeMemory()
126129 val usedPercentage = usedMemory.toFloat() / (maxHeapSize * 1e6).toFloat()
127- Log .d(" SyncWorker" , " usedPercentage: $usedPercentage / maxHeapSize: $maxHeapSize " )
128130
129131 return usedPercentage > 0.5F
130132 }
131133
132- private suspend fun uploadAll (stub : DataOperationsCoroutineGrpc . DataOperationsCoroutineStub ) {
134+ private fun uploadAll (stub : DataOperationsGrpc . DataOperationsFutureStub ) {
133135 upload<PhysicalActivityTransitionEntity >(stub)
134136 upload<PhysicalActivityEntity >(stub)
135137 upload<AppUsageEventEntity >(stub)
@@ -343,18 +345,10 @@ class SyncWorker(context: Context, params: WorkerParameters) : CoroutineWorker(c
343345 }?.build()
344346 }
345347
346- class CancelIntentService : IntentService (CancelIntentService : :class.java.name) {
347- override fun onHandleIntent (intent : Intent ? ) {
348- Log .d(" CancelIntentService" , " onHandleIntent()" )
349- requestStop(this )
350- }
351- }
352-
353348 companion object {
354- private const val N_UPLOADS : Int = 100
349+ private const val N_UPLOADS : Int = 250
350+ private val WAIT_TIME_RELEASE : Long = TimeUnit .MINUTES .toMillis(1 )
355351 private val INTERVAL_SYNC = TimeUnit .HOURS .toMillis(1 )
356- private const val REQUEST_CODE_CANCEL_SYNC = 0x12
357- private const val ACTION_CANCEL_SYNC = " ${BuildConfig .APPLICATION_ID } .ACTION_CANCEL_SYNC "
358352 private val WORKER_NAME = SyncWorker ::class .java.name
359353
360354 fun requestStart (context : Context , forceStart : Boolean , enableMetered : Boolean , isPeriodic : Boolean ) {
0 commit comments