Skip to content

Commit 280cf7f

Browse files
committed
Add support for Attention token (cancelling running queries).
Client.attention() now is able to cancel ongoing queries by sending an attention token. In attention sent state, incoming frames are discarded until receiving the attention acknowledgement token. Any ongoing query is finishes with an error signal MssqlStatementCancelled. [closes #215] Signed-off-by: Mark Paluch <mpaluch@vmware.com>
1 parent 47f0b0c commit 280cf7f

13 files changed

+344
-12
lines changed

src/main/java/io/r2dbc/mssql/DefaultMssqlResult.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public Mono<Integer> getRowsUpdated() {
115115

116116
sink.next(doneToken.getRowCount());
117117
}
118+
119+
if (doneToken.isAttentionAck()) {
120+
sink.error(new ExceptionFactory.MssqlStatementCancelled());
121+
return;
122+
}
118123
}
119124

120125
if (message instanceof ErrorToken) {
@@ -191,6 +196,15 @@ private <T> Flux<T> doMap(boolean rows, boolean outparameters, Function<? super
191196
Flux<T> mapped = messages
192197
.handle((message, sink) -> {
193198

199+
if (message instanceof AbstractDoneToken) {
200+
201+
AbstractDoneToken doneToken = (AbstractDoneToken) message;
202+
if (doneToken.isAttentionAck()) {
203+
sink.error(new ExceptionFactory.MssqlStatementCancelled());
204+
return;
205+
}
206+
}
207+
194208
if (message.getClass() == ColumnMetadataToken.class) {
195209

196210
ColumnMetadataToken token = (ColumnMetadataToken) message;

src/main/java/io/r2dbc/mssql/ExceptionFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,17 @@ public ErrorDetails getErrorDetails() {
339339

340340
}
341341

342+
/**
343+
* SQL Server-specific {@link R2dbcTransientException} upon statement cancellation due to an attention acknowledgement.
344+
*/
345+
static final class MssqlStatementCancelled extends R2dbcTransientException {
346+
347+
public MssqlStatementCancelled() {
348+
super("Statement cancelled");
349+
}
350+
351+
}
352+
342353
/**
343354
* SQL Server-specific {@link R2dbcTransientResourceException}.
344355
*/

src/main/java/io/r2dbc/mssql/MssqlConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,15 @@ public Mono<Void> beginTransaction(TransactionDefinition transactionDefinition)
149149
});
150150
}
151151

152+
/**
153+
* Cancel an ongoing request.
154+
*
155+
* @return
156+
*/
157+
Mono<Void> cancel() {
158+
return this.client.attention();
159+
}
160+
152161
@Override
153162
public Mono<Void> close() {
154163
return this.client.close();

src/main/java/io/r2dbc/mssql/MssqlSegmentResult.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,15 @@ private static Flux<Segment> toSegments(String sql, Codecs codecs, Flux<io.r2dbc
128128
AtomicReference<MssqlRowMetadata> metadataRef = new AtomicReference<>();
129129
Flux<Segment> segments = messageStream.handle((message, sink) -> {
130130

131+
if (message instanceof AbstractDoneToken) {
132+
133+
AbstractDoneToken doneToken = (AbstractDoneToken) message;
134+
if (doneToken.isAttentionAck()) {
135+
sink.error(new ExceptionFactory.MssqlStatementCancelled());
136+
return;
137+
}
138+
}
139+
131140
if (message.getClass() == ColumnMetadataToken.class) {
132141

133142
ColumnMetadataToken token = (ColumnMetadataToken) message;

src/main/java/io/r2dbc/mssql/ParametrizedMssqlStatement.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848
import java.util.regex.Matcher;
4949
import java.util.regex.Pattern;
5050

51-
import static io.r2dbc.mssql.util.PredicateUtils.or;
52-
5351
/**
5452
* Parametrized {@link Statement} with parameter markers executed against a Microsoft SQL Server database.
5553
* <p>
@@ -136,7 +134,7 @@ public Flux<MssqlResult> execute() {
136134
Binding binding = this.bindings.bindings.get(0);
137135
Flux<Message> exchange = exchange(effectiveFetchSize, useGeneratedKeysClause, sql, binding);
138136

139-
return exchange.windowUntil(or(DoneInProcToken.class::isInstance)).map(it -> DefaultMssqlResult.toResult(this.parsedQuery.getSql(), this.context, this.codecs, it,
137+
return exchange.windowUntil(DoneInProcToken.class::isInstance).map(it -> DefaultMssqlResult.toResult(this.parsedQuery.getSql(), this.context, this.codecs, it,
140138
binding.hasOutParameters()));
141139
}
142140

@@ -151,7 +149,7 @@ public Flux<MssqlResult> execute() {
151149

152150
return exchange.doOnComplete(() -> {
153151
tryNextBinding(iterator, sink, cancelled);
154-
}).windowUntil(or(DoneInProcToken.class::isInstance)).map(it -> DefaultMssqlResult.toResult(this.parsedQuery.getSql(), this.context, this.codecs, it, binding.hasOutParameters()));
152+
}).windowUntil(DoneInProcToken.class::isInstance).map(it -> DefaultMssqlResult.toResult(this.parsedQuery.getSql(), this.context, this.codecs, it, binding.hasOutParameters()));
155153
}).doOnSubscribe(it -> {
156154

157155
Binding initial = iterator.next();

src/main/java/io/r2dbc/mssql/RpcQueryMessageFlow.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ final class RpcQueryMessageFlow {
7070
DoneInProcToken.class::isInstance,
7171
IntermediateCount.class::isInstance,
7272
AbstractInfoToken.class::isInstance,
73-
Completion.class::isInstance);
73+
Completion.class::isInstance,
74+
AbstractDoneToken::isAttentionAck);
7475

7576
private static final Logger logger = Loggers.getLogger(RpcQueryMessageFlow.class);
7677

@@ -348,6 +349,14 @@ private static void handleMessage(Client client, int fetchSize, Sinks.Many<Clien
348349
return;
349350
}
350351

352+
if (AbstractDoneToken.isAttentionAck(message)) {
353+
354+
state.phase = Phase.CLOSED;
355+
sink.next(message);
356+
onCursorComplete.run();
357+
return;
358+
}
359+
351360
if (!(message instanceof DoneProcToken)) {
352361

353362
if (emit) {

src/main/java/io/r2dbc/mssql/client/Client.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@
3838
*/
3939
public interface Client {
4040

41+
/**
42+
* Send an attention token to interrupt an active statement.
43+
*
44+
* @return
45+
* @since 0.9
46+
*/
47+
Mono<Void> attention();
48+
4149
/**
4250
* Release any resources held by the {@link Client}.
4351
*

src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import io.r2dbc.mssql.message.header.PacketIdProvider;
3737
import io.r2dbc.mssql.message.tds.ProtocolException;
3838
import io.r2dbc.mssql.message.tds.Redirect;
39+
import io.r2dbc.mssql.message.token.AbstractDoneToken;
3940
import io.r2dbc.mssql.message.token.AbstractInfoToken;
41+
import io.r2dbc.mssql.message.token.Attention;
4042
import io.r2dbc.mssql.message.token.EnvChangeToken;
4143
import io.r2dbc.mssql.message.token.FeatureExtAckToken;
4244
import io.r2dbc.mssql.message.token.LoginAckToken;
@@ -54,6 +56,7 @@
5456
import reactor.core.publisher.MonoSink;
5557
import reactor.core.publisher.SynchronousSink;
5658
import reactor.netty.Connection;
59+
import reactor.netty.NettyOutbound;
5760
import reactor.netty.resources.ConnectionProvider;
5861
import reactor.netty.tcp.SslProvider;
5962
import reactor.netty.tcp.TcpClient;
@@ -70,6 +73,7 @@
7073
import java.util.UUID;
7174
import java.util.concurrent.atomic.AtomicBoolean;
7275
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
76+
import java.util.concurrent.atomic.AtomicLong;
7377
import java.util.function.Consumer;
7478
import java.util.function.Function;
7579
import java.util.function.Predicate;
@@ -116,6 +120,10 @@ public final class ReactorNettyClient implements Client {
116120

117121
private final AtomicBoolean isClosed = new AtomicBoolean(false);
118122

123+
private final AtomicLong attentionPropagation = new AtomicLong();
124+
125+
private final AtomicLong outstandingRequests = new AtomicLong();
126+
119127
private final EmitterProcessor<ClientMessage> requestProcessor = EmitterProcessor.create(false);
120128

121129
private final FluxSink<ClientMessage> requests = this.requestProcessor.sink();
@@ -241,6 +249,31 @@ public void next(Message message) {
241249
ReactorNettyClient.this.featureAckChange.accept((FeatureExtAckToken) message);
242250
}
243251

252+
if (AbstractDoneToken.isAttentionAck(message)) {
253+
254+
long current;
255+
do {
256+
current = ReactorNettyClient.this.attentionPropagation.get();
257+
258+
if (current == 0) {
259+
if (DEBUG_ENABLED) {
260+
logger.debug(ReactorNettyClient.this.context.getMessage("Swallowing attention acknowledged, no pending requests: {}. "), message);
261+
}
262+
return;
263+
}
264+
265+
} while (!ReactorNettyClient.this.attentionPropagation.compareAndSet(current, current - 1));
266+
}
267+
268+
long attentionPropagation = ReactorNettyClient.this.attentionPropagation.get();
269+
270+
if (attentionPropagation > 0 && !AbstractDoneToken.isAttentionAck(message)) {
271+
if (DEBUG_ENABLED) {
272+
logger.debug(ReactorNettyClient.this.context.getMessage("Discard message {}. Draining frames until attention acknowledgement."), message);
273+
}
274+
return;
275+
}
276+
244277
ReactorNettyClient.this.responseProcessor.onNext(message);
245278
}
246279
};
@@ -294,11 +327,7 @@ public void onComplete() {
294327
.concatMap(
295328
message -> {
296329

297-
if (DEBUG_ENABLED) {
298-
logger.debug(this.context.getMessage("Request: {}"), message);
299-
}
300-
301-
Object encoded = message.encode(connection.outbound().alloc(), this.tdsEncoder.getPacketSize());
330+
Object encoded = encodeForSend(message);
302331

303332
if (encoded instanceof Publisher) {
304333
return connection.outbound().sendObject((Publisher) encoded);
@@ -311,6 +340,15 @@ public void onComplete() {
311340
.subscribe();
312341
}
313342

343+
private Object encodeForSend(ClientMessage message) {
344+
345+
if (DEBUG_ENABLED) {
346+
logger.debug(this.context.getMessage("Request: {}"), message);
347+
}
348+
349+
return message.encode(this.connection.outbound().alloc(), this.tdsEncoder.getPacketSize());
350+
}
351+
314352
@SuppressWarnings("unchecked")
315353
private <T> Mono<T> resumeError(Throwable throwable) {
316354

@@ -469,6 +507,11 @@ private static SslHandler createSslTunnelHandler(ByteBufAllocator allocator, Ssl
469507
return new SslHandler(tunnel.getSslProvider().getSslContext().newEngine(allocator));
470508
}
471509

510+
@Override
511+
public Mono<Void> attention() {
512+
return Mono.defer(() -> Mono.fromFuture(send(Mono.just(Attention.create(1, getTransactionDescriptor()))).toFuture()));
513+
}
514+
472515
@Override
473516
public Mono<Void> close() {
474517

@@ -566,6 +609,7 @@ public Flux<Message> exchange(Publisher<? extends ClientMessage> requests, Predi
566609

567610
Flux<Message> requestMessages = this.responseProcessor
568611
.doOnSubscribe(s -> {
612+
this.outstandingRequests.incrementAndGet();
569613
Flux.from(requests).subscribe(t -> {
570614

571615
if (!isConnected()) {
@@ -598,14 +642,26 @@ public Flux<Message> exchange(Publisher<? extends ClientMessage> requests, Predi
598642
}
599643
});
600644

601-
return handle.doAfterTerminate(this.requestQueue).doOnCancel(() -> {
645+
return handle.doAfterTerminate(this.requestQueue).doFinally(it -> this.outstandingRequests.decrementAndGet()).doOnCancel(() -> {
602646

603647
if (!exchangeRequest.isComplete()) {
604648
logger.error("Exchange cancelled while exchange is active. This is likely a bug leading to unpredictable outcome.");
605649
}
606650
});
607651
}
608652

653+
private Mono<Void> send(Publisher<? extends ClientMessage> requests) {
654+
return Flux.from(requests).concatMap(message -> {
655+
NettyOutbound nettyOutbound = this.connection.outbound().sendObject(encodeForSend(message));
656+
657+
if (message instanceof Attention && this.outstandingRequests.longValue() != 0) {
658+
return Mono.from(nettyOutbound).doOnSuccess(v -> this.attentionPropagation.incrementAndGet());
659+
}
660+
661+
return nettyOutbound;
662+
}).then();
663+
}
664+
609665
private void handleClose() {
610666
if (this.isClosed.compareAndSet(false, true)) {
611667
logger.warn(ReactorNettyClient.this.context.getMessage("Connection has been closed by peer"));

src/main/java/io/r2dbc/mssql/message/token/AbstractDoneToken.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public abstract class AbstractDoneToken extends AbstractDataToken implements Res
6565
/**
6666
* The DONE message is a server acknowledgement of a client ATTENTION message.
6767
*/
68-
static final int DONE_ATTN = 0x10;
68+
static final int DONE_ATTN = 0x20;
6969

7070
/**
7171
* This DONEPROC message is associated with an RPC within a set of batched RPCs. This flag is not set on the last RPC in the RPC batch.
@@ -108,6 +108,22 @@ protected AbstractDoneToken(byte type, int status, int currentCommand, long rowC
108108
this.rowCount = rowCount;
109109
}
110110

111+
/**
112+
* Check whether the {@link Message} represents a attention acknowledgement.
113+
*
114+
* @param message the message to inspect.
115+
* @return {@literal true} if the {@link Message} represents a attention acknowledgement.
116+
* @since 0.9
117+
*/
118+
public static boolean isAttentionAck(Message message) {
119+
120+
if (message instanceof AbstractDoneToken) {
121+
return ((AbstractDoneToken) message).isAttentionAck();
122+
}
123+
124+
return false;
125+
}
126+
111127
/**
112128
* Check whether the {@link Message} represents a finished done token.
113129
*
@@ -173,6 +189,13 @@ public int getStatus() {
173189
return this.status;
174190
}
175191

192+
/**
193+
* @return {@code true} if this token indicates the response is acknowledging the attention request.
194+
*/
195+
public boolean isAttentionAck() {
196+
return (getStatus() & DONE_ATTN) != 0;
197+
}
198+
176199
/**
177200
* @return {@code true} if this token indicates the response is done and has no more rows.
178201
*/
@@ -242,6 +265,7 @@ public String toString() {
242265
sb.append(", hasCount=").append(hasCount());
243266
sb.append(", rowCount=").append(getRowCount());
244267
sb.append(", hasMore=").append(hasMore());
268+
sb.append(", attnAck=").append(isAttentionAck());
245269
sb.append(", currentCommand=").append(getCurrentCommand());
246270
sb.append(']');
247271
return sb.toString();

0 commit comments

Comments
 (0)