Skip to content

Commit cfa6fd6

Browse files
committed
Add support for statement timeout.
The driver now times out statements that didn't emit a response (time to first response) within the specified timeout. A statement that times out triggers an attention token to cancel the query on the server side. All cancellation and discard operators are now attached in the …Flow classes to reduce duplications. [resolves #213] Signed-off-by: Mark Paluch <mpaluch@vmware.com>
1 parent 280cf7f commit cfa6fd6

11 files changed

+145
-19
lines changed

src/main/java/io/r2dbc/mssql/ConnectionOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import io.r2dbc.mssql.codec.Codecs;
2020
import io.r2dbc.mssql.codec.DefaultCodecs;
21+
import reactor.util.annotation.Nullable;
2122

23+
import java.time.Duration;
2224
import java.util.function.Predicate;
2325

2426
/**
@@ -34,6 +36,8 @@ class ConnectionOptions {
3436

3537
private final boolean sendStringParametersAsUnicode;
3638

39+
private volatile Duration statementTimeout = Duration.ZERO;
40+
3741
ConnectionOptions() {
3842
this(sql -> false, new DefaultCodecs(), new IndefinitePreparedStatementCache(), true);
3943
}
@@ -61,6 +65,14 @@ public boolean isSendStringParametersAsUnicode() {
6165
return this.sendStringParametersAsUnicode;
6266
}
6367

68+
public Duration getStatementTimeout() {
69+
return this.statementTimeout;
70+
}
71+
72+
public void setStatementTimeout(@Nullable Duration statementTimeout) {
73+
this.statementTimeout = statementTimeout;
74+
}
75+
6476
@Override
6577
public String toString() {
6678
final StringBuffer sb = new StringBuffer();
@@ -69,6 +81,7 @@ public String toString() {
6981
sb.append(", codecs=").append(this.codecs);
7082
sb.append(", preparedStatementCache=").append(this.preparedStatementCache);
7183
sb.append(", sendStringParametersAsUnicode=").append(this.sendStringParametersAsUnicode);
84+
sb.append(", statementTimeout=").append(this.statementTimeout);
7285
sb.append(']');
7386
return sb.toString();
7487
}

src/main/java/io/r2dbc/mssql/ExceptionFactory.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,24 @@ public ErrorDetails getErrorDetails() {
320320

321321
}
322322

323+
/**
324+
* SQL Server-specific {@link R2dbcTimeoutException}.
325+
*/
326+
static final class MssqlStatementTimeoutException extends R2dbcTimeoutException {
327+
328+
private final String sql;
329+
330+
public MssqlStatementTimeoutException(String reason, String sql) {
331+
super(reason);
332+
this.sql = sql;
333+
}
334+
335+
public String getSql() {
336+
return this.sql;
337+
}
338+
339+
}
340+
323341
/**
324342
* SQL Server-specific {@link R2dbcTransientException}.
325343
*/

src/main/java/io/r2dbc/mssql/MssqlConnection.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616

1717
package io.r2dbc.mssql;
1818

19-
import io.netty.util.ReferenceCountUtil;
20-
import io.netty.util.ReferenceCounted;
2119
import io.r2dbc.mssql.api.MssqlTransactionDefinition;
2220
import io.r2dbc.mssql.client.Client;
2321
import io.r2dbc.mssql.client.ConnectionContext;
2422
import io.r2dbc.mssql.client.TransactionStatus;
2523
import io.r2dbc.mssql.util.Assert;
26-
import io.r2dbc.mssql.util.Operators;
2724
import io.r2dbc.spi.Connection;
2825
import io.r2dbc.spi.IsolationLevel;
2926
import io.r2dbc.spi.Option;
@@ -339,7 +336,10 @@ public Mono<Void> setLockWaitTimeout(Duration timeout) {
339336

340337
@Override
341338
public Mono<Void> setStatementTimeout(Duration timeout) {
342-
throw new UnsupportedOperationException("https://github.com/r2dbc/r2dbc-mssql/issues/213");
339+
340+
Assert.requireNonNull(timeout, "Timeout must not be null");
341+
342+
return Mono.fromRunnable(() -> this.connectionOptions.setStatementTimeout(timeout));
343343
}
344344

345345
@Override
@@ -411,8 +411,6 @@ private Mono<Void> exchange(String sql) {
411411

412412
ExceptionFactory factory = ExceptionFactory.withSql(sql);
413413
return QueryMessageFlow.exchange(this.client, sql)
414-
.transform(Operators::discardOnCancel)
415-
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
416414
.handle(factory::handleErrorResponse)
417415
.then();
418416
}

src/main/java/io/r2dbc/mssql/MssqlStatementSupport.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,17 @@
1616

1717
package io.r2dbc.mssql;
1818

19+
import io.r2dbc.mssql.client.Client;
20+
import io.r2dbc.mssql.message.Message;
1921
import io.r2dbc.mssql.util.Assert;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.Mono;
24+
import reactor.core.scheduler.Schedulers;
2025
import reactor.util.annotation.Nullable;
2126

2227
import java.sql.Statement;
28+
import java.time.Duration;
29+
import java.util.concurrent.TimeoutException;
2330

2431
/**
2532
* Base class for {@link Statement} implementations.
@@ -85,4 +92,18 @@ public MssqlStatementSupport fetchSize(int fetchSize) {
8592
return this;
8693
}
8794

95+
Flux<Message> potentiallyAttachTimeout(Flux<Message> exchange, ConnectionOptions connectionOptions, Client client, String sql) {
96+
97+
Duration statementTimeout = connectionOptions.getStatementTimeout();
98+
99+
if (statementTimeout.isZero()) {
100+
return exchange;
101+
}
102+
103+
Mono<Long> timeout = Mono.delay(statementTimeout, Schedulers.parallel()).onErrorReturn(0L);
104+
return exchange.timeout(timeout).onErrorResume(TimeoutException.class, e -> client.attention().then(Mono.error(new ExceptionFactory.MssqlStatementTimeoutException(String.format("Statement " +
105+
"did not yield a result within %dms", statementTimeout.toMillis()),
106+
sql))));
107+
}
108+
88109
}

src/main/java/io/r2dbc/mssql/ParametrizedMssqlStatement.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package io.r2dbc.mssql;
1818

19-
import io.netty.util.ReferenceCountUtil;
20-
import io.netty.util.ReferenceCounted;
2119
import io.r2dbc.mssql.client.Client;
2220
import io.r2dbc.mssql.client.ConnectionContext;
2321
import io.r2dbc.mssql.codec.Codecs;
@@ -28,7 +26,6 @@
2826
import io.r2dbc.mssql.message.token.AbstractDoneToken;
2927
import io.r2dbc.mssql.message.token.DoneInProcToken;
3028
import io.r2dbc.mssql.util.Assert;
31-
import io.r2dbc.mssql.util.Operators;
3229
import io.r2dbc.spi.Clob;
3330
import io.r2dbc.spi.Parameter;
3431
import io.r2dbc.spi.Statement;
@@ -74,6 +71,8 @@ final class ParametrizedMssqlStatement extends MssqlStatementSupport implements
7471

7572
private final Client client;
7673

74+
private final ConnectionOptions connectionOptions;
75+
7776
private final ConnectionContext context;
7877

7978
private final Codecs codecs;
@@ -89,6 +88,7 @@ final class ParametrizedMssqlStatement extends MssqlStatementSupport implements
8988
ParametrizedMssqlStatement(Client client, ConnectionOptions connectionOptions, String sql) {
9089

9190
super(connectionOptions.prefersCursors(sql));
91+
this.connectionOptions = connectionOptions;
9292

9393
Assert.requireNonNull(client, "Client must not be null");
9494
Assert.requireNonNull(connectionOptions, "ConnectionOptions must not be null");
@@ -125,7 +125,7 @@ public Flux<MssqlResult> execute() {
125125

126126
if (this.bindings.bindings.isEmpty()) {
127127

128-
Flux<Message> exchange = QueryMessageFlow.exchange(this.client, sql).transform(Operators::discardOnCancel).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
128+
Flux<Message> exchange = potentiallyAttachTimeout(QueryMessageFlow.exchange(this.client, sql), this.connectionOptions, this.client, sql);
129129
return exchange.windowUntil(AbstractDoneToken.class::isInstance).map(it -> DefaultMssqlResult.toResult(this.parsedQuery.getSql(), this.context, this.codecs, it, false));
130130
}
131131

@@ -186,7 +186,7 @@ private Flux<Message> exchange(int effectiveFetchSize, boolean useGeneratedKeysC
186186
if (useGeneratedKeysClause) {
187187
exchange = exchange.transform(GeneratedValues::reduceToSingleCountDoneToken);
188188
}
189-
return exchange;
189+
return potentiallyAttachTimeout(exchange, this.connectionOptions, this.client, sql);
190190
}
191191

192192
private void clearBindings(Iterator<Binding> iterator) {

src/main/java/io/r2dbc/mssql/QueryMessageFlow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616

1717
package io.r2dbc.mssql;
1818

19+
import io.netty.util.ReferenceCountUtil;
20+
import io.netty.util.ReferenceCounted;
1921
import io.r2dbc.mssql.client.Client;
2022
import io.r2dbc.mssql.message.Message;
2123
import io.r2dbc.mssql.message.TransactionDescriptor;
2224
import io.r2dbc.mssql.message.token.DoneToken;
2325
import io.r2dbc.mssql.message.token.SqlBatch;
2426
import io.r2dbc.mssql.util.Assert;
27+
import io.r2dbc.mssql.util.Operators;
2528
import reactor.core.publisher.Flux;
2629
import reactor.core.publisher.Mono;
2730
import reactor.core.publisher.SynchronousSink;
@@ -52,7 +55,8 @@ static Flux<Message> exchange(Client client, String query) {
5255

5356
return client.exchange(Mono.fromSupplier(() -> SqlBatch.create(1, client.getTransactionDescriptor(), query)), DoneToken::isDone)
5457
.doOnSubscribe(ignore -> QueryLogger.logQuery(client.getContext(), query))
55-
.handle(DoneHandler.INSTANCE);
58+
.handle(DoneHandler.INSTANCE)
59+
.transform(Operators::discardOnCancel).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
5660
}
5761

5862
enum DoneHandler implements BiConsumer<Message, SynchronousSink<Message>> {

src/main/java/io/r2dbc/mssql/SimpleMssqlStatement.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package io.r2dbc.mssql;
1818

19-
import io.netty.util.ReferenceCountUtil;
20-
import io.netty.util.ReferenceCounted;
2119
import io.r2dbc.mssql.client.Client;
2220
import io.r2dbc.mssql.client.ConnectionContext;
2321
import io.r2dbc.mssql.codec.Codecs;
@@ -26,7 +24,6 @@
2624
import io.r2dbc.mssql.message.token.DoneInProcToken;
2725
import io.r2dbc.mssql.message.token.SqlBatch;
2826
import io.r2dbc.mssql.util.Assert;
29-
import io.r2dbc.mssql.util.Operators;
3027
import org.reactivestreams.Publisher;
3128
import reactor.core.publisher.Flux;
3229
import reactor.util.Logger;
@@ -48,6 +45,8 @@ final class SimpleMssqlStatement extends MssqlStatementSupport implements MssqlS
4845

4946
private final Codecs codecs;
5047

48+
private final ConnectionOptions connectionOptions;
49+
5150
private final ConnectionContext context;
5251

5352
private final String sql;
@@ -63,6 +62,7 @@ final class SimpleMssqlStatement extends MssqlStatementSupport implements MssqlS
6362
SimpleMssqlStatement(Client client, ConnectionOptions connectionOptions, String sql) {
6463

6564
super(connectionOptions.prefersCursors(sql) || prefersCursors(sql));
65+
this.connectionOptions = connectionOptions;
6666

6767
Assert.requireNonNull(client, "Client must not be null");
6868
Assert.requireNonNull(connectionOptions, "ConnectionOptions must not be null");
@@ -123,7 +123,7 @@ public Flux<MssqlResult> execute() {
123123
logger.debug(this.context.getMessage("Start cursored exchange for {} with fetch size {}"), sql, effectiveFetchSize);
124124
}
125125

126-
exchange = RpcQueryMessageFlow.exchange(this.client, this.codecs, this.sql, effectiveFetchSize);
126+
exchange = potentiallyAttachTimeout(RpcQueryMessageFlow.exchange(this.client, this.codecs, this.sql, effectiveFetchSize), this.connectionOptions, this.client, this.sql);
127127

128128
return createResultStream(useGeneratedKeysClause, exchange, DoneInProcToken.class::isInstance);
129129
} else {
@@ -132,7 +132,7 @@ public Flux<MssqlResult> execute() {
132132
logger.debug(this.context.getMessage("Start direct exchange for {}"), sql);
133133
}
134134

135-
exchange = QueryMessageFlow.exchange(this.client, sql).transform(Operators::discardOnCancel).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
135+
exchange = potentiallyAttachTimeout(QueryMessageFlow.exchange(this.client, sql), this.connectionOptions, this.client, this.sql);
136136

137137
return createResultStream(useGeneratedKeysClause, exchange, AbstractDoneToken.class::isInstance);
138138
}

src/test/java/io/r2dbc/mssql/MssqlConnectionIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ void shouldReusePreparedStatements() {
297297
}
298298

299299
@Test
300-
void shouldApplyLockWaitTimeout() {
300+
void shouldApplyLockWaitwaot() {
301301

302302
ConnectionFactoryOptions options = builder().option(LOCK_WAIT_TIMEOUT, Duration.ofMillis(100)).build();
303303

src/test/java/io/r2dbc/mssql/ParametrizedMssqlStatementIntegrationTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
package io.r2dbc.mssql;
1818

1919
import io.r2dbc.mssql.util.IntegrationTestSupport;
20+
import io.r2dbc.spi.R2dbcTimeoutException;
2021
import io.r2dbc.spi.Result;
2122
import org.junit.jupiter.api.Test;
2223
import reactor.core.publisher.Flux;
2324
import reactor.core.publisher.Hooks;
2425
import reactor.core.publisher.Mono;
2526
import reactor.test.StepVerifier;
2627

28+
import java.time.Duration;
2729
import java.util.Optional;
2830
import java.util.concurrent.atomic.AtomicBoolean;
2931
import java.util.concurrent.atomic.AtomicInteger;
@@ -221,4 +223,22 @@ void shouldRunStatementWithMultipleBindingsAndResults() {
221223
assertThat(rowCount).hasValue(6);
222224
}
223225

226+
@Test
227+
void shouldTimeoutSqlBatch() {
228+
229+
connection.setStatementTimeout(Duration.ofMillis(100)).as(StepVerifier::create).verifyComplete();
230+
231+
connection.createStatement("WAITFOR DELAY @P0").fetchSize(0).bind("P0", "10:00").execute().flatMap(Result::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcTimeoutException.class);
232+
connection.createStatement("SELECT 1").execute().flatMap(it -> it.map(row -> row.get(0))).as(StepVerifier::create).expectNext(1).verifyComplete();
233+
}
234+
235+
@Test
236+
void shouldTimeoutCursored() {
237+
238+
connection.setStatementTimeout(Duration.ofMillis(100)).as(StepVerifier::create).verifyComplete();
239+
240+
connection.createStatement("WAITFOR DELAY @P0").fetchSize(1).bind("P0", "10:00").execute().flatMap(Result::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcTimeoutException.class);
241+
connection.createStatement("SELECT 1").execute().flatMap(it -> it.map(row -> row.get(0))).as(StepVerifier::create).expectNext(1).verifyComplete();
242+
}
243+
224244
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2018-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.mssql;
18+
19+
import io.r2dbc.mssql.util.IntegrationTestSupport;
20+
import io.r2dbc.spi.R2dbcTimeoutException;
21+
import io.r2dbc.spi.Result;
22+
import org.junit.jupiter.api.Test;
23+
import reactor.test.StepVerifier;
24+
25+
import java.time.Duration;
26+
27+
/**
28+
* Integration tests for {@link SimpleMssqlStatement}.
29+
*
30+
* @author Mark Paluch
31+
*/
32+
class SimpleMssqlStatementIntegrationTests extends IntegrationTestSupport {
33+
34+
@Test
35+
void shouldTimeoutSqlBatch() {
36+
37+
connection.setStatementTimeout(Duration.ofMillis(100)).as(StepVerifier::create).verifyComplete();
38+
39+
connection.createStatement("WAITFOR DELAY '10:00'").fetchSize(0).execute().flatMap(Result::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcTimeoutException.class);
40+
connection.createStatement("SELECT 1").execute().flatMap(it -> it.map(row -> row.get(0))).as(StepVerifier::create).expectNext(1).verifyComplete();
41+
}
42+
43+
@Test
44+
void shouldTimeoutCursored() {
45+
46+
connection.setStatementTimeout(Duration.ofMillis(100)).as(StepVerifier::create).verifyComplete();
47+
48+
connection.createStatement("WAITFOR DELAY '10:00'").fetchSize(100).execute().flatMap(Result::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcTimeoutException.class);
49+
connection.createStatement("SELECT 1").execute().flatMap(it -> it.map(row -> row.get(0))).as(StepVerifier::create).expectNext(1).verifyComplete();
50+
}
51+
52+
}

0 commit comments

Comments
 (0)