Skip to content

Commit 24a5e82

Browse files
QueryPublisher: notify all queued observers to reduce wait.
1 parent 4d7c147 commit 24a5e82

File tree

1 file changed

+31
-17
lines changed

1 file changed

+31
-17
lines changed

objectbox-java/src/main/java/io/objectbox/query/QueryPublisher.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.objectbox.query;
1818

1919
import java.util.ArrayDeque;
20+
import java.util.ArrayList;
2021
import java.util.Deque;
2122
import java.util.List;
2223
import java.util.Set;
@@ -49,13 +50,13 @@ class QueryPublisher<T> implements DataPublisher<List<T>>, Runnable {
4950
private final Deque<DataObserver<List<T>>> publishQueue = new ArrayDeque<>();
5051
private volatile boolean publisherRunning = false;
5152

52-
private static class AllObservers<T> implements DataObserver<List<T>> {
53+
private static class SubscribedObservers<T> implements DataObserver<List<T>> {
5354
@Override
5455
public void onData(List<T> data) {
5556
}
5657
}
57-
/** Placeholder observer if all observers should be notified. */
58-
private final AllObservers<T> ALL_OBSERVERS = new AllObservers<>();
58+
/** Placeholder observer if subscribed observers should be notified. */
59+
private final SubscribedObservers<T> SUBSCRIBED_OBSERVERS = new SubscribedObservers<>();
5960

6061
private DataObserver<Class<T>> objectClassObserver;
6162
private DataSubscription objectClassSubscription;
@@ -108,42 +109,55 @@ public void publishSingle(DataObserver<List<T>> observer, @Nullable Object param
108109

109110
void publish() {
110111
synchronized (publishQueue) {
111-
publishQueue.add(ALL_OBSERVERS);
112+
publishQueue.add(SUBSCRIBED_OBSERVERS);
112113
if (!publisherRunning) {
113114
publisherRunning = true;
114115
box.getStore().internalScheduleThread(this);
115116
}
116117
}
117118
}
118119

120+
/**
121+
* Processes publish requests for this query on a single thread to prevent
122+
* older query results getting delivered after newer query results.
123+
* To speed up processing each loop publishes to all queued observers instead of just the next in line.
124+
* This reduces time spent querying and waiting for DataObserver.onData() and their potential DataTransformers.
125+
*/
119126
@Override
120127
public void run() {
121-
/*
122-
* Process publish requests for this query on a single thread to avoid an older request
123-
* racing a new one (and causing outdated results to be delivered last).
124-
*/
125128
try {
126129
while (true) {
127-
// Get next observer(s).
128-
DataObserver<List<T>> observer;
130+
// Get all queued observer(s), stop processing if none.
131+
List<DataObserver<List<T>>> singlePublishObservers = new ArrayList<>();
132+
boolean notifySubscribedObservers = false;
129133
synchronized (publishQueue) {
130-
observer = publishQueue.pollFirst();
131-
if (observer == null) {
134+
DataObserver<List<T>> nextObserver;
135+
while ((nextObserver = publishQueue.poll()) != null) {
136+
if (SUBSCRIBED_OBSERVERS.equals(nextObserver)) {
137+
notifySubscribedObservers = true;
138+
} else {
139+
singlePublishObservers.add(nextObserver);
140+
}
141+
}
142+
if (!notifySubscribedObservers && singlePublishObservers.isEmpty()) {
132143
publisherRunning = false;
133-
break;
144+
break; // Stop.
134145
}
135146
}
136147

137-
// Query, then notify observer(s).
148+
// Query.
138149
List<T> result = query.find();
139-
if (ALL_OBSERVERS.equals(observer)) {
150+
151+
// Notify observer(s).
152+
for (DataObserver<List<T>> observer : singlePublishObservers) {
153+
observer.onData(result);
154+
}
155+
if (notifySubscribedObservers) {
140156
// Use current list of observers to avoid notifying unsubscribed observers.
141157
Set<DataObserver<List<T>>> observers = this.observers;
142158
for (DataObserver<List<T>> dataObserver : observers) {
143159
dataObserver.onData(result);
144160
}
145-
} else {
146-
observer.onData(result);
147161
}
148162
}
149163
} finally {

0 commit comments

Comments
 (0)