Skip to content

Commit 303a519

Browse files
committed
Refactor decode pipeline to use Reactor operators.
ReactorNettyClient's inbound decoder now decodes items into Iterables and uses concatMapIterable(…) to correctly determine demand vs. the number of produced elements. Previously, the decoder used a onNext hook that propagated items into the result processor. That could lead to demand mismatch as the inbound channel could had emitted chunked buffers that couldn't be decoded into messages. That would lead to a mismatch between demand and emitted items because the emitter byte buffer decremented the demand by 1 while the receiving side didn't yield any results. Eventually, the publisher stopped emission because it had emitted buffers that couldn't be decoded while the pipeline didn't account for requesting additional buffers to fully decode the buffers. Thanks to @gvisokinskas who helped a lot to investigate on this issue. [resolves #216] Signed-off-by: Mark Paluch <mpaluch@vmware.com>
1 parent cfa6fd6 commit 303a519

File tree

3 files changed

+70
-31
lines changed

3 files changed

+70
-31
lines changed

src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import javax.annotation.Nullable;
6969
import java.security.GeneralSecurityException;
7070
import java.time.Duration;
71+
import java.util.Collections;
7172
import java.util.Optional;
7273
import java.util.Queue;
7374
import java.util.UUID;
@@ -279,24 +280,22 @@ public void next(Message message) {
279280
};
280281

281282
connection.inbound().receiveObject() //
282-
.doOnNext(it -> {
283+
.concatMapIterable(it -> {
283284

284285
if (it instanceof ByteBuf) {
285286

286287
ByteBuf buffer = (ByteBuf) it;
287-
decoder.decode(buffer, this.decodeFunction, sink);
288-
return;
288+
return decoder.decode(buffer, this.decodeFunction);
289289
}
290290

291291
if (it instanceof Message) {
292-
sink.next((Message) it);
293-
return;
292+
return Collections.singleton((Message) it);
294293
}
295294

296295
throw ProtocolException.unsupported(String.format("Unexpected protocol message: [%s]", it));
297296
})
298297
.onErrorResume(this::resumeError)
299-
.subscribe(new CoreSubscriber<Object>() {
298+
.subscribe(new CoreSubscriber<Message>() {
300299

301300
@Override
302301
public Context currentContext() {
@@ -309,7 +308,8 @@ public void onSubscribe(Subscription s) {
309308
}
310309

311310
@Override
312-
public void onNext(Object message) {
311+
public void onNext(Message message) {
312+
sink.next(message);
313313
}
314314

315315
@Override

src/main/java/io/r2dbc/mssql/client/StreamDecoder.java

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* <p/>
3535
* TDS messages consist of a header ({@link Header#LENGTH 8 byte length}) and a body. Messages can be either self-contained ({@link Status.StatusBit#EOM}) or chunked. This decoder attempts to
3636
* decode messages from a {@link ByteBuf stream} by emitting zero, one or many {@link Message}s. Data buffers are aggregated and de-chunked until reaching a message boundary, then adaptive decoding
37-
* attempts to decode the aggregated and de-chunked body as far as possible. Remaining (undecoded) data buffers are aggregated until the next attempt.
37+
* attempts to decode the aggregated and de-chunked body as far as possible. Remaining (non-decodable) data buffers are aggregated until the next attempt.
3838
* <p/>
3939
* This decoder is stateful and should be used in a try-to-decode fashion.
4040
*
@@ -59,30 +59,9 @@ public List<Message> decode(ByteBuf in, MessageDecoder messageDecoder) {
5959
Assert.requireNonNull(in, "in must not be null");
6060
Assert.requireNonNull(messageDecoder, "MessageDecoder must not be null");
6161

62-
List<Message> result = new ArrayList<>();
62+
ListSink<Message> result = new ListSink<>();
6363

64-
decode(in, messageDecoder, new SynchronousSink<Message>() {
65-
66-
@Override
67-
public void complete() {
68-
throw new UnsupportedOperationException();
69-
}
70-
71-
@Override
72-
public Context currentContext() {
73-
throw new UnsupportedOperationException();
74-
}
75-
76-
@Override
77-
public void error(Throwable e) {
78-
throw new RuntimeException(e);
79-
}
80-
81-
@Override
82-
public void next(Message message) {
83-
result.add(message);
84-
}
85-
});
64+
decode(in, messageDecoder, result);
8665

8766
return result;
8867
}
@@ -350,4 +329,32 @@ int getChunkLength() {
350329

351330
}
352331

332+
static class ListSink<T> extends ArrayList<T> implements SynchronousSink<T> {
333+
334+
public ListSink() {
335+
super(2);
336+
}
337+
338+
@Override
339+
public void complete() {
340+
throw new UnsupportedOperationException();
341+
}
342+
343+
@Override
344+
public Context currentContext() {
345+
throw new UnsupportedOperationException();
346+
}
347+
348+
@Override
349+
public void error(Throwable e) {
350+
throw new RuntimeException(e);
351+
}
352+
353+
@Override
354+
public void next(T message) {
355+
add(message);
356+
}
357+
358+
}
359+
353360
}

src/test/java/io/r2dbc/mssql/LobIntegrationTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828

2929
import java.nio.Buffer;
3030
import java.nio.ByteBuffer;
31+
import java.nio.CharBuffer;
3132
import java.util.Optional;
3233
import java.util.stream.Collectors;
34+
import java.util.stream.IntStream;
3335

3436
import static org.assertj.core.api.Assertions.assertThat;
3537

@@ -116,6 +118,36 @@ void testBigBlob() {
116118
.verifyComplete();
117119
}
118120

121+
@Test
122+
void testBigBlobs() {
123+
124+
int count = 512000;
125+
createTable(connection, "NVARCHAR(MAX)");
126+
127+
CharBuffer chars = CharBuffer.allocate(count);
128+
IntStream.range(0, count).forEach(i -> chars.put((char) ((i % 26) + 'a')));
129+
chars.flip();
130+
String data = chars.toString();
131+
132+
for (int i = 0; i < 30; i++) {
133+
Flux.from(connection.createStatement("INSERT INTO lob_test values(@P0)")
134+
.bind("P0", data)
135+
.execute())
136+
.flatMap(Result::getRowsUpdated)
137+
.as(StepVerifier::create)
138+
.expectNext(1)
139+
.verifyComplete();
140+
141+
}
142+
143+
connection.createStatement("SELECT my_col FROM lob_test")
144+
.execute()
145+
.concatMap(it -> it.map((row, rowMetadata) -> row.get("my_col")))
146+
.as(StepVerifier::create)
147+
.expectNextCount(30)
148+
.verifyComplete();
149+
}
150+
119151
@Test
120152
void testByteArrayBlob() {
121153

0 commit comments

Comments
 (0)