diff --git a/aws-datastore/build.gradle.kts b/aws-datastore/build.gradle.kts index 767bd466a6..df182042a0 100644 --- a/aws-datastore/build.gradle.kts +++ b/aws-datastore/build.gradle.kts @@ -28,6 +28,8 @@ android { } dependencies { + compileOnly(libs.rxlint) + implementation(project(":core")) implementation(project(":aws-core")) implementation(project(":aws-api-appsync")) diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java index 8555b94079..cc5fa1a181 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java @@ -66,6 +66,8 @@ import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.disposables.CompositeDisposable; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers; /** @@ -99,6 +101,12 @@ public final class AWSDataStorePlugin extends DataStorePlugin { private final ReachabilityMonitor reachabilityMonitor; + // Subscriptions that should be disposed when datastore is stopped + private final CompositeDisposable startedDisposables = new CompositeDisposable(); + + // Subscriptions that have the same lifetime as the plugin + private final CompositeDisposable pluginDisposables = new CompositeDisposable(); + private AWSDataStorePlugin( @NonNull ModelProvider modelProvider, @NonNull SchemaRegistry schemaRegistry, @@ -288,7 +296,11 @@ private void configure(Context context, DataStoreConfiguration configuration) { reachabilityMonitor.configure(context); - waitForInitialization().subscribe(this::observeNetworkStatus); + Disposable subscription = waitForInitialization().subscribe( + this::observeNetworkStatus, + error -> LOG.error("Datastore did not initialize", error) + ); + pluginDisposables.add(subscription); } private void publishNetworkStatusEvent(boolean active) { @@ -298,8 +310,12 @@ private void publishNetworkStatusEvent(boolean active) { @SuppressLint({"CheckResult", "RxLeakedSubscription", "RxSubscribeOnError"}) private void observeNetworkStatus() { - reachabilityMonitor.getObservable() - .subscribe(this::publishNetworkStatusEvent); + Disposable subscription = reachabilityMonitor.getObservable() + .subscribe( + this::publishNetworkStatusEvent, + error -> LOG.warn("Unable to subscribe to network status events", error) + ); + pluginDisposables.add(subscription); } @SuppressLint("CheckResult") @@ -307,8 +323,14 @@ private void observeNetworkStatus() { @Override public void initialize(@NonNull Context context) throws AmplifyException { try { - initializeStorageAdapter(context) - .blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + boolean initialized = initializeStorageAdapter(context) + .blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (!initialized) { + throw new DataStoreException( + "Storage adapter did not initialize within allotted timeout", + AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION + ); + } } catch (Throwable initError) { throw new AmplifyException( "Failed to initialize the local storage adapter for the DataStore plugin.", @@ -332,8 +354,8 @@ private Completable initializeStorageAdapter(Context context) { @SuppressLint("RxDefaultScheduler") private Completable waitForInitialization() { - return Completable.fromAction(() -> categoryInitializationsPending.await()) - .timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS) + return Completable.fromAction(categoryInitializationsPending::await) + .timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS, Schedulers.io()) .subscribeOn(Schedulers.io()) .doOnComplete(() -> LOG.info("DataStore plugin initialized.")) .doOnError(error -> LOG.error("DataStore initialization timed out.", error)); @@ -345,13 +367,14 @@ private Completable waitForInitialization() { @SuppressLint({"RxLeakedSubscription", "CheckResult"}) @Override public void start(@NonNull Action onComplete, @NonNull Consumer onError) { - waitForInitialization() + Disposable subscription = waitForInitialization() .andThen(orchestrator.start()) .subscribeOn(Schedulers.io()) .subscribe( onComplete::call, error -> onError.accept(new DataStoreException("Failed to start DataStore.", error, "Retry.")) ); + startedDisposables.add(subscription); } /** @@ -360,13 +383,15 @@ public void start(@NonNull Action onComplete, @NonNull Consumer onError) { - waitForInitialization() + startedDisposables.dispose(); + Disposable subscription = waitForInitialization() .andThen(orchestrator.stop()) .subscribeOn(Schedulers.io()) .subscribe( onComplete::call, error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, "Retry.")) ); + startedDisposables.add(subscription); } /** @@ -379,12 +404,19 @@ public void stop(@NonNull Action onComplete, @NonNull Consumer onError) { - stop(() -> Completable.create(emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError)) - .subscribeOn(Schedulers.io()) - .subscribe(onComplete::call, - throwable -> onError.accept(new DataStoreException("Clear operation failed", - throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))), - onError); + stop( + () -> { + Disposable completable = Completable.create( + emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError) + ) + .subscribeOn(Schedulers.io()) + .subscribe(onComplete::call, + throwable -> onError.accept(new DataStoreException("Clear operation failed", + throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))); + pluginDisposables.add(completable); + }, + onError + ); } /** diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java index fc5f746f80..9f712c2efb 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java @@ -404,9 +404,11 @@ private void onApiSyncFailure(Throwable exception) { return; } LOG.warn("API sync failed - transitioning to LOCAL_ONLY.", exception); - Completable.fromAction(this::transitionToLocalOnly) - .doOnError(error -> LOG.warn("Transition to LOCAL_ONLY failed.", error)) - .subscribe(); + try { + transitionToLocalOnly(); + } catch (Exception error) { + LOG.warn("Transition to LOCAL_ONLY failed.", error); + } } private void disposeNetworkChanges() { @@ -422,13 +424,16 @@ private void monitorNetworkChanges() { monitorNetworkChangesDisposable = reachabilityMonitor.getObservable() .skip(1) // We skip the current online state, we only care about transitions .filter(ignore -> !State.STOPPED.equals(currentState.get())) - .subscribe(isOnline -> { - if (isOnline) { - transitionToApiSync(); - } else { - transitionToLocalOnly(); - } - }); + .subscribe( + isOnline -> { + if (isOnline) { + transitionToApiSync(); + } else { + transitionToLocalOnly(); + } + }, + error -> LOG.warn("Error observing network changes", error) + ); } /** diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java index dd17839713..794a349400 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.schedulers.Schedulers; /** * Class that defines inner classes and interfaces related to retry strategies. @@ -71,7 +72,11 @@ public boolean retryHandler(int attemptNumber, Throwable throwable) { } else { final long waitTimeSeconds = Double.valueOf(Math.pow(2, attemptNumber % maxExponent)).longValue(); LOG.debug("Waiting " + waitTimeSeconds + " seconds before retrying"); - Completable.timer(TimeUnit.SECONDS.toMillis(waitTimeSeconds), TimeUnit.MILLISECONDS).blockingAwait(); + Completable.timer( + TimeUnit.SECONDS.toMillis(waitTimeSeconds), + TimeUnit.MILLISECONDS, + Schedulers.io() + ).blockingAwait(); return true; } } diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java index eb06426b35..74ef3705ef 100644 --- a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java +++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java @@ -260,7 +260,10 @@ void startDrainingMutationBuffer() { .flatMapCompletable(this::mergeEvent) .doOnError(failure -> LOG.warn("Reading subscriptions buffer has failed.", failure)) .doOnComplete(() -> LOG.warn("Reading from subscriptions buffer is completed.")) - .subscribe() + .subscribe( + () -> LOG.info("Subscription data buffer processing complete"), + error -> LOG.warn("Error draining subscription data buffer", error) + ) ); } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c99c98f4b3..9fe330886c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,6 +45,7 @@ oauth2 = "0.26.0" okhttp = "5.0.0-alpha.11" robolectric = "4.7" rxjava = "3.0.6" +rxlint = "1.7.8" slf4j = "2.0.6" sqlcipher = "4.5.4" tensorflow = "2.0.0" @@ -97,6 +98,7 @@ maplibre-sdk = { module = "org.maplibre.gl:android-sdk", version.ref = "maplibre oauth2 = { module = "com.google.auth:google-auth-library-oauth2-http", version.ref = "oauth2" } okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } rxjava = { module = "io.reactivex.rxjava3:rxjava", version.ref = "rxjava" } +rxlint = { module = "nl.littlerobots.rxlint:rxlint", version.ref = "rxlint" } slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j"} sqlcipher= { module = "net.zetetic:android-database-sqlcipher", version.ref = "sqlcipher" } tensorflow = { module = "org.tensorflow:tensorflow-lite", version.ref="tensorflow" }