Skip to content

Commit becc013

Browse files
committed
Fix RxLint errors by adding onError handlers and disposing of subscriptions
1 parent b87357e commit becc013

File tree

4 files changed

+76
-27
lines changed

4 files changed

+76
-27
lines changed

aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.amplifyframework.datastore;
1717

18+
import android.annotation.SuppressLint;
1819
import android.content.Context;
1920
import androidx.annotation.NonNull;
2021
import androidx.annotation.Nullable;
@@ -65,11 +66,14 @@
6566
import java.util.concurrent.TimeUnit;
6667

6768
import io.reactivex.rxjava3.core.Completable;
69+
import io.reactivex.rxjava3.disposables.CompositeDisposable;
70+
import io.reactivex.rxjava3.disposables.Disposable;
6871
import io.reactivex.rxjava3.schedulers.Schedulers;
6972

7073
/**
7174
* An AWS implementation of the {@link DataStorePlugin}.
7275
*/
76+
@SuppressLint("RxLeakedSubscription")
7377
public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
7478
private static final Logger LOG = Amplify.Logging.logger(CategoryType.DATASTORE, "amplify:aws-datastore");
7579
private static final long LIFECYCLE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(5);
@@ -98,6 +102,12 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {
98102

99103
private final ReachabilityMonitor reachabilityMonitor;
100104

105+
// Subscriptions that should be disposed when datastore is stopped
106+
private final CompositeDisposable startedDisposables = new CompositeDisposable();
107+
108+
// Subscriptions that have the same lifetime as the plugin
109+
private final CompositeDisposable pluginDisposables = new CompositeDisposable();
110+
101111
private AWSDataStorePlugin(
102112
@NonNull ModelProvider modelProvider,
103113
@NonNull SchemaRegistry schemaRegistry,
@@ -286,7 +296,11 @@ private void configure(Context context, DataStoreConfiguration configuration) {
286296

287297
reachabilityMonitor.configure(context);
288298

289-
waitForInitialization().subscribe(this::observeNetworkStatus);
299+
Disposable subscription = waitForInitialization().subscribe(
300+
this::observeNetworkStatus,
301+
error -> LOG.error("Datastore did not initialize", error)
302+
);
303+
pluginDisposables.add(subscription);
290304
}
291305

292306
private void publishNetworkStatusEvent(boolean active) {
@@ -295,16 +309,27 @@ private void publishNetworkStatusEvent(boolean active) {
295309
}
296310

297311
private void observeNetworkStatus() {
298-
reachabilityMonitor.getObservable()
299-
.subscribe(this::publishNetworkStatusEvent);
312+
Disposable subscription = reachabilityMonitor.getObservable()
313+
.subscribe(
314+
this::publishNetworkStatusEvent,
315+
error -> LOG.warn("Unable to subscribe to network status events", error)
316+
);
317+
pluginDisposables.add(subscription);
300318
}
301319

320+
@SuppressLint("CheckResult")
302321
@WorkerThread
303322
@Override
304323
public void initialize(@NonNull Context context) throws AmplifyException {
305324
try {
306-
initializeStorageAdapter(context)
307-
.blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
325+
boolean initialized = initializeStorageAdapter(context)
326+
.blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
327+
if (!initialized) {
328+
throw new DataStoreException(
329+
"Storage adapter did not initialize within allotted timeout",
330+
AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION
331+
);
332+
}
308333
} catch (Throwable initError) {
309334
throw new AmplifyException(
310335
"Failed to initialize the local storage adapter for the DataStore plugin.",
@@ -327,8 +352,8 @@ private Completable initializeStorageAdapter(Context context) {
327352
}
328353

329354
private Completable waitForInitialization() {
330-
return Completable.fromAction(() -> categoryInitializationsPending.await())
331-
.timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
355+
return Completable.fromAction(categoryInitializationsPending::await)
356+
.timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS, Schedulers.io())
332357
.subscribeOn(Schedulers.io())
333358
.doOnComplete(() -> LOG.info("DataStore plugin initialized."))
334359
.doOnError(error -> LOG.error("DataStore initialization timed out.", error));
@@ -339,27 +364,30 @@ private Completable waitForInitialization() {
339364
*/
340365
@Override
341366
public void start(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
342-
waitForInitialization()
367+
Disposable subscription = waitForInitialization()
343368
.andThen(orchestrator.start())
344369
.subscribeOn(Schedulers.io())
345370
.subscribe(
346371
onComplete::call,
347372
error -> onError.accept(new DataStoreException("Failed to start DataStore.", error, "Retry."))
348373
);
374+
startedDisposables.add(subscription);
349375
}
350376

351377
/**
352378
* {@inheritDoc}
353379
*/
354380
@Override
355381
public void stop(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
356-
waitForInitialization()
382+
startedDisposables.dispose();
383+
Disposable subscription = waitForInitialization()
357384
.andThen(orchestrator.stop())
358385
.subscribeOn(Schedulers.io())
359386
.subscribe(
360387
onComplete::call,
361388
error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, "Retry."))
362389
);
390+
startedDisposables.add(subscription);
363391
}
364392

365393
/**
@@ -372,12 +400,19 @@ public void stop(@NonNull Action onComplete, @NonNull Consumer<DataStoreExceptio
372400
*/
373401
@Override
374402
public void clear(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
375-
stop(() -> Completable.create(emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError))
376-
.subscribeOn(Schedulers.io())
377-
.subscribe(onComplete::call,
378-
throwable -> onError.accept(new DataStoreException("Clear operation failed",
379-
throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))),
380-
onError);
403+
stop(
404+
() -> {
405+
Disposable completable = Completable.create(
406+
emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError)
407+
)
408+
.subscribeOn(Schedulers.io())
409+
.subscribe(onComplete::call,
410+
throwable -> onError.accept(new DataStoreException("Clear operation failed",
411+
throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION)));
412+
pluginDisposables.add(completable);
413+
},
414+
onError
415+
);
381416
}
382417

383418
/**

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/Orchestrator.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -404,9 +404,12 @@ private void onApiSyncFailure(Throwable exception) {
404404
return;
405405
}
406406
LOG.warn("API sync failed - transitioning to LOCAL_ONLY.", exception);
407-
Completable.fromAction(this::transitionToLocalOnly)
408-
.doOnError(error -> LOG.warn("Transition to LOCAL_ONLY failed.", error))
409-
.subscribe();
407+
Disposable subscription = Completable.fromAction(this::transitionToLocalOnly)
408+
.subscribe(
409+
() -> { /* no-op */ },
410+
error -> LOG.warn("Transition to LOCAL_ONLY failed.", error)
411+
);
412+
disposables.add(subscription);
410413
}
411414

412415
private void disposeNetworkChanges() {
@@ -422,13 +425,16 @@ private void monitorNetworkChanges() {
422425
monitorNetworkChangesDisposable = reachabilityMonitor.getObservable()
423426
.skip(1) // We skip the current online state, we only care about transitions
424427
.filter(ignore -> !State.STOPPED.equals(currentState.get()))
425-
.subscribe(isOnline -> {
426-
if (isOnline) {
427-
transitionToApiSync();
428-
} else {
429-
transitionToLocalOnly();
430-
}
431-
});
428+
.subscribe(
429+
isOnline -> {
430+
if (isOnline) {
431+
transitionToApiSync();
432+
} else {
433+
transitionToLocalOnly();
434+
}
435+
},
436+
error -> LOG.warn("Error observing network changes", error)
437+
);
432438
}
433439

434440
/**

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/RetryStrategy.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525

2626
import io.reactivex.rxjava3.core.Completable;
27+
import io.reactivex.rxjava3.schedulers.Schedulers;
2728

2829
/**
2930
* Class that defines inner classes and interfaces related to retry strategies.
@@ -71,7 +72,11 @@ public boolean retryHandler(int attemptNumber, Throwable throwable) {
7172
} else {
7273
final long waitTimeSeconds = Double.valueOf(Math.pow(2, attemptNumber % maxExponent)).longValue();
7374
LOG.debug("Waiting " + waitTimeSeconds + " seconds before retrying");
74-
Completable.timer(TimeUnit.SECONDS.toMillis(waitTimeSeconds), TimeUnit.MILLISECONDS).blockingAwait();
75+
Completable.timer(
76+
TimeUnit.SECONDS.toMillis(waitTimeSeconds),
77+
TimeUnit.MILLISECONDS,
78+
Schedulers.io()
79+
).blockingAwait();
7580
return true;
7681
}
7782
}

aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SubscriptionProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,10 @@ void startDrainingMutationBuffer() {
260260
.flatMapCompletable(this::mergeEvent)
261261
.doOnError(failure -> LOG.warn("Reading subscriptions buffer has failed.", failure))
262262
.doOnComplete(() -> LOG.warn("Reading from subscriptions buffer is completed."))
263-
.subscribe()
263+
.subscribe(
264+
() -> LOG.info("Subscription data buffer processing complete"),
265+
error -> LOG.warn("Error draining subscription data buffer", error)
266+
)
264267
);
265268
}
266269

0 commit comments

Comments
 (0)