|
23 | 23 | import java.util.List;
|
24 | 24 | import java.util.concurrent.CopyOnWriteArrayList;
|
25 | 25 | import java.util.concurrent.CountDownLatch;
|
| 26 | +import java.util.concurrent.atomic.AtomicBoolean; |
26 | 27 |
|
27 | 28 | import io.objectbox.AbstractObjectBoxTest;
|
28 | 29 | import io.objectbox.Box;
|
29 | 30 | import io.objectbox.TestEntity;
|
30 | 31 | import io.objectbox.reactive.DataObserver;
|
| 32 | +import io.objectbox.reactive.DataSubscription; |
31 | 33 | import io.objectbox.reactive.DataTransformer;
|
32 | 34 |
|
33 | 35 |
|
@@ -65,6 +67,43 @@ public void testObserver() {
|
65 | 67 | assertEquals(3, receivedChanges.get(0).size());
|
66 | 68 | }
|
67 | 69 |
|
| 70 | + @Test |
| 71 | + public void observer_resultsDeliveredInOrder() { |
| 72 | + Query<TestEntity> query = box.query().build(); |
| 73 | + |
| 74 | + final CountDownLatch latch = new CountDownLatch(2); |
| 75 | + final AtomicBoolean isLongTransform = new AtomicBoolean(true); |
| 76 | + final List<Integer> placing = new CopyOnWriteArrayList<>(); |
| 77 | + |
| 78 | + // Block first onData call long enough so second one can race it. |
| 79 | + DataSubscription subscription = query.subscribe().observer(data -> { |
| 80 | + if (isLongTransform.compareAndSet(true, false)) { |
| 81 | + // Wait long enough so publish triggered by transaction |
| 82 | + // can overtake publish triggered during observer() call. |
| 83 | + try { |
| 84 | + Thread.sleep(1000); |
| 85 | + } catch (InterruptedException e) { |
| 86 | + throw new RuntimeException(e); |
| 87 | + } |
| 88 | + placing.add(1); // First, during observer() call. |
| 89 | + } else { |
| 90 | + placing.add(2); // Second, due to transaction. |
| 91 | + } |
| 92 | + latch.countDown(); |
| 93 | + }); |
| 94 | + |
| 95 | + // Trigger publish due to transaction. |
| 96 | + store.runInTx(() -> putTestEntities(1)); |
| 97 | + |
| 98 | + assertLatchCountedDown(latch, 3); |
| 99 | + subscription.cancel(); |
| 100 | + |
| 101 | + // Second publish request should still deliver second. |
| 102 | + assertEquals(2, placing.size()); |
| 103 | + assertEquals(1, (int) placing.get(0)); |
| 104 | + assertEquals(2, (int) placing.get(1)); |
| 105 | + } |
| 106 | + |
68 | 107 | @Test
|
69 | 108 | public void testSingle() throws InterruptedException {
|
70 | 109 | putTestEntitiesScalars();
|
|
0 commit comments