Skip to content

Commit ddde05c

Browse files
QueryPublisher: queue publish requests to ensure delivery order matches.
1 parent 955bd13 commit ddde05c

File tree

1 file changed

+67
-13
lines changed

1 file changed

+67
-13
lines changed

objectbox-java/src/main/java/io/objectbox/query/QueryPublisher.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.objectbox.query;
1818

19+
import java.util.ArrayDeque;
20+
import java.util.Deque;
1921
import java.util.List;
2022
import java.util.Set;
2123
import java.util.concurrent.CopyOnWriteArraySet;
@@ -29,13 +31,31 @@
2931
import io.objectbox.reactive.DataPublisher;
3032
import io.objectbox.reactive.DataPublisherUtils;
3133
import io.objectbox.reactive.DataSubscription;
34+
import io.objectbox.reactive.SubscriptionBuilder;
3235

36+
/**
37+
* A {@link DataPublisher} that subscribes to an ObjectClassPublisher if there is at least one observer.
38+
* Publishing is requested if the ObjectClassPublisher reports changes, a subscription is
39+
* {@link SubscriptionBuilder#observer(DataObserver) observed} or {@link Query#publish()} is called.
40+
* For publishing the query is re-run and the result delivered to the current observers.
41+
* Results are published on a single thread, one at a time, in the order publishing was requested.
42+
*/
3343
@Internal
34-
class QueryPublisher<T> implements DataPublisher<List<T>> {
44+
class QueryPublisher<T> implements DataPublisher<List<T>>, Runnable {
3545

3646
private final Query<T> query;
3747
private final Box<T> box;
3848
private final Set<DataObserver<List<T>>> observers = new CopyOnWriteArraySet<>();
49+
private final Deque<DataObserver<List<T>>> publishQueue = new ArrayDeque<>();
50+
private volatile boolean publisherRunning = false;
51+
52+
private static class AllObservers<T> implements DataObserver<List<T>> {
53+
@Override
54+
public void onData(List<T> data) {
55+
}
56+
}
57+
/** Placeholder observer if all observers should be notified. */
58+
private final AllObservers<T> ALL_OBSERVERS = new AllObservers<>();
3959

4060
private DataObserver<Class<T>> objectClassObserver;
4161
private DataSubscription objectClassSubscription;
@@ -76,26 +96,60 @@ public void onData(Class<T> objectClass) {
7696
}
7797

7898
@Override
79-
public void publishSingle(final DataObserver<List<T>> observer, @Nullable Object param) {
80-
box.getStore().internalScheduleThread(new Runnable() {
81-
@Override
82-
public void run() {
83-
List<T> result = query.find();
84-
observer.onData(result);
99+
public void publishSingle(DataObserver<List<T>> observer, @Nullable Object param) {
100+
synchronized (publishQueue) {
101+
publishQueue.add(observer);
102+
if (!publisherRunning) {
103+
publisherRunning = true;
104+
box.getStore().internalScheduleThread(this);
85105
}
86-
});
106+
}
87107
}
88108

89109
void publish() {
90-
box.getStore().internalScheduleThread(new Runnable() {
91-
@Override
92-
public void run() {
110+
synchronized (publishQueue) {
111+
publishQueue.add(ALL_OBSERVERS);
112+
if (!publisherRunning) {
113+
publisherRunning = true;
114+
box.getStore().internalScheduleThread(this);
115+
}
116+
}
117+
}
118+
119+
@Override
120+
public void run() {
121+
/*
122+
* Process publish requests for this query on a single thread to avoid an older request
123+
* racing a new one (and causing outdated results to be delivered last).
124+
*/
125+
try {
126+
while (true) {
127+
// Get next observer(s).
128+
DataObserver<List<T>> observer;
129+
synchronized (publishQueue) {
130+
observer = publishQueue.pollFirst();
131+
if (observer == null) {
132+
publisherRunning = false;
133+
break;
134+
}
135+
}
136+
137+
// Query, then notify observer(s).
93138
List<T> result = query.find();
94-
for (DataObserver<List<T>> observer : observers) {
139+
if (ALL_OBSERVERS.equals(observer)) {
140+
// Use current list of observers to avoid notifying unsubscribed observers.
141+
Set<DataObserver<List<T>>> observers = this.observers;
142+
for (DataObserver<List<T>> dataObserver : observers) {
143+
dataObserver.onData(result);
144+
}
145+
} else {
95146
observer.onData(result);
96147
}
97148
}
98-
});
149+
} finally {
150+
// Re-set if wrapped code throws, otherwise this publisher can no longer publish.
151+
publisherRunning = false;
152+
}
99153
}
100154

101155
@Override

0 commit comments

Comments
 (0)