Skip to content

Commit 4d7c147

Browse files
ObjectClassPublisher: process all requests on same thread to ensure order.
1 parent 18f9be5 commit 4d7c147

File tree

1 file changed

+56
-31
lines changed

1 file changed

+56
-31
lines changed

objectbox-java/src/main/java/io/objectbox/ObjectClassPublisher.java

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,28 @@
3131
import io.objectbox.reactive.DataObserver;
3232
import io.objectbox.reactive.DataPublisher;
3333
import io.objectbox.reactive.DataPublisherUtils;
34+
import io.objectbox.reactive.SubscriptionBuilder;
3435

36+
/**
37+
* A {@link DataPublisher} that notifies {@link DataObserver}s about changes in an entity box.
38+
* Publishing is requested when a subscription is {@link SubscriptionBuilder#observer(DataObserver) observed} and
39+
* then by {@link BoxStore} for each {@link BoxStore#txCommitted(Transaction, int[]) txCommitted}.
40+
* Publish requests are processed on a single thread, one at a time, in the order publishing was requested.
41+
*/
42+
@SuppressWarnings("rawtypes")
3543
@Internal
3644
class ObjectClassPublisher implements DataPublisher<Class>, Runnable {
3745
final BoxStore boxStore;
3846
final MultimapSet<Integer, DataObserver<Class>> observersByEntityTypeId = MultimapSet.create(SetType.THREAD_SAFE);
39-
final Deque<int[]> changesQueue = new ArrayDeque<>();
47+
private final Deque<PublishRequest> changesQueue = new ArrayDeque<>();
48+
private static class PublishRequest {
49+
@Nullable private final DataObserver<Class> observer;
50+
private final int[] entityTypeIds;
51+
PublishRequest(@Nullable DataObserver<Class> observer, int[] entityTypeIds) {
52+
this.observer = observer;
53+
this.entityTypeIds = entityTypeIds;
54+
}
55+
}
4056
volatile boolean changePublisherRunning;
4157

4258
ObjectClassPublisher(BoxStore boxStore) {
@@ -76,21 +92,19 @@ private void unsubscribe(DataObserver<Class> observer, int entityTypeId) {
7692
}
7793

7894
@Override
79-
public void publishSingle(final DataObserver<Class> observer, @Nullable final Object forClass) {
80-
boxStore.internalScheduleThread(new Runnable() {
81-
@Override
82-
public void run() {
83-
Collection<Class> entityClasses = forClass != null ? Collections.singletonList((Class) forClass) :
84-
boxStore.getAllEntityClasses();
85-
for (Class entityClass : entityClasses) {
86-
try {
87-
observer.onData(entityClass);
88-
} catch (RuntimeException e) {
89-
handleObserverException(entityClass);
90-
}
91-
}
95+
public void publishSingle(DataObserver<Class> observer, @Nullable Object forClass) {
96+
int[] entityTypeIds = forClass != null
97+
? new int[]{boxStore.getEntityTypeIdOrThrow((Class) forClass)}
98+
: boxStore.getAllEntityTypeIds();
99+
100+
synchronized (changesQueue) {
101+
changesQueue.add(new PublishRequest(observer, entityTypeIds));
102+
// Only one thread at a time.
103+
if (!changePublisherRunning) {
104+
changePublisherRunning = true;
105+
boxStore.internalScheduleThread(this);
92106
}
93-
});
107+
}
94108
}
95109

96110
private void handleObserverException(Class objectClass) {
@@ -107,39 +121,50 @@ private void handleObserverException(Class objectClass) {
107121
*/
108122
void publish(int[] entityTypeIdsAffected) {
109123
synchronized (changesQueue) {
110-
changesQueue.add(entityTypeIdsAffected);
111-
// Only one thread at a time
124+
changesQueue.add(new PublishRequest(null, entityTypeIdsAffected));
125+
// Only one thread at a time.
112126
if (!changePublisherRunning) {
113127
changePublisherRunning = true;
114128
boxStore.internalScheduleThread(this);
115129
}
116130
}
117131
}
118132

133+
/**
134+
* Processes publish requests using a single thread to prevent any data generated by observers to get stale.
135+
* This publisher on its own can NOT deliver stale data (the entity class types do not change).
136+
* However, a {@link DataObserver} of this publisher might apply a {@link io.objectbox.reactive.DataTransformer}
137+
* which queries for data which CAN get stale if delivered out of order.
138+
*/
119139
@Override
120140
public void run() {
121141
try {
122142
while (true) {
123-
// We do not join all available array, just in case the app relies on a specific order
124-
int[] entityTypeIdsAffected;
143+
PublishRequest request;
125144
synchronized (changesQueue) {
126-
entityTypeIdsAffected = changesQueue.pollFirst();
127-
if (entityTypeIdsAffected == null) {
145+
request = changesQueue.pollFirst();
146+
if (request == null) {
128147
changePublisherRunning = false;
129148
break;
130149
}
131150
}
132-
for (int entityTypeId : entityTypeIdsAffected) {
133-
Collection<DataObserver<Class>> observers = observersByEntityTypeId.get(entityTypeId);
134-
if (observers != null && !observers.isEmpty()) {
135-
Class objectClass = boxStore.getEntityClassOrThrow(entityTypeId);
136-
try {
137-
for (DataObserver<Class> observer : observers) {
138-
observer.onData(objectClass);
139-
}
140-
} catch (RuntimeException e) {
141-
handleObserverException(objectClass);
151+
152+
for (int entityTypeId : request.entityTypeIds) {
153+
// If no specific observer specified, notify all current observers.
154+
Collection<DataObserver<Class>> observers = request.observer != null
155+
? Collections.singletonList(request.observer)
156+
: observersByEntityTypeId.get(entityTypeId);
157+
if (observers == null || observers.isEmpty()) {
158+
continue; // No observers for this entity type.
159+
}
160+
161+
Class entityClass = boxStore.getEntityClassOrThrow(entityTypeId);
162+
try {
163+
for (DataObserver<Class> observer : observers) {
164+
observer.onData(entityClass);
142165
}
166+
} catch (RuntimeException e) {
167+
handleObserverException(entityClass);
143168
}
144169
}
145170
}

0 commit comments

Comments
 (0)