19
19
import org .junit .Before ;
20
20
import org .junit .Test ;
21
21
22
- import java .util .ArrayList ;
22
+ import java .util .Arrays ;
23
23
import java .util .List ;
24
24
import java .util .concurrent .CopyOnWriteArrayList ;
25
25
import java .util .concurrent .CountDownLatch ;
26
+ import java .util .concurrent .TimeUnit ;
26
27
27
28
import io .objectbox .AbstractObjectBoxTest ;
28
29
import io .objectbox .Box ;
29
30
import io .objectbox .TestEntity ;
30
31
import io .objectbox .reactive .DataObserver ;
31
- import io .objectbox .reactive .DataTransformer ;
32
32
33
33
34
34
import static io .objectbox .TestEntity_ .simpleInt ;
35
35
import static org .junit .Assert .assertEquals ;
36
+ import static org .junit .Assert .assertTrue ;
36
37
37
- public class QueryObserverTest extends AbstractObjectBoxTest implements DataObserver < List < TestEntity >> {
38
+ public class QueryObserverTest extends AbstractObjectBoxTest {
38
39
39
40
private Box <TestEntity > box ;
40
- private List <List <TestEntity >> receivedChanges = new CopyOnWriteArrayList <>();
41
- private CountDownLatch latch = new CountDownLatch (1 );
42
41
43
42
@ Before
44
43
public void setUpBox () {
@@ -52,80 +51,117 @@ public void testObserver() {
52
51
assertEquals (0 , query .count ());
53
52
54
53
// Initial data on subscription.
55
- query .subscribe ().observer (this );
56
- assertLatchCountedDown (latch , 5 );
57
- assertEquals (1 , receivedChanges .size ());
58
- assertEquals (0 , receivedChanges .get (0 ).size ());
54
+ TestObserver <List <TestEntity >> testObserver = new TestObserver <>();
55
+ query .subscribe ().observer (testObserver );
56
+ testObserver .assertLatchCountedDown ();
57
+ assertEquals (1 , testObserver .receivedChanges .size ());
58
+ assertEquals (0 , testObserver .receivedChanges .get (0 ).size ());
59
59
60
60
// On put.
61
- receivedChanges .clear ();
62
- latch = new CountDownLatch ( 1 );
61
+ testObserver . receivedChanges .clear ();
62
+ testObserver . resetLatch ( );
63
63
putTestEntitiesScalars ();
64
- assertLatchCountedDown (latch , 5 );
65
- assertEquals (1 , receivedChanges .size ());
66
- assertEquals (3 , receivedChanges .get (0 ).size ());
64
+ testObserver . assertLatchCountedDown ();
65
+ assertEquals (1 , testObserver . receivedChanges .size ());
66
+ assertEquals (3 , testObserver . receivedChanges .get (0 ).size ());
67
67
68
68
// On remove all.
69
- receivedChanges .clear ();
70
- latch = new CountDownLatch ( 1 );
69
+ testObserver . receivedChanges .clear ();
70
+ testObserver . resetLatch ( );
71
71
box .removeAll ();
72
- assertLatchCountedDown (latch , 5 );
73
- assertEquals (1 , receivedChanges .size ());
74
- assertEquals (0 , receivedChanges .get (0 ).size ());
72
+ testObserver . assertLatchCountedDown ();
73
+ assertEquals (1 , testObserver . receivedChanges .size ());
74
+ assertEquals (0 , testObserver . receivedChanges .get (0 ).size ());
75
75
}
76
76
77
77
@ Test
78
78
public void testSingle () throws InterruptedException {
79
79
putTestEntitiesScalars ();
80
80
int [] valuesInt = {2003 , 2007 , 2002 };
81
81
Query <TestEntity > query = box .query ().in (simpleInt , valuesInt ).build ();
82
- query .subscribe ().single ().observer (this );
83
- assertLatchCountedDown (latch , 5 );
84
- assertEquals (1 , receivedChanges .size ());
85
- assertEquals (3 , receivedChanges .get (0 ).size ());
86
82
87
- receivedChanges .clear ();
83
+ TestObserver <List <TestEntity >> testObserver = new TestObserver <>();
84
+ query .subscribe ().single ().observer (testObserver );
85
+ testObserver .assertLatchCountedDown ();
86
+ assertEquals (1 , testObserver .receivedChanges .size ());
87
+ assertEquals (3 , testObserver .receivedChanges .get (0 ).size ());
88
+
89
+ testObserver .receivedChanges .clear ();
88
90
putTestEntities (1 );
89
91
Thread .sleep (20 );
90
- assertEquals (0 , receivedChanges .size ());
92
+ assertEquals (0 , testObserver . receivedChanges .size ());
91
93
}
92
94
93
95
@ Test
94
96
public void testTransformer () throws InterruptedException {
95
97
int [] valuesInt = {2003 , 2007 , 2002 };
96
98
Query <TestEntity > query = box .query ().in (simpleInt , valuesInt ).build ();
97
99
assertEquals (0 , query .count ());
98
- final List <Integer > receivedSums = new ArrayList <>();
100
+ TestObserver <Integer > testObserver = new TestObserver <>();
99
101
100
102
query .subscribe ().transform (source -> {
101
103
int sum = 0 ;
102
104
for (TestEntity entity : source ) {
103
105
sum += entity .getSimpleInt ();
104
106
}
105
107
return sum ;
106
- }).observer (data -> {
107
- receivedSums .add (data );
108
- latch .countDown ();
109
- });
110
- assertLatchCountedDown (latch , 5 );
108
+ }).observer (testObserver );
109
+ testObserver .assertLatchCountedDown ();
111
110
112
- latch = new CountDownLatch ( 1 );
111
+ testObserver . resetLatch ( );
113
112
putTestEntitiesScalars ();
114
- assertLatchCountedDown (latch , 5 );
113
+ testObserver . assertLatchCountedDown ();
115
114
Thread .sleep (20 );
116
115
117
- assertEquals (2 , receivedSums .size ());
118
- assertEquals (0 , (int ) receivedSums .get (0 ));
119
- assertEquals (2003 + 2007 + 2002 , (int ) receivedSums .get (1 ));
116
+ assertEquals (2 , testObserver . receivedChanges .size ());
117
+ assertEquals (0 , (int ) testObserver . receivedChanges .get (0 ));
118
+ assertEquals (2003 + 2007 + 2002 , (int ) testObserver . receivedChanges .get (1 ));
120
119
}
121
120
122
121
private void putTestEntitiesScalars () {
123
122
putTestEntities (10 , null , 2000 );
124
123
}
125
124
126
- @ Override
127
- public void onData (List <TestEntity > queryResult ) {
128
- receivedChanges .add (queryResult );
129
- latch .countDown ();
125
+ public static class TestObserver <T > implements DataObserver <T > {
126
+
127
+ List <T > receivedChanges = new CopyOnWriteArrayList <>();
128
+ CountDownLatch latch = new CountDownLatch (1 );
129
+
130
+ private void log (String message ) {
131
+ System .out .println ("TestObserver: " + message );
132
+ }
133
+
134
+ void printEvents () {
135
+ int count = receivedChanges .size ();
136
+ log ("Received " + count + " event(s):" );
137
+ for (int i = 0 ; i < count ; i ++) {
138
+ T receivedChange = receivedChanges .get (i );
139
+ if (receivedChange instanceof List ) {
140
+ List <?> list = (List <?>) receivedChange ;
141
+ log ((i + 1 ) + "/" + count + ": size=" + list .size ()
142
+ + "; items=" + Arrays .toString (list .toArray ()));
143
+ }
144
+ }
145
+ }
146
+
147
+ void resetLatch () {
148
+ latch = new CountDownLatch (1 );
149
+ }
150
+
151
+ void assertLatchCountedDown () {
152
+ try {
153
+ assertTrue (latch .await (5 , TimeUnit .SECONDS ));
154
+ } catch (InterruptedException e ) {
155
+ throw new RuntimeException (e );
156
+ }
157
+ printEvents ();
158
+ }
159
+
160
+ @ Override
161
+ public void onData (T data ) {
162
+ receivedChanges .add (data );
163
+ latch .countDown ();
164
+ }
165
+
130
166
}
131
167
}
0 commit comments