Skip to content

Commit e845fe4

Browse files
Test transformed data is received in order of publish.
1 parent ddde05c commit e845fe4

File tree

2 files changed

+77
-0
lines changed

2 files changed

+77
-0
lines changed

tests/objectbox-java-test/src/test/java/io/objectbox/ObjectClassObserverTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.util.List;
2424
import java.util.concurrent.CopyOnWriteArrayList;
2525
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728

29+
import io.objectbox.query.QueryObserverTest;
2830
import io.objectbox.reactive.DataObserver;
2931
import io.objectbox.reactive.DataSubscription;
3032
import io.objectbox.reactive.DataTransformer;
@@ -194,6 +196,42 @@ private void testTransform(TestScheduler scheduler) throws InterruptedException
194196
assertEquals(0, objectCounts.size());
195197
}
196198

199+
/**
200+
* There is an identical test asserting QueryPublisher at
201+
* {@link QueryObserverTest#transform_inOrderOfPublish()}.
202+
*/
203+
@Test
204+
public void transform_inOrderOfPublish() {
205+
final CountDownLatch latch = new CountDownLatch(2);
206+
final AtomicBoolean isLongTransform = new AtomicBoolean(true);
207+
final List<Integer> placing = new CopyOnWriteArrayList<>();
208+
209+
// Make first transformation take longer than second.
210+
DataSubscription subscription = store.subscribe(TestEntity.class).transform(source -> {
211+
if (isLongTransform.compareAndSet(true, false)) {
212+
// Wait long enough so publish triggered by transaction
213+
// can overtake publish triggered during observer() call.
214+
Thread.sleep(1000);
215+
return 1; // First, during observer() call.
216+
}
217+
return 2; // Second, due to transaction.
218+
}).observer(data -> {
219+
placing.add(data);
220+
latch.countDown();
221+
});
222+
223+
// Trigger publish due to transaction.
224+
store.runInTx(() -> putTestEntities(1));
225+
226+
assertLatchCountedDown(latch, 3);
227+
subscription.cancel();
228+
229+
// Second publish request should still deliver second.
230+
assertEquals(2, placing.size());
231+
assertEquals(1, (int) placing.get(0));
232+
assertEquals(2, (int) placing.get(1));
233+
}
234+
197235
@Test
198236
public void testScheduler() {
199237
TestScheduler scheduler = new TestScheduler();

tests/objectbox-java-test/src/test/java/io/objectbox/query/QueryObserverTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import io.objectbox.AbstractObjectBoxTest;
2929
import io.objectbox.Box;
30+
import io.objectbox.ObjectClassObserverTest;
3031
import io.objectbox.TestEntity;
3132
import io.objectbox.reactive.DataObserver;
3233
import io.objectbox.reactive.DataSubscription;
@@ -149,6 +150,44 @@ public void testTransformer() throws InterruptedException {
149150
assertEquals(2003 + 2007 + 2002, (int) receivedSums.get(1));
150151
}
151152

153+
/**
154+
* There is an identical test asserting ObjectClassPublisher at
155+
* {@link ObjectClassObserverTest#transform_inOrderOfPublish()}.
156+
*/
157+
@Test
158+
public void transform_inOrderOfPublish() {
159+
Query<TestEntity> query = box.query().build();
160+
161+
final CountDownLatch latch = new CountDownLatch(2);
162+
final AtomicBoolean isLongTransform = new AtomicBoolean(true);
163+
final List<Integer> placing = new CopyOnWriteArrayList<>();
164+
165+
// Make first transformation take longer than second.
166+
DataSubscription subscription = query.subscribe().transform(source -> {
167+
if (isLongTransform.compareAndSet(true, false)) {
168+
// Wait long enough so publish triggered by transaction
169+
// can overtake publish triggered during observer() call.
170+
Thread.sleep(1000);
171+
return 1; // First, during observer() call.
172+
}
173+
return 2; // Second, due to transaction.
174+
}).observer(data -> {
175+
placing.add(data);
176+
latch.countDown();
177+
});
178+
179+
// Trigger publish due to transaction.
180+
store.runInTx(() -> putTestEntities(1));
181+
182+
assertLatchCountedDown(latch, 3);
183+
subscription.cancel();
184+
185+
// Second publish request should still deliver second.
186+
assertEquals(2, placing.size());
187+
assertEquals(1, (int) placing.get(0));
188+
assertEquals(2, (int) placing.get(1));
189+
}
190+
152191
private void putTestEntitiesScalars() {
153192
putTestEntities(10, null, 2000);
154193
}

0 commit comments

Comments
 (0)