20
20
import io .r2dbc .mssql .client .ConnectionContext ;
21
21
import io .r2dbc .mssql .codec .Codecs ;
22
22
import io .r2dbc .mssql .message .token .AbstractDoneToken ;
23
- import io .r2dbc .mssql .message .token .AbstractInfoToken ;
24
23
import io .r2dbc .mssql .message .token .ColumnMetadataToken ;
25
24
import io .r2dbc .mssql .message .token .ErrorToken ;
26
25
import io .r2dbc .mssql .message .token .NbcRowToken ;
27
26
import io .r2dbc .mssql .message .token .ReturnValue ;
28
27
import io .r2dbc .mssql .message .token .RowToken ;
29
28
import io .r2dbc .mssql .util .Assert ;
30
29
import io .r2dbc .spi .R2dbcException ;
30
+ import io .r2dbc .spi .Readable ;
31
31
import io .r2dbc .spi .Result ;
32
32
import io .r2dbc .spi .Row ;
33
33
import io .r2dbc .spi .RowMetadata ;
44
44
import java .util .function .Predicate ;
45
45
46
46
/**
47
- * TODO: Revisit segment-based result consumption as we need to materialize {@link Segment} for filter(…) and ReturnValue processing.
48
- * <p>
49
47
* {@link Result} of query results.
50
48
*
51
49
* @author Mark Paluch
@@ -97,8 +95,7 @@ static MssqlResult toResult(String sql, ConnectionContext context, Codecs codecs
97
95
98
96
LOGGER .debug (context .getMessage ("Creating new result" ));
99
97
100
- // return new MssqlResult(sql, context, codecs, messages, expectReturnValues);
101
- return MssqlSegmentResult .toResult (sql , context , codecs , messages , expectReturnValues );
98
+ return new DefaultMssqlResult (sql , context , codecs , messages , expectReturnValues );
102
99
}
103
100
104
101
@ Override
@@ -144,14 +141,26 @@ public Mono<Integer> getRowsUpdated() {
144
141
}
145
142
146
143
@ Override
147
- public <T > Flux <T > map (BiFunction <Row , RowMetadata , ? extends T > f ) {
144
+ public <T > Flux <T > map (BiFunction <Row , RowMetadata , ? extends T > mappingFunction ) {
145
+ Assert .requireNonNull (mappingFunction , "Mapping function must not be null" );
146
+ return doMap (true , false , readable -> {
147
+ Row row = (Row ) readable ;
148
+ return mappingFunction .apply (row , row .getMetadata ());
149
+ });
150
+ }
148
151
149
- Assert .requireNonNull (f , "Mapping function must not be null" );
152
+ @ Override
153
+ public <T > Publisher <T > map (Function <? super Readable , ? extends T > mappingFunction ) {
154
+ Assert .requireNonNull (mappingFunction , "Mapping function must not be null" );
155
+ return doMap (true , true , mappingFunction );
156
+ }
157
+
158
+ private <T > Flux <T > doMap (boolean rows , boolean outparameters , Function <? super Readable , ? extends T > mappingFunction ) {
150
159
151
160
Flux <T > mappedReturnValues = Flux .empty ();
152
161
Flux <io .r2dbc .mssql .message .Message > messages = this .messages ;
153
162
154
- if (this .expectReturnValues ) {
163
+ if (this .expectReturnValues && outparameters ) {
155
164
156
165
List <ReturnValue > returnValues = new ArrayList <>();
157
166
@@ -160,23 +169,22 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
160
169
if (message instanceof ReturnValue ) {
161
170
returnValues .add ((ReturnValue ) message );
162
171
}
163
- });
172
+ }). filter ( it -> !( it instanceof ReturnValue )) ;
164
173
165
174
mappedReturnValues = Flux .defer (() -> {
166
175
167
176
if (returnValues .size () != 0 ) {
168
- return Flux .just (MssqlReturnValues .toReturnValues (this .codecs , returnValues ));
169
- }
170
-
171
- return Flux .empty ();
172
177
173
- }). handle (( row , sink ) -> {
178
+ MssqlReturnValues mssqlReturnValues = MssqlReturnValues . toReturnValues ( this . codecs , returnValues );
174
179
175
- try {
176
- sink .next (f .apply (row , row .getMetadata ()));
177
- } finally {
178
- row .release ();
180
+ try {
181
+ return Flux .just (mappingFunction .apply (mssqlReturnValues ));
182
+ } finally {
183
+ mssqlReturnValues .release ();
184
+ }
179
185
}
186
+
187
+ return Flux .empty ();
180
188
});
181
189
}
182
190
@@ -200,7 +208,7 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
200
208
return ;
201
209
}
202
210
203
- if (message .getClass () == RowToken .class || message .getClass () == NbcRowToken .class ) {
211
+ if (rows && ( message .getClass () == RowToken .class || message .getClass () == NbcRowToken .class ) ) {
204
212
205
213
MssqlRowMetadata rowMetadata = this .rowMetadata ;
206
214
@@ -211,7 +219,7 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
211
219
212
220
MssqlRow row = MssqlRow .toRow (this .codecs , (RowToken ) message , rowMetadata );
213
221
try {
214
- sink .next (f .apply (row , row . getMetadata () ));
222
+ sink .next (mappingFunction .apply (row ));
215
223
} finally {
216
224
row .release ();
217
225
}
@@ -253,150 +261,13 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
253
261
}
254
262
255
263
@ Override
256
- public Result filter (Predicate <Segment > filter ) {
257
-
258
- Flux <io .r2dbc .mssql .message .Message > filteredMessages = this .messages .filter (message -> {
259
-
260
- if (message .getClass () == ColumnMetadataToken .class ) {
261
-
262
- ColumnMetadataToken token = (ColumnMetadataToken ) message ;
263
-
264
- if (token .hasColumns ()) {
265
- this .rowMetadata = MssqlRowMetadata .create (this .codecs , token );
266
- }
267
- return true ;
268
- }
269
-
270
- if (message .getClass () == RowToken .class || message .getClass () == NbcRowToken .class ) {
271
-
272
- MssqlRowMetadata rowMetadata = this .rowMetadata ;
273
-
274
- if (rowMetadata == null ) {
275
- return false ;
276
- }
277
-
278
- MssqlRow row = MssqlRow .toRow (this .codecs , (RowToken ) message , rowMetadata );
279
-
280
- boolean result = filter .test (row );
281
-
282
- if (!result ) {
283
- row .release ();
284
- }
285
-
286
- return result ;
287
- }
288
-
289
- if (message instanceof AbstractInfoToken ) {
290
- return filter .test (createMessage ((AbstractInfoToken ) message ));
291
- }
292
-
293
- if (message instanceof AbstractDoneToken ) {
294
-
295
- AbstractDoneToken doneToken = (AbstractDoneToken ) message ;
296
- if (doneToken .hasCount ()) {
297
-
298
- return filter .test (doneToken );
299
- }
300
- }
301
- return true ;
302
- });
303
-
304
- return new DefaultMssqlResult (this .sql , this .context , this .codecs , filteredMessages , this .expectReturnValues );
264
+ public MssqlResult filter (Predicate <Segment > filter ) {
265
+ return MssqlSegmentResult .toResult (this .sql , this .context , this .codecs , this .messages , this .expectReturnValues ).filter (filter );
305
266
}
306
267
307
268
@ Override
308
269
public <T > Flux <T > flatMap (Function <Segment , ? extends Publisher <? extends T >> mappingFunction ) {
309
-
310
- return this .messages
311
- .flatMap (message -> {
312
-
313
- if (message instanceof AbstractDoneToken ) {
314
-
315
- AbstractDoneToken doneToken = (AbstractDoneToken ) message ;
316
- if (doneToken .hasCount ()) {
317
-
318
- if (DEBUG_ENABLED ) {
319
- LOGGER .debug (this .context .getMessage ("Incoming row count: {}" ), doneToken );
320
- }
321
-
322
- return mappingFunction .apply (doneToken );
323
- }
324
- }
325
-
326
- if (message .getClass () == ColumnMetadataToken .class ) {
327
-
328
- ColumnMetadataToken token = (ColumnMetadataToken ) message ;
329
-
330
- if (!token .hasColumns ()) {
331
- return Mono .empty ();
332
- }
333
-
334
- if (DEBUG_ENABLED ) {
335
- LOGGER .debug (this .context .getMessage ("Result column definition: {}" ), message );
336
- }
337
-
338
- this .rowMetadata = MssqlRowMetadata .create (this .codecs , token );
339
- }
340
-
341
- if (message .getClass () == RowToken .class || message .getClass () == NbcRowToken .class ) {
342
-
343
- MssqlRowMetadata rowMetadata = this .rowMetadata ;
344
-
345
- if (rowMetadata == null ) {
346
- return Mono .error (new IllegalStateException ("No MssqlRowMetadata available" ));
347
- }
348
-
349
- MssqlRow row = MssqlRow .toRow (this .codecs , (RowToken ) message , rowMetadata );
350
-
351
- try {
352
- return Flux .from (mappingFunction .apply (row )).doFinally (it -> row .release ());
353
- } catch (RuntimeException e ) {
354
- row .release ();
355
- throw e ;
356
- }
357
- }
358
-
359
- if (message instanceof AbstractInfoToken ) {
360
- return mappingFunction .apply (createMessage ((AbstractInfoToken ) message ));
361
- }
362
-
363
- ReferenceCountUtil .release (message );
364
-
365
- return Mono .empty ();
366
- });
367
- }
368
-
369
- private Message createMessage (AbstractInfoToken message ) {
370
-
371
- ErrorDetails errorDetails = ExceptionFactory .createErrorDetails (message );
372
-
373
- return new Message () {
374
-
375
- @ Override
376
- public R2dbcException exception () {
377
- return ExceptionFactory .createException (message , DefaultMssqlResult .this .sql );
378
- }
379
-
380
- @ Override
381
- public int errorCode () {
382
- return (int ) errorDetails .getNumber ();
383
- }
384
-
385
- @ Override
386
- public String sqlState () {
387
- return errorDetails .getStateCode ();
388
- }
389
-
390
- @ Override
391
- public String message () {
392
- return errorDetails .getMessage ();
393
- }
394
-
395
- @ Override
396
- public Severity severity () {
397
- return message instanceof ErrorToken ? Severity .ERROR : Severity .INFO ;
398
- }
399
- };
270
+ return MssqlSegmentResult .toResult (this .sql , this .context , this .codecs , this .messages , this .expectReturnValues ).flatMap (mappingFunction );
400
271
}
401
272
402
273
}
0 commit comments