10
10
*/
11
11
package io .vertx .oracleclient .impl ;
12
12
13
- import io .vertx .core .Context ;
14
13
import io .vertx .core .Future ;
15
14
import io .vertx .core .Promise ;
16
15
import io .vertx .core .impl .ContextInternal ;
17
- import io .vertx .sqlclient .PropertyKind ;
18
16
import io .vertx .sqlclient .Row ;
19
- import io .vertx .sqlclient .RowIterator ;
20
- import io .vertx .sqlclient .RowSet ;
21
- import io .vertx .sqlclient .desc .ColumnDescriptor ;
22
17
import io .vertx .sqlclient .impl .QueryResultHandler ;
23
18
import io .vertx .sqlclient .impl .RowDesc ;
24
19
25
- import java .util .ArrayList ;
26
- import java .util .Iterator ;
27
- import java .util .List ;
28
20
import java .util .concurrent .Flow ;
29
21
import java .util .concurrent .atomic .AtomicBoolean ;
30
22
import java .util .concurrent .atomic .AtomicInteger ;
23
+ import java .util .stream .Collector ;
31
24
32
- public class RowReader implements Flow .Subscriber <Row > {
25
+ public class RowReader < R , A > implements Flow .Subscriber <Row > {
33
26
34
27
private final Flow .Publisher <Row > publisher ;
35
28
private final ContextInternal context ;
36
29
private final RowDesc description ;
37
- private final QueryResultHandler <RowSet < Row > > handler ;
30
+ private final QueryResultHandler <R > handler ;
38
31
private volatile Flow .Subscription subscription ;
39
32
private final Promise <Void > subscriptionPromise ;
40
33
private Promise <Void > readPromise ;
41
34
private volatile boolean completed ;
42
35
private volatile Throwable failed ;
43
- private volatile OracleRowSet collector ;
36
+ private final Collector <Row , A , R > collector ;
37
+ private A accumulator ;
38
+ private int count ;
44
39
private final AtomicInteger toRead = new AtomicInteger ();
45
40
46
41
private final AtomicBoolean wip = new AtomicBoolean ();
47
42
48
- public RowReader (Flow .Publisher <Row > publisher , RowDesc description , Promise <Void > promise ,
49
- QueryResultHandler <RowSet < Row > > handler ,
43
+ public RowReader (Flow .Publisher <Row > publisher , Collector < Row , A , R > collector , RowDesc description , Promise <Void > promise ,
44
+ QueryResultHandler <R > handler ,
50
45
ContextInternal context ) {
51
46
this .publisher = publisher ;
52
47
this .description = description ;
53
48
this .subscriptionPromise = promise ;
54
49
this .handler = handler ;
55
50
this .context = context ;
51
+ this .collector = collector ;
56
52
}
57
53
58
- public static Future <RowReader > create (Flow .Publisher <Row > publisher , ContextInternal context ,
59
- QueryResultHandler <RowSet <Row >> handler , RowDesc description ) {
54
+ public static <R > Future <RowReader <R , ?>> create (Flow .Publisher <Row > publisher ,
55
+ Collector <Row , ?, R > collector ,
56
+ ContextInternal context ,
57
+ QueryResultHandler <R > handler ,
58
+ RowDesc description ) {
60
59
Promise <Void > promise = context .promise ();
61
- RowReader reader = new RowReader (publisher , description , promise , handler , context );
60
+ RowReader < R , ?> reader = new RowReader <> (publisher , collector , description , promise , handler , context );
62
61
reader .subscribe ();
63
62
return promise .future ().map (reader );
64
63
}
@@ -75,7 +74,8 @@ public Future<Void> read(int fetchSize) {
75
74
}
76
75
if (wip .compareAndSet (false , true )) {
77
76
toRead .set (fetchSize );
78
- collector = new OracleRowSet (description );
77
+ accumulator = collector .supplier ().get ();
78
+ count = 0 ;
79
79
readPromise = context .promise ();
80
80
subscription .request (fetchSize );
81
81
return readPromise .future ();
@@ -96,10 +96,12 @@ public void onSubscribe(Flow.Subscription subscription) {
96
96
97
97
@ Override
98
98
public void onNext (Row item ) {
99
- collector .add (item );
99
+ collector .accumulator ().accept (accumulator , item );
100
+ count ++;
100
101
if (toRead .decrementAndGet () == 0 && wip .compareAndSet (true , false )) {
102
+ R result = collector .finisher ().apply (accumulator );
101
103
try {
102
- handler .handleResult (collector . rowCount (), collector . size () , description , collector , null );
104
+ handler .handleResult (count , count , description , result , null );
103
105
} catch (Exception e ) {
104
106
e .printStackTrace ();
105
107
}
@@ -122,69 +124,4 @@ public void onComplete() {
122
124
context .runOnContext (x -> readPromise .complete (null ));
123
125
}
124
126
}
125
-
126
- private class OracleRowSet implements RowSet <Row > {
127
-
128
- private final List <Row > rows = new ArrayList <>();
129
- private final RowDesc desc ;
130
-
131
- private OracleRowSet (RowDesc desc ) {
132
- this .desc = desc ;
133
- }
134
-
135
- @ Override
136
- public RowIterator <Row > iterator () {
137
- Iterator <Row > iterator = rows .iterator ();
138
- return new RowIterator <>() {
139
- @ Override
140
- public boolean hasNext () {
141
- return iterator .hasNext ();
142
- }
143
-
144
- @ Override
145
- public Row next () {
146
- return iterator .next ();
147
- }
148
- };
149
- }
150
-
151
- @ Override
152
- public int rowCount () {
153
- return rows .size ();
154
- }
155
-
156
- @ Override
157
- public List <String > columnsNames () {
158
- return desc .columnNames ();
159
- }
160
-
161
- @ Override
162
- public List <ColumnDescriptor > columnDescriptors () {
163
- return desc .columnDescriptor ();
164
- }
165
-
166
- @ Override
167
- public int size () {
168
- return rows .size ();
169
- }
170
-
171
- @ Override
172
- public <V > V property (PropertyKind <V > propertyKind ) {
173
- return null ; // TODO
174
- }
175
-
176
- @ Override
177
- public RowSet <Row > value () {
178
- return this ;
179
- }
180
-
181
- @ Override
182
- public RowSet <Row > next () {
183
- return null ;
184
- }
185
-
186
- public void add (Row item ) {
187
- rows .add (item );
188
- }
189
- }
190
127
}
0 commit comments