Skip to content

Commit 1ae91e9

Browse files
committed
Request more items when dropping tokens.
Signed-off-by: Mark Paluch <mpaluch@vmware.com>
1 parent ee3f2e1 commit 1ae91e9

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.concurrent.atomic.AtomicBoolean;
7676
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
7777
import java.util.concurrent.atomic.AtomicLong;
78+
import java.util.concurrent.atomic.AtomicReference;
7879
import java.util.function.Consumer;
7980
import java.util.function.Function;
8081
import java.util.function.Predicate;
@@ -210,6 +211,7 @@ private ReactorNettyClient(Connection connection, TdsEncoder tdsEncoder, Connect
210211
}
211212
};
212213

214+
AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
213215
SynchronousSink<Message> sink = new SynchronousSink<Message>() {
214216

215217
@Override
@@ -250,6 +252,7 @@ public void next(Message message) {
250252
ReactorNettyClient.this.featureAckChange.accept((FeatureExtAckToken) message);
251253
}
252254

255+
Subscription subscription = subscriptionRef.get();
253256
if (AbstractDoneToken.isAttentionAck(message)) {
254257

255258
long current;
@@ -260,6 +263,11 @@ public void next(Message message) {
260263
if (DEBUG_ENABLED) {
261264
logger.debug(ReactorNettyClient.this.context.getMessage("Swallowing attention acknowledged, no pending requests: {}. "), message);
262265
}
266+
267+
// update demand for dropped next signal
268+
if (subscription != null) {
269+
subscription.request(1);
270+
}
263271
return;
264272
}
265273

@@ -272,6 +280,10 @@ public void next(Message message) {
272280
if (DEBUG_ENABLED) {
273281
logger.debug(ReactorNettyClient.this.context.getMessage("Discard message {}. Draining frames until attention acknowledgement."), message);
274282
}
283+
// update demand for dropped next signal
284+
if (subscription != null) {
285+
subscription.request(1);
286+
}
275287
return;
276288
}
277289

@@ -304,6 +316,7 @@ public Context currentContext() {
304316

305317
@Override
306318
public void onSubscribe(Subscription s) {
319+
subscriptionRef.set(s);
307320
ReactorNettyClient.this.responseProcessor.onSubscribe(s);
308321
}
309322

0 commit comments

Comments
 (0)