Skip to content

Commit 9efe389

Browse files
committed
Add initial support for result segment mapping.
Signed-off-by: Mark Paluch <mpaluch@vmware.com> [#199]
1 parent 0547359 commit 9efe389

File tree

5 files changed

+308
-37
lines changed

5 files changed

+308
-37
lines changed

src/main/java/io/r2dbc/mssql/ExceptionFactory.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,20 @@ static R2dbcException createException(AbstractInfoToken token, String sql) {
9090
case 565: // A stack overflow occurred in the server while compiling the query. Please simplify the query.
9191
case 4408: // Too many tables. The query and the views or functions in it exceed the limit of %d tables. Revise the query to reduce the number of tables.
9292
case 2812: // Could not find stored procedure '%.*ls'.
93-
return new MssqlBadGrammarException(createExceptionDetails(token), sql);
93+
return new MssqlBadGrammarException(createErrorDetails(token), sql);
9494

9595
case 2601: // Cannot insert duplicate key row in object '%.*ls' with unique index '%.*ls'. The duplicate key value is %ls.
9696
case 2627: // Violation of %ls constraint '%.*ls'. Cannot insert duplicate key in object '%.*ls'. The duplicate key value is %ls.
9797
case 544: // Cannot insert explicit value for identity column in table '%.*ls' when IDENTITY_INSERT is set to OFF.
9898
case 8114: // Error converting data type %ls to %ls.
9999
case 8115: // Arithmetic overflow error converting %ls to data type %ls.
100-
return new MssqlDataIntegrityViolationException(createExceptionDetails(token));
100+
return new MssqlDataIntegrityViolationException(createErrorDetails(token));
101101

102102
case 701: // Maximum number of databases used for each query has been exceeded. The maximum allowed is %d.
103103
case 1222: // Lock request time out period exceeded.
104104
case 1204: // The instance of the SQL Server Database Engine cannot obtain a LOCK resource at this time. Rerun your statement when there are fewer active users. Ask the database
105105
// administrator to check the lock and memory configuration for this instance, or to check for
106-
return new MssqlTransientException(createExceptionDetails(token));
106+
return new MssqlTransientException(createErrorDetails(token));
107107

108108
case 1203: // Process ID %d attempted to unlock a resource it does not own: %.*ls. Retry the transaction, because this error may be caused by a timing condition. If the problem
109109
// persists, contact the database administrator.
@@ -117,7 +117,7 @@ static R2dbcException createException(AbstractInfoToken token, String sql) {
117117
case 3938: // The transaction has been stopped because it conflicted with the execution of a FILESTREAM close operation using the same transaction. The transaction will be rolled back.
118118
case 28611: // The request is aborted because the transaction has been aborted by Matrix Transaction Coordination Manager. This is mostly caused by one or more transaction particpant
119119
// brick went offline.
120-
return new MssqlRollbackException(createExceptionDetails(token));
120+
return new MssqlRollbackException(createErrorDetails(token));
121121

122122
case 921: // Database '%.*ls' has not been recovered yet. Wait and try again.
123123
case 941: // Database '%.*ls' cannot be opened because it is not started. Retry when the database is started.
@@ -131,7 +131,7 @@ static R2dbcException createException(AbstractInfoToken token, String sql) {
131131
case 40642: // The server is currently too busy. Please try again later.
132132
case 40675: // The service is currently too busy. Please try again later.
133133
case 40825: // Unable to complete request now. Please try again later.
134-
return new MssqlTransientResourceException(createExceptionDetails(token));
134+
return new MssqlTransientResourceException(createErrorDetails(token));
135135
}
136136

137137
if (token.getClassification() == GENERAL_ERROR && token.getNumber() == 4002) {
@@ -141,19 +141,19 @@ static R2dbcException createException(AbstractInfoToken token, String sql) {
141141
switch (token.getClassification()) {
142142
case OBJECT_DOES_NOT_EXIST:
143143
case SYNTAX_ERROR:
144-
return new MssqlBadGrammarException(createExceptionDetails(token), sql);
144+
return new MssqlBadGrammarException(createErrorDetails(token), sql);
145145
case INCONSISTENT_NO_LOCK:
146-
return new MssqlDataIntegrityViolationException(createExceptionDetails(token));
146+
return new MssqlDataIntegrityViolationException(createErrorDetails(token));
147147
case TX_DEADLOCK:
148-
return new MssqlRollbackException(createExceptionDetails(token));
148+
return new MssqlRollbackException(createErrorDetails(token));
149149
case SECURITY:
150-
return new MssqlPermissionDeniedException(createExceptionDetails(token));
150+
return new MssqlPermissionDeniedException(createErrorDetails(token));
151151
case GENERAL_ERROR:
152-
return new MssqlNonTransientException(createExceptionDetails(token));
152+
return new MssqlNonTransientException(createErrorDetails(token));
153153
case OUT_OF_RESOURCES:
154-
return new MssqlTransientResourceException(createExceptionDetails(token));
154+
return new MssqlTransientResourceException(createErrorDetails(token));
155155
default:
156-
return new MssqlNonTransientResourceException(createExceptionDetails(token));
156+
return new MssqlNonTransientResourceException(createErrorDetails(token));
157157
}
158158
}
159159

@@ -181,7 +181,7 @@ RuntimeException createException(ErrorToken message) {
181181
return createException(message, this.sql);
182182
}
183183

184-
private static ErrorDetails createExceptionDetails(AbstractInfoToken token) {
184+
static ErrorDetails createErrorDetails(AbstractInfoToken token) {
185185
return new ErrorDetails(token.getMessage(), token.getNumber(), token.getState(), token.getInfoClass(), token.getServerName(), token.getProcName(), token.getLineNumber());
186186
}
187187

src/main/java/io/r2dbc/mssql/MssqlResult.java

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import io.netty.util.ReferenceCountUtil;
2020
import io.r2dbc.mssql.client.ConnectionContext;
2121
import io.r2dbc.mssql.codec.Codecs;
22-
import io.r2dbc.mssql.message.Message;
2322
import io.r2dbc.mssql.message.token.AbstractDoneToken;
23+
import io.r2dbc.mssql.message.token.AbstractInfoToken;
2424
import io.r2dbc.mssql.message.token.ColumnMetadataToken;
2525
import io.r2dbc.mssql.message.token.ErrorToken;
2626
import io.r2dbc.mssql.message.token.NbcRowToken;
@@ -30,12 +30,15 @@
3030
import io.r2dbc.spi.Result;
3131
import io.r2dbc.spi.Row;
3232
import io.r2dbc.spi.RowMetadata;
33+
import org.reactivestreams.Publisher;
3334
import reactor.core.publisher.Flux;
3435
import reactor.core.publisher.Mono;
3536
import reactor.util.Logger;
3637
import reactor.util.Loggers;
3738

3839
import java.util.function.BiFunction;
40+
import java.util.function.Function;
41+
import java.util.function.Predicate;
3942

4043
/**
4144
* {@link Result} of query results.
@@ -54,13 +57,13 @@ public final class MssqlResult implements Result {
5457

5558
private final Codecs codecs;
5659

57-
private final Flux<Message> messages;
60+
private final Flux<io.r2dbc.mssql.message.Message> messages;
5861

5962
private volatile MssqlRowMetadata rowMetadata;
6063

6164
private volatile RuntimeException throwable;
6265

63-
private MssqlResult(String sql, ConnectionContext context, Codecs codecs, Flux<Message> messages) {
66+
private MssqlResult(String sql, ConnectionContext context, Codecs codecs, Flux<io.r2dbc.mssql.message.Message> messages) {
6467

6568
this.sql = sql;
6669
this.context = context;
@@ -76,7 +79,7 @@ private MssqlResult(String sql, ConnectionContext context, Codecs codecs, Flux<M
7679
* @param messages message stream.
7780
* @return {@link Result} object.
7881
*/
79-
static MssqlResult toResult(String sql, ConnectionContext context, Codecs codecs, Flux<Message> messages) {
82+
static MssqlResult toResult(String sql, ConnectionContext context, Codecs codecs, Flux<io.r2dbc.mssql.message.Message> messages) {
8083

8184
Assert.requireNonNull(sql, "SQL must not be null");
8285
Assert.requireNonNull(codecs, "Codecs must not be null");
@@ -197,4 +200,151 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
197200
});
198201
}
199202

203+
@Override
204+
public Result filter(Predicate<Segment> filter) {
205+
206+
Flux<io.r2dbc.mssql.message.Message> filteredMessages = this.messages.filter(message -> {
207+
208+
if (message.getClass() == ColumnMetadataToken.class) {
209+
210+
ColumnMetadataToken token = (ColumnMetadataToken) message;
211+
212+
if (token.hasColumns()) {
213+
this.rowMetadata = MssqlRowMetadata.create(this.codecs, token);
214+
}
215+
return true;
216+
}
217+
218+
if (message.getClass() == RowToken.class || message.getClass() == NbcRowToken.class) {
219+
220+
MssqlRowMetadata rowMetadata = this.rowMetadata;
221+
222+
if (rowMetadata == null) {
223+
return false;
224+
}
225+
226+
MssqlRow row = MssqlRow.toRow(this.codecs, (RowToken) message, rowMetadata);
227+
228+
boolean result = filter.test(row);
229+
230+
if (!result) {
231+
row.release();
232+
}
233+
234+
return result;
235+
}
236+
237+
if (message instanceof AbstractInfoToken) {
238+
return filter.test(createMessage((AbstractInfoToken) message));
239+
}
240+
241+
if (message instanceof AbstractDoneToken) {
242+
243+
AbstractDoneToken doneToken = (AbstractDoneToken) message;
244+
if (doneToken.hasCount()) {
245+
246+
return filter.test(doneToken);
247+
}
248+
}
249+
return true;
250+
});
251+
252+
return new MssqlResult(this.sql, this.context, this.codecs, filteredMessages);
253+
}
254+
255+
@Override
256+
public <T> Flux<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> mappingFunction) {
257+
258+
return this.messages
259+
.flatMap(message -> {
260+
261+
if (message instanceof AbstractDoneToken) {
262+
263+
AbstractDoneToken doneToken = (AbstractDoneToken) message;
264+
if (doneToken.hasCount()) {
265+
266+
if (DEBUG_ENABLED) {
267+
LOGGER.debug(this.context.getMessage("Incoming row count: {}"), doneToken);
268+
}
269+
270+
return mappingFunction.apply(doneToken);
271+
}
272+
}
273+
274+
if (message.getClass() == ColumnMetadataToken.class) {
275+
276+
ColumnMetadataToken token = (ColumnMetadataToken) message;
277+
278+
if (!token.hasColumns()) {
279+
return Mono.empty();
280+
}
281+
282+
if (DEBUG_ENABLED) {
283+
LOGGER.debug(this.context.getMessage("Result column definition: {}"), message);
284+
}
285+
286+
this.rowMetadata = MssqlRowMetadata.create(this.codecs, token);
287+
}
288+
289+
if (message.getClass() == RowToken.class || message.getClass() == NbcRowToken.class) {
290+
291+
MssqlRowMetadata rowMetadata1 = this.rowMetadata;
292+
293+
if (rowMetadata1 == null) {
294+
return Mono.error(new IllegalStateException("No MssqlRowMetadata available"));
295+
}
296+
297+
MssqlRow row = MssqlRow.toRow(this.codecs, (RowToken) message, rowMetadata1);
298+
299+
try {
300+
return Flux.from(mappingFunction.apply(row)).doFinally(it -> row.release());
301+
} catch (RuntimeException e) {
302+
row.release();
303+
throw e;
304+
}
305+
}
306+
307+
if (message instanceof AbstractInfoToken) {
308+
return mappingFunction.apply(createMessage((AbstractInfoToken) message));
309+
}
310+
311+
ReferenceCountUtil.release(message);
312+
313+
return Mono.empty();
314+
});
315+
}
316+
317+
private Message createMessage(AbstractInfoToken message) {
318+
319+
ErrorDetails errorDetails = ExceptionFactory.createErrorDetails(message);
320+
321+
return new Message() {
322+
323+
@Override
324+
public R2dbcException exception() {
325+
return ExceptionFactory.createException(message, MssqlResult.this.sql);
326+
}
327+
328+
@Override
329+
public int errorCode() {
330+
return (int) errorDetails.getNumber();
331+
}
332+
333+
@Override
334+
public String sqlState() {
335+
return errorDetails.getStateCode();
336+
}
337+
338+
@Override
339+
public String message() {
340+
return errorDetails.getMessage();
341+
}
342+
343+
@Override
344+
public Severity severity() {
345+
return message instanceof ErrorToken ? Severity.ERROR : Severity.INFO;
346+
}
347+
};
348+
}
349+
200350
}

src/main/java/io/r2dbc/mssql/MssqlRow.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import io.r2dbc.mssql.message.token.RowToken;
2424
import io.r2dbc.mssql.message.type.SqlServerType;
2525
import io.r2dbc.mssql.util.Assert;
26+
import io.r2dbc.spi.Result;
2627
import io.r2dbc.spi.Row;
28+
import io.r2dbc.spi.RowMetadata;
2729
import reactor.util.annotation.Nullable;
2830

2931
/**
@@ -35,7 +37,7 @@
3537
* @see #release()
3638
* @see ReferenceCounted
3739
*/
38-
final class MssqlRow implements Row {
40+
final class MssqlRow implements Row, Result.Data {
3941

4042
private static final int STATE_ACTIVE = 0;
4143

@@ -82,6 +84,11 @@ public MssqlRowMetadata getMetadata() {
8284
return this.metadata;
8385
}
8486

87+
@Override
88+
public RowMetadata metadata() {
89+
return this.metadata;
90+
}
91+
8592
@Override
8693
public <T> T get(int index, Class<T> type) {
8794

src/main/java/io/r2dbc/mssql/message/token/AbstractDoneToken.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.r2dbc.mssql.message.Message;
2121
import io.r2dbc.mssql.message.tds.Encode;
2222
import io.r2dbc.mssql.util.Assert;
23+
import io.r2dbc.spi.Result;
2324

2425
import java.util.Objects;
2526

@@ -28,7 +29,7 @@
2829
*
2930
* @author Mark Paluch
3031
*/
31-
public abstract class AbstractDoneToken extends AbstractDataToken {
32+
public abstract class AbstractDoneToken extends AbstractDataToken implements Result.UpdateCount {
3233

3334
/**
3435
* Packet length in bytes.
@@ -246,4 +247,9 @@ public String toString() {
246247
return sb.toString();
247248
}
248249

250+
@Override
251+
public long value() {
252+
return hasCount() ? getRowCount() : 0;
253+
}
254+
249255
}

0 commit comments

Comments
 (0)