Skip to content

Commit 3b42710

Browse files
committed
added observer method with DataSubscriptionList param
1 parent 3919d25 commit 3b42710

File tree

1 file changed

+32
-17
lines changed

1 file changed

+32
-17
lines changed

objectbox-java/src/main/java/io/objectbox/reactive/SubscriptionBuilder.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import javax.annotation.Nullable;
66

7+
import io.objectbox.annotation.apihint.Beta;
78
import io.objectbox.annotation.apihint.Internal;
89

910
/**
@@ -37,7 +38,7 @@ public class SubscriptionBuilder<T> {
3738
private DataTransformer<T, Object> transformer;
3839
private Scheduler scheduler;
3940
private ErrorObserver errorObserver;
40-
// private boolean sync;
41+
// private boolean sync;
4142

4243

4344
@Internal
@@ -47,13 +48,13 @@ public SubscriptionBuilder(DataPublisher<T> publisher, @Nullable Object param, E
4748
this.threadPool = threadPool;
4849
}
4950

50-
// public Observable<T> runFirst(Runnable firstRunnable) {
51-
// if (firstRunnable != null) {
52-
// throw new IllegalStateException("Only one asyncRunnable allowed");
53-
// }
54-
// this.firstRunnable = firstRunnable;
55-
// return this;
56-
// }
51+
// public Observable<T> runFirst(Runnable firstRunnable) {
52+
// if (firstRunnable != null) {
53+
// throw new IllegalStateException("Only one asyncRunnable allowed");
54+
// }
55+
// this.firstRunnable = firstRunnable;
56+
// return this;
57+
// }
5758

5859
/**
5960
* Uses a weak reference for the observer.
@@ -75,10 +76,10 @@ public SubscriptionBuilder<T> onlyChanges() {
7576
return this;
7677
}
7778

78-
// public Observable<T> sync() {
79-
// sync = true;
80-
// return this;
81-
// }
79+
// public Observable<T> sync() {
80+
// sync = true;
81+
// return this;
82+
// }
8283

8384
/**
8485
* Transforms the original data from the publisher to something that is more helpful to your application.
@@ -99,8 +100,9 @@ public <TO> SubscriptionBuilder<TO> transform(final DataTransformer<T, TO> trans
99100
}
100101

101102
/**
102-
* The given {@link ErrorObserver} is notified when the {@link DataTransformer} ({@link #transform(DataTransformer)}) or
103-
* {@link DataObserver} ({@link #observer(DataObserver)}) threw an exception.
103+
* The given {@link ErrorObserver} is notified when the {@link DataTransformer}
104+
* ({@link #transform(DataTransformer)}) or {@link DataObserver} ({@link #observer(DataObserver)})
105+
* threw an exception.
104106
*/
105107
public SubscriptionBuilder<T> onError(ErrorObserver errorObserver) {
106108
if (this.errorObserver != null) {
@@ -126,6 +128,8 @@ public SubscriptionBuilder<T> on(Scheduler scheduler) {
126128

127129
/**
128130
* The given observer is subscribed to the publisher. This method MUST be called to complete a subscription.
131+
* <p>
132+
* Note: you must keep the returned {@link DataSubscription} to cancel it.
129133
*
130134
* @return an subscription object used for canceling further notifications to the observer
131135
*/
@@ -147,20 +151,31 @@ public DataSubscription observer(DataObserver<T> observer) {
147151
observer = new ActionObserver(subscription);
148152
}
149153

150-
if(single) {
151-
if(onlyChanges) {
154+
if (single) {
155+
if (onlyChanges) {
152156
throw new IllegalStateException("Illegal combination of single() and onlyChanges()");
153157
}
154158
publisher.publishSingle(observer, publisherParam);
155159
} else {
156160
publisher.subscribe(observer, publisherParam);
157-
if(!onlyChanges) {
161+
if (!onlyChanges) {
158162
publisher.publishSingle(observer, publisherParam);
159163
}
160164
}
161165
return subscription;
162166
}
163167

168+
/**
169+
* Convenience for calling {@link #observer(DataObserver)} with adding the resulting {@link DataSubscription} to the
170+
* given {@link DataSubscriptionList}.
171+
*/
172+
@Beta
173+
public DataSubscription observer(DataObserver<T> observer, DataSubscriptionList dataSubscriptionList) {
174+
DataSubscription dataSubscription = observer(observer);
175+
dataSubscriptionList.add(dataSubscription);
176+
return dataSubscription;
177+
}
178+
164179
class ActionObserver implements DataObserver<T>, DelegatingObserver<T> {
165180
private final DataSubscriptionImpl subscription;
166181
private SchedulerRunOnError schedulerRunOnError;

0 commit comments

Comments
 (0)