Skip to content

Commit bf5e821

Browse files
Merge branch 'gh-793-ensure-publish-order' into release-3.0.0-alpha1
2 parents 04aa534 + c99832e commit bf5e821

File tree

9 files changed

+307
-85
lines changed

9 files changed

+307
-85
lines changed

objectbox-java/src/main/java/io/objectbox/BoxStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ long internalHandle() {
918918
* Note that failed or aborted transaction do not trigger observers.
919919
*/
920920
public SubscriptionBuilder<Class> subscribe() {
921-
return new SubscriptionBuilder<>(objectClassPublisher, null, threadPool);
921+
return new SubscriptionBuilder<>(objectClassPublisher, null);
922922
}
923923

924924
@Experimental
@@ -977,7 +977,7 @@ public void setDbExceptionListener(DbExceptionListener dbExceptionListener) {
977977
*/
978978
@SuppressWarnings("unchecked")
979979
public <T> SubscriptionBuilder<Class<T>> subscribe(Class<T> forClass) {
980-
return new SubscriptionBuilder<>((DataPublisher) objectClassPublisher, forClass, threadPool);
980+
return new SubscriptionBuilder<>((DataPublisher) objectClassPublisher, forClass);
981981
}
982982

983983
@Internal

objectbox-java/src/main/java/io/objectbox/ObjectClassPublisher.java

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,28 @@
3131
import io.objectbox.reactive.DataObserver;
3232
import io.objectbox.reactive.DataPublisher;
3333
import io.objectbox.reactive.DataPublisherUtils;
34+
import io.objectbox.reactive.SubscriptionBuilder;
3435

36+
/**
37+
* A {@link DataPublisher} that notifies {@link DataObserver}s about changes in an entity box.
38+
* Publishing is requested when a subscription is {@link SubscriptionBuilder#observer(DataObserver) observed} and
39+
* then by {@link BoxStore} for each {@link BoxStore#txCommitted(Transaction, int[]) txCommitted}.
40+
* Publish requests are processed on a single thread, one at a time, in the order publishing was requested.
41+
*/
42+
@SuppressWarnings("rawtypes")
3543
@Internal
3644
class ObjectClassPublisher implements DataPublisher<Class>, Runnable {
3745
final BoxStore boxStore;
3846
final MultimapSet<Integer, DataObserver<Class>> observersByEntityTypeId = MultimapSet.create(SetType.THREAD_SAFE);
39-
final Deque<int[]> changesQueue = new ArrayDeque<>();
47+
private final Deque<PublishRequest> changesQueue = new ArrayDeque<>();
48+
private static class PublishRequest {
49+
@Nullable private final DataObserver<Class> observer;
50+
private final int[] entityTypeIds;
51+
PublishRequest(@Nullable DataObserver<Class> observer, int[] entityTypeIds) {
52+
this.observer = observer;
53+
this.entityTypeIds = entityTypeIds;
54+
}
55+
}
4056
volatile boolean changePublisherRunning;
4157

4258
ObjectClassPublisher(BoxStore boxStore) {
@@ -76,21 +92,19 @@ private void unsubscribe(DataObserver<Class> observer, int entityTypeId) {
7692
}
7793

7894
@Override
79-
public void publishSingle(final DataObserver<Class> observer, @Nullable final Object forClass) {
80-
boxStore.internalScheduleThread(new Runnable() {
81-
@Override
82-
public void run() {
83-
Collection<Class> entityClasses = forClass != null ? Collections.singletonList((Class) forClass) :
84-
boxStore.getAllEntityClasses();
85-
for (Class entityClass : entityClasses) {
86-
try {
87-
observer.onData(entityClass);
88-
} catch (RuntimeException e) {
89-
handleObserverException(entityClass);
90-
}
91-
}
95+
public void publishSingle(DataObserver<Class> observer, @Nullable Object forClass) {
96+
int[] entityTypeIds = forClass != null
97+
? new int[]{boxStore.getEntityTypeIdOrThrow((Class) forClass)}
98+
: boxStore.getAllEntityTypeIds();
99+
100+
synchronized (changesQueue) {
101+
changesQueue.add(new PublishRequest(observer, entityTypeIds));
102+
// Only one thread at a time.
103+
if (!changePublisherRunning) {
104+
changePublisherRunning = true;
105+
boxStore.internalScheduleThread(this);
92106
}
93-
});
107+
}
94108
}
95109

96110
private void handleObserverException(Class objectClass) {
@@ -107,39 +121,50 @@ private void handleObserverException(Class objectClass) {
107121
*/
108122
void publish(int[] entityTypeIdsAffected) {
109123
synchronized (changesQueue) {
110-
changesQueue.add(entityTypeIdsAffected);
111-
// Only one thread at a time
124+
changesQueue.add(new PublishRequest(null, entityTypeIdsAffected));
125+
// Only one thread at a time.
112126
if (!changePublisherRunning) {
113127
changePublisherRunning = true;
114128
boxStore.internalScheduleThread(this);
115129
}
116130
}
117131
}
118132

133+
/**
134+
* Processes publish requests using a single thread to prevent any data generated by observers to get stale.
135+
* This publisher on its own can NOT deliver stale data (the entity class types do not change).
136+
* However, a {@link DataObserver} of this publisher might apply a {@link io.objectbox.reactive.DataTransformer}
137+
* which queries for data which CAN get stale if delivered out of order.
138+
*/
119139
@Override
120140
public void run() {
121141
try {
122142
while (true) {
123-
// We do not join all available array, just in case the app relies on a specific order
124-
int[] entityTypeIdsAffected;
143+
PublishRequest request;
125144
synchronized (changesQueue) {
126-
entityTypeIdsAffected = changesQueue.pollFirst();
127-
if (entityTypeIdsAffected == null) {
145+
request = changesQueue.pollFirst();
146+
if (request == null) {
128147
changePublisherRunning = false;
129148
break;
130149
}
131150
}
132-
for (int entityTypeId : entityTypeIdsAffected) {
133-
Collection<DataObserver<Class>> observers = observersByEntityTypeId.get(entityTypeId);
134-
if (observers != null && !observers.isEmpty()) {
135-
Class objectClass = boxStore.getEntityClassOrThrow(entityTypeId);
136-
try {
137-
for (DataObserver<Class> observer : observers) {
138-
observer.onData(objectClass);
139-
}
140-
} catch (RuntimeException e) {
141-
handleObserverException(objectClass);
151+
152+
for (int entityTypeId : request.entityTypeIds) {
153+
// If no specific observer specified, notify all current observers.
154+
Collection<DataObserver<Class>> observers = request.observer != null
155+
? Collections.singletonList(request.observer)
156+
: observersByEntityTypeId.get(entityTypeId);
157+
if (observers == null || observers.isEmpty()) {
158+
continue; // No observers for this entity type.
159+
}
160+
161+
Class entityClass = boxStore.getEntityClassOrThrow(entityTypeId);
162+
try {
163+
for (DataObserver<Class> observer : observers) {
164+
observer.onData(entityClass);
142165
}
166+
} catch (RuntimeException e) {
167+
handleObserverException(entityClass);
143168
}
144169
}
145170
}

objectbox-java/src/main/java/io/objectbox/query/Query.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@ public Long call(long cursorHandle) {
634634
* it may be GCed and observers may become stale (won't receive anymore data).
635635
*/
636636
public SubscriptionBuilder<List<T>> subscribe() {
637-
return new SubscriptionBuilder<>(publisher, null, box.getStore().internalThreadPool());
637+
return new SubscriptionBuilder<>(publisher, null);
638638
}
639639

640640
/**

objectbox-java/src/main/java/io/objectbox/query/QueryPublisher.java

Lines changed: 81 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package io.objectbox.query;
1818

19+
import java.util.ArrayDeque;
20+
import java.util.ArrayList;
21+
import java.util.Deque;
1922
import java.util.List;
2023
import java.util.Set;
2124
import java.util.concurrent.CopyOnWriteArraySet;
@@ -29,13 +32,31 @@
2932
import io.objectbox.reactive.DataPublisher;
3033
import io.objectbox.reactive.DataPublisherUtils;
3134
import io.objectbox.reactive.DataSubscription;
35+
import io.objectbox.reactive.SubscriptionBuilder;
3236

37+
/**
38+
* A {@link DataPublisher} that subscribes to an ObjectClassPublisher if there is at least one observer.
39+
* Publishing is requested if the ObjectClassPublisher reports changes, a subscription is
40+
* {@link SubscriptionBuilder#observer(DataObserver) observed} or {@link Query#publish()} is called.
41+
* For publishing the query is re-run and the result delivered to the current observers.
42+
* Results are published on a single thread, one at a time, in the order publishing was requested.
43+
*/
3344
@Internal
34-
class QueryPublisher<T> implements DataPublisher<List<T>> {
45+
class QueryPublisher<T> implements DataPublisher<List<T>>, Runnable {
3546

3647
private final Query<T> query;
3748
private final Box<T> box;
3849
private final Set<DataObserver<List<T>>> observers = new CopyOnWriteArraySet<>();
50+
private final Deque<DataObserver<List<T>>> publishQueue = new ArrayDeque<>();
51+
private volatile boolean publisherRunning = false;
52+
53+
private static class SubscribedObservers<T> implements DataObserver<List<T>> {
54+
@Override
55+
public void onData(List<T> data) {
56+
}
57+
}
58+
/** Placeholder observer if subscribed observers should be notified. */
59+
private final SubscribedObservers<T> SUBSCRIBED_OBSERVERS = new SubscribedObservers<>();
3960

4061
private DataObserver<Class<T>> objectClassObserver;
4162
private DataSubscription objectClassSubscription;
@@ -76,26 +97,73 @@ public void onData(Class<T> objectClass) {
7697
}
7798

7899
@Override
79-
public void publishSingle(final DataObserver<List<T>> observer, @Nullable Object param) {
80-
box.getStore().internalScheduleThread(new Runnable() {
81-
@Override
82-
public void run() {
83-
List<T> result = query.find();
84-
observer.onData(result);
100+
public void publishSingle(DataObserver<List<T>> observer, @Nullable Object param) {
101+
synchronized (publishQueue) {
102+
publishQueue.add(observer);
103+
if (!publisherRunning) {
104+
publisherRunning = true;
105+
box.getStore().internalScheduleThread(this);
85106
}
86-
});
107+
}
87108
}
88109

89110
void publish() {
90-
box.getStore().internalScheduleThread(new Runnable() {
91-
@Override
92-
public void run() {
111+
synchronized (publishQueue) {
112+
publishQueue.add(SUBSCRIBED_OBSERVERS);
113+
if (!publisherRunning) {
114+
publisherRunning = true;
115+
box.getStore().internalScheduleThread(this);
116+
}
117+
}
118+
}
119+
120+
/**
121+
* Processes publish requests for this query on a single thread to prevent
122+
* older query results getting delivered after newer query results.
123+
* To speed up processing each loop publishes to all queued observers instead of just the next in line.
124+
* This reduces time spent querying and waiting for DataObserver.onData() and their potential DataTransformers.
125+
*/
126+
@Override
127+
public void run() {
128+
try {
129+
while (true) {
130+
// Get all queued observer(s), stop processing if none.
131+
List<DataObserver<List<T>>> singlePublishObservers = new ArrayList<>();
132+
boolean notifySubscribedObservers = false;
133+
synchronized (publishQueue) {
134+
DataObserver<List<T>> nextObserver;
135+
while ((nextObserver = publishQueue.poll()) != null) {
136+
if (SUBSCRIBED_OBSERVERS.equals(nextObserver)) {
137+
notifySubscribedObservers = true;
138+
} else {
139+
singlePublishObservers.add(nextObserver);
140+
}
141+
}
142+
if (!notifySubscribedObservers && singlePublishObservers.isEmpty()) {
143+
publisherRunning = false;
144+
break; // Stop.
145+
}
146+
}
147+
148+
// Query.
93149
List<T> result = query.find();
94-
for (DataObserver<List<T>> observer : observers) {
150+
151+
// Notify observer(s).
152+
for (DataObserver<List<T>> observer : singlePublishObservers) {
95153
observer.onData(result);
96154
}
155+
if (notifySubscribedObservers) {
156+
// Use current list of observers to avoid notifying unsubscribed observers.
157+
Set<DataObserver<List<T>>> observers = this.observers;
158+
for (DataObserver<List<T>> dataObserver : observers) {
159+
dataObserver.onData(result);
160+
}
161+
}
97162
}
98-
});
163+
} finally {
164+
// Re-set if wrapped code throws, otherwise this publisher can no longer publish.
165+
publisherRunning = false;
166+
}
99167
}
100168

101169
@Override

objectbox-java/src/main/java/io/objectbox/reactive/DataTransformer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@
1616

1717
package io.objectbox.reactive;
1818

19-
import javax.annotation.Nullable;
20-
2119
/**
2220
* Transforms or processes data before it is given to subscribed {@link DataObserver}s. A transformer is set via
2321
* {@link SubscriptionBuilder#transform(DataTransformer)}.
24-
*
22+
* <p>
2523
* Note that a transformer is not required to actually "transform" any data.
2624
* Technically, it's fine to return the same data it received and just do some processing with it.
27-
*
28-
* Threading notes: Note that the transformer is always executed asynchronously.
29-
* It is OK to perform long lasting operations.
25+
* <p>
26+
* Threading notes: transformations are executed sequentially on a background thread
27+
* owned by the subscription publisher. It is OK to perform long lasting operations,
28+
* however this will block notifications to all other observers until finished.
3029
*
3130
* @param <FROM> Data type this transformer receives
3231
* @param <TO> Type of transformed data

0 commit comments

Comments
 (0)