Skip to content

Commit 18f9be5

Browse files
To deliver in order run transform on same thread as ActionObserver was notified on.
1 parent e845fe4 commit 18f9be5

File tree

4 files changed

+25
-25
lines changed

4 files changed

+25
-25
lines changed

objectbox-java/src/main/java/io/objectbox/BoxStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ long internalHandle() {
918918
* Note that failed or aborted transaction do not trigger observers.
919919
*/
920920
public SubscriptionBuilder<Class> subscribe() {
921-
return new SubscriptionBuilder<>(objectClassPublisher, null, threadPool);
921+
return new SubscriptionBuilder<>(objectClassPublisher, null);
922922
}
923923

924924
@Experimental
@@ -977,7 +977,7 @@ public void setDbExceptionListener(DbExceptionListener dbExceptionListener) {
977977
*/
978978
@SuppressWarnings("unchecked")
979979
public <T> SubscriptionBuilder<Class<T>> subscribe(Class<T> forClass) {
980-
return new SubscriptionBuilder<>((DataPublisher) objectClassPublisher, forClass, threadPool);
980+
return new SubscriptionBuilder<>((DataPublisher) objectClassPublisher, forClass);
981981
}
982982

983983
@Internal

objectbox-java/src/main/java/io/objectbox/query/Query.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@ public Long call(long cursorHandle) {
634634
* it may be GCed and observers may become stale (won't receive anymore data).
635635
*/
636636
public SubscriptionBuilder<List<T>> subscribe() {
637-
return new SubscriptionBuilder<>(publisher, null, box.getStore().internalThreadPool());
637+
return new SubscriptionBuilder<>(publisher, null);
638638
}
639639

640640
/**

objectbox-java/src/main/java/io/objectbox/reactive/SubscriptionBuilder.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616

1717
package io.objectbox.reactive;
1818

19-
import java.util.concurrent.ExecutorService;
20-
2119
import javax.annotation.Nullable;
2220

23-
import io.objectbox.annotation.apihint.Beta;
2421
import io.objectbox.annotation.apihint.Internal;
2522

2623
/**
@@ -45,7 +42,6 @@
4542
public class SubscriptionBuilder<T> {
4643
private final DataPublisher<T> publisher;
4744
private final Object publisherParam;
48-
private final ExecutorService threadPool;
4945
private DataObserver<T> observer;
5046
// private Runnable firstRunnable;
5147
private boolean weak;
@@ -59,10 +55,9 @@ public class SubscriptionBuilder<T> {
5955

6056

6157
@Internal
62-
public SubscriptionBuilder(DataPublisher<T> publisher, @Nullable Object param, ExecutorService threadPool) {
58+
public SubscriptionBuilder(DataPublisher<T> publisher, @Nullable Object param) {
6359
this.publisher = publisher;
6460
publisherParam = param;
65-
this.threadPool = threadPool;
6661
}
6762

6863
// public Observable<T> runFirst(Runnable firstRunnable) {
@@ -215,22 +210,27 @@ public void onData(final T data) {
215210
}
216211
}
217212

213+
/**
214+
* Runs on the thread of the {@link #onData(Object)} caller to ensure data is delivered
215+
* in the same order as {@link #onData(Object)} was called, to prevent delivering stale data.
216+
* <p>
217+
* For both ObjectClassPublisher and QueryPublisher this is the asynchronous
218+
* thread publish requests are processed on.
219+
* <p>
220+
* This could be optimized in the future to allow parallel execution,
221+
* but this would require an ordering mechanism for the transformed data.
222+
*/
218223
private void transformAndContinue(final T data) {
219-
threadPool.submit(new Runnable() {
220-
@Override
221-
public void run() {
222-
if (subscription.isCanceled()) {
223-
return;
224-
}
225-
try {
226-
// Type erasure FTW
227-
T result = (T) transformer.transform(data);
228-
callOnData(result);
229-
} catch (Throwable th) {
230-
callOnError(th, "Transformer failed without an ErrorObserver set");
231-
}
232-
}
233-
});
224+
if (subscription.isCanceled()) {
225+
return;
226+
}
227+
try {
228+
// Type erasure FTW
229+
T result = (T) transformer.transform(data);
230+
callOnData(result);
231+
} catch (Throwable th) {
232+
callOnError(th, "Transformer failed without an ErrorObserver set");
233+
}
234234
}
235235

236236
private void callOnError(Throwable th, String msgNoErrorObserver) {

objectbox-rxjava/src/test/java/io/objectbox/query/MockQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public MockQuery(boolean hasOrder) {
3535
// when(box.getStore()).thenReturn(boxStore);
3636
query = mock(Query.class);
3737
fakeQueryPublisher = new FakeQueryPublisher();
38-
SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(fakeQueryPublisher, null, null);
38+
SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(fakeQueryPublisher, null);
3939
when(query.subscribe()).thenReturn(subscriptionBuilder);
4040
}
4141

0 commit comments

Comments
 (0)