From b87357eabfbbb75ae948f6c6c09ef0325713f4de Mon Sep 17 00:00:00 2001 From: Matt Creaser Date: Fri, 7 Jun 2024 10:33:46 -0300 Subject: [PATCH 1/4] Add RxLint lint checks --- aws-datastore/build.gradle.kts | 2 ++ gradle/libs.versions.toml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/aws-datastore/build.gradle.kts b/aws-datastore/build.gradle.kts index 2e350d9e06..9f41b7aeed 100644 --- a/aws-datastore/build.gradle.kts +++ b/aws-datastore/build.gradle.kts @@ -24,6 +24,8 @@ apply(from = rootProject.file("configuration/publishing.gradle")) group = properties["POM_GROUP"].toString() dependencies { + compileOnly(libs.rxlint) + implementation(project(":core")) implementation(project(":aws-core")) implementation(project(":aws-api-appsync")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c9d8a2eff8..ce7c98b6aa 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -42,6 +42,7 @@ oauth2 = "0.26.0" okhttp = "5.0.0-alpha.11" robolectric = "4.7" rxjava = "3.0.6" +rxlint = "1.7.8" slf4j = "2.0.6" sqlcipher = "4.5.4" tensorflow = "2.0.0" @@ -91,6 +92,7 @@ maplibre-sdk = { module = "org.maplibre.gl:android-sdk", version.ref = "maplibre oauth2 = { module = "com.google.auth:google-auth-library-oauth2-http", version.ref = "oauth2" } okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } rxjava = { module = "io.reactivex.rxjava3:rxjava", version.ref = "rxjava" } +rxlint = { module = "nl.littlerobots.rxlint:rxlint", version.ref = "rxlint" } slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j"} sqlcipher= { module = "net.zetetic:android-database-sqlcipher", version.ref = "sqlcipher" } tensorflow = { module = "org.tensorflow:tensorflow-lite", version.ref="tensorflow" } From becc013cb6185f3f052e134cd88a7e994ebdb130 Mon Sep 17 00:00:00 2001 From: Matt Creaser Date: Mon, 10 Jun 2024 15:09:23 -0300 Subject: [PATCH 2/4] Fix RxLint errors by adding onError handlers and disposing of subscriptions --- .../datastore/AWSDataStorePlugin.java | 65 ++++++++++++++----- .../datastore/syncengine/Orchestrator.java | 26 +++++--- .../datastore/syncengine/RetryStrategy.java | 7 +- .../syncengine/SubscriptionProcessor.java | 5 +- 4 files changed, 76 insertions(+), 27 deletions(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java index 8377a21ac2..d5219b3d36 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java @@ -15,6 +15,7 @@ package com.amplifyframework.datastore; +import android.annotation.SuppressLint; import android.content.Context; import androidx.annotation.NonNull; import androidx.annotation.Nullable; @@ -65,11 +66,14 @@ import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.disposables.CompositeDisposable; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers; /** * An AWS implementation of the {@link DataStorePlugin}. */ +@SuppressLint("RxLeakedSubscription") public final class AWSDataStorePlugin extends DataStorePlugin { private static final Logger LOG = Amplify.Logging.logger(CategoryType.DATASTORE, "amplify:aws-datastore"); private static final long LIFECYCLE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5); @@ -98,6 +102,12 @@ public final class AWSDataStorePlugin extends DataStorePlugin { private final ReachabilityMonitor reachabilityMonitor; + // Subscriptions that should be disposed when datastore is stopped + private final CompositeDisposable startedDisposables = new CompositeDisposable(); + + // Subscriptions that have the same lifetime as the plugin + private final CompositeDisposable pluginDisposables = new CompositeDisposable(); + private AWSDataStorePlugin( @NonNull ModelProvider modelProvider, @NonNull SchemaRegistry schemaRegistry, @@ -286,7 +296,11 @@ private void configure(Context context, DataStoreConfiguration configuration) { reachabilityMonitor.configure(context); - waitForInitialization().subscribe(this::observeNetworkStatus); + Disposable subscription = waitForInitialization().subscribe( + this::observeNetworkStatus, + error -> LOG.error("Datastore did not initialize", error) + ); + pluginDisposables.add(subscription); } private void publishNetworkStatusEvent(boolean active) { @@ -295,16 +309,27 @@ private void publishNetworkStatusEvent(boolean active) { } private void observeNetworkStatus() { - reachabilityMonitor.getObservable() - .subscribe(this::publishNetworkStatusEvent); + Disposable subscription = reachabilityMonitor.getObservable() + .subscribe( + this::publishNetworkStatusEvent, + error -> LOG.warn("Unable to subscribe to network status events", error) + ); + pluginDisposables.add(subscription); } + @SuppressLint("CheckResult") @WorkerThread @Override public void initialize(@NonNull Context context) throws AmplifyException { try { - initializeStorageAdapter(context) - .blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + boolean initialized = initializeStorageAdapter(context) + .blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (!initialized) { + throw new DataStoreException( + "Storage adapter did not initialize within allotted timeout", + AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION + ); + } } catch (Throwable initError) { throw new AmplifyException( "Failed to initialize the local storage adapter for the DataStore plugin.", @@ -327,8 +352,8 @@ private Completable initializeStorageAdapter(Context context) { } private Completable waitForInitialization() { - return Completable.fromAction(() -> categoryInitializationsPending.await()) - .timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS) + return Completable.fromAction(categoryInitializationsPending::await) + .timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS, Schedulers.io()) .subscribeOn(Schedulers.io()) .doOnComplete(() -> LOG.info("DataStore plugin initialized.")) .doOnError(error -> LOG.error("DataStore initialization timed out.", error)); @@ -339,13 +364,14 @@ private Completable waitForInitialization() { */ @Override public void start(@NonNull Action onComplete, @NonNull Consumer onError) { - waitForInitialization() + Disposable subscription = waitForInitialization() .andThen(orchestrator.start()) .subscribeOn(Schedulers.io()) .subscribe( onComplete::call, error -> onError.accept(new DataStoreException("Failed to start DataStore.", error, "Retry.")) ); + startedDisposables.add(subscription); } /** @@ -353,13 +379,15 @@ public void start(@NonNull Action onComplete, @NonNull Consumer onError) { - waitForInitialization() + startedDisposables.dispose(); + Disposable subscription = waitForInitialization() .andThen(orchestrator.stop()) .subscribeOn(Schedulers.io()) .subscribe( onComplete::call, error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, "Retry.")) ); + startedDisposables.add(subscription); } /** @@ -372,12 +400,19 @@ public void stop(@NonNull Action onComplete, @NonNull Consumer onError) { - stop(() -> Completable.create(emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError)) - .subscribeOn(Schedulers.io()) - .subscribe(onComplete::call, - throwable -> onError.accept(new DataStoreException("Clear operation failed", - throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))), - onError); + stop( + () -> { + Disposable completable = Completable.create( + emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError) + ) + .subscribeOn(Schedulers.io()) + .subscribe(onComplete::call, + throwable -> onError.accept(new DataStoreException("Clear operation failed", + throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))); + pluginDisposables.add(completable); + }, + onError + ); } /** diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java index fc5f746f80..91c81fb7e8 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java @@ -404,9 +404,12 @@ private void onApiSyncFailure(Throwable exception) { return; } LOG.warn("API sync failed - transitioning to LOCAL_ONLY.", exception); - Completable.fromAction(this::transitionToLocalOnly) - .doOnError(error -> LOG.warn("Transition to LOCAL_ONLY failed.", error)) - .subscribe(); + Disposable subscription = Completable.fromAction(this::transitionToLocalOnly) + .subscribe( + () -> { /* no-op */ }, + error -> LOG.warn("Transition to LOCAL_ONLY failed.", error) + ); + disposables.add(subscription); } private void disposeNetworkChanges() { @@ -422,13 +425,16 @@ private void monitorNetworkChanges() { monitorNetworkChangesDisposable = reachabilityMonitor.getObservable() .skip(1) // We skip the current online state, we only care about transitions .filter(ignore -> !State.STOPPED.equals(currentState.get())) - .subscribe(isOnline -> { - if (isOnline) { - transitionToApiSync(); - } else { - transitionToLocalOnly(); - } - }); + .subscribe( + isOnline -> { + if (isOnline) { + transitionToApiSync(); + } else { + transitionToLocalOnly(); + } + }, + error -> LOG.warn("Error observing network changes", error) + ); } /** diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java index dd17839713..794a349400 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.schedulers.Schedulers; /** * Class that defines inner classes and interfaces related to retry strategies. @@ -71,7 +72,11 @@ public boolean retryHandler(int attemptNumber, Throwable throwable) { } else { final long waitTimeSeconds = Double.valueOf(Math.pow(2, attemptNumber % maxExponent)).longValue(); LOG.debug("Waiting " + waitTimeSeconds + " seconds before retrying"); - Completable.timer(TimeUnit.SECONDS.toMillis(waitTimeSeconds), TimeUnit.MILLISECONDS).blockingAwait(); + Completable.timer( + TimeUnit.SECONDS.toMillis(waitTimeSeconds), + TimeUnit.MILLISECONDS, + Schedulers.io() + ).blockingAwait(); return true; } } diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java index eb06426b35..74ef3705ef 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java @@ -260,7 +260,10 @@ void startDrainingMutationBuffer() { .flatMapCompletable(this::mergeEvent) .doOnError(failure -> LOG.warn("Reading subscriptions buffer has failed.", failure)) .doOnComplete(() -> LOG.warn("Reading from subscriptions buffer is completed.")) - .subscribe() + .subscribe( + () -> LOG.info("Subscription data buffer processing complete"), + error -> LOG.warn("Error draining subscription data buffer", error) + ) ); } From 6f945fc84c05846b0a9972ca65a7eea67c8f2a3b Mon Sep 17 00:00:00 2001 From: Matt Creaser Date: Mon, 10 Jun 2024 15:17:39 -0300 Subject: [PATCH 3/4] Don't suppress leaked subscription --- .../java/com/amplifyframework/datastore/AWSDataStorePlugin.java | 1 - 1 file changed, 1 deletion(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java index d5219b3d36..acb614b924 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java @@ -73,7 +73,6 @@ /** * An AWS implementation of the {@link DataStorePlugin}. */ -@SuppressLint("RxLeakedSubscription") public final class AWSDataStorePlugin extends DataStorePlugin { private static final Logger LOG = Amplify.Logging.logger(CategoryType.DATASTORE, "amplify:aws-datastore"); private static final long LIFECYCLE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5); From 72f10e468bfd00f59540743bf9863acbb41cfb05 Mon Sep 17 00:00:00 2001 From: Matt Creaser Date: Tue, 11 Jun 2024 09:31:31 -0300 Subject: [PATCH 4/4] Don't need RxJava to call transitionToLocalOnly --- .../datastore/syncengine/Orchestrator.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java index 91c81fb7e8..9f712c2efb 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java @@ -404,12 +404,11 @@ private void onApiSyncFailure(Throwable exception) { return; } LOG.warn("API sync failed - transitioning to LOCAL_ONLY.", exception); - Disposable subscription = Completable.fromAction(this::transitionToLocalOnly) - .subscribe( - () -> { /* no-op */ }, - error -> LOG.warn("Transition to LOCAL_ONLY failed.", error) - ); - disposables.add(subscription); + try { + transitionToLocalOnly(); + } catch (Exception error) { + LOG.warn("Transition to LOCAL_ONLY failed.", error); + } } private void disposeNetworkChanges() {