Skip to content

Commit 60b1138

Browse files
committed
Propagate connection closed and connection errors to message exchange
We now fail a stream if the connection was closed in flight and if connection errors occurred. [resolves #177]
1 parent ebdf71d commit 60b1138

File tree

2 files changed

+179
-48
lines changed

2 files changed

+179
-48
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 104 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.channel.ChannelHandlerContext;
2323
import io.netty.channel.ChannelOption;
2424
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
25+
import io.netty.util.ReferenceCountUtil;
2526
import io.r2dbc.postgresql.message.backend.BackendKeyData;
2627
import io.r2dbc.postgresql.message.backend.BackendMessage;
2728
import io.r2dbc.postgresql.message.backend.BackendMessageDecoder;
@@ -34,6 +35,7 @@
3435
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
3536
import io.r2dbc.postgresql.message.frontend.Terminate;
3637
import io.r2dbc.postgresql.util.Assert;
38+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
3739
import org.reactivestreams.Publisher;
3840
import org.slf4j.Logger;
3941
import org.slf4j.LoggerFactory;
@@ -59,8 +61,10 @@
5961
import java.util.StringJoiner;
6062
import java.util.concurrent.atomic.AtomicBoolean;
6163
import java.util.concurrent.atomic.AtomicInteger;
64+
import java.util.concurrent.atomic.AtomicReference;
6265
import java.util.function.Consumer;
6366
import java.util.function.Function;
67+
import java.util.function.Supplier;
6468

6569
import static io.r2dbc.postgresql.client.TransactionStatus.IDLE;
6670

@@ -75,6 +79,10 @@ public final class ReactorNettyClient implements Client {
7579

7680
private static final boolean DEBUG_ENABLED = logger.isDebugEnabled();
7781

82+
private static final Supplier<PostgresConnectionClosedException> UNEXPECTED = () -> new PostgresConnectionClosedException("Connection unexpectedly closed");
83+
84+
private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> new PostgresConnectionClosedException("Connection closed");
85+
7886
private final ByteBufAllocator byteBufAllocator;
7987

8088
private final Connection connection;
@@ -107,26 +115,36 @@ private ReactorNettyClient(Connection connection) {
107115
Assert.requireNonNull(connection, "Connection must not be null");
108116

109117
connection.addHandler(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
110-
connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestProcessor, this.responseReceivers));
118+
connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestProcessor));
111119
this.connection = connection;
112120
this.byteBufAllocator = connection.outbound().alloc();
113121

122+
AtomicReference<Throwable> receiveError = new AtomicReference<>();
114123
Mono<Void> receive = connection.inbound().receive()
115124
.map(BackendMessageDecoder::decode)
116125
.handle(this::handleResponse)
126+
.doOnError(throwable -> {
127+
receiveError.set(throwable);
128+
handleConnectionError(throwable);
129+
})
117130
.windowWhile(it -> it.getClass() != ReadyForQuery.class)
118131
.doOnNext(fluxOfMessages -> {
119132
MonoSink<Flux<BackendMessage>> receiver = this.responseReceivers.poll();
120133
if (receiver != null) {
121-
receiver.success(fluxOfMessages);
122-
}
123-
})
124-
.doOnComplete(() -> {
125-
MonoSink<Flux<BackendMessage>> receiver = this.responseReceivers.poll();
126-
if (receiver != null) {
127-
receiver.success(Flux.empty());
134+
receiver.success(fluxOfMessages.doOnComplete(() -> {
135+
136+
Throwable throwable = receiveError.get();
137+
if (throwable != null) {
138+
throw new PostgresConnectionException(throwable);
139+
}
140+
141+
if (!isConnected()) {
142+
throw EXPECTED.get();
143+
}
144+
}));
128145
}
129146
})
147+
.doOnComplete(this::handleClose)
130148
.then();
131149

132150
Mono<Void> request = this.requestProcessor
@@ -139,35 +157,27 @@ private ReactorNettyClient(Connection connection) {
139157
.then();
140158

141159
receive
142-
.onErrorResume(throwable -> {
143-
144-
MonoSink<Flux<BackendMessage>> receiver = this.responseReceivers.poll();
145-
if (receiver != null) {
146-
receiver.error(throwable);
147-
}
148-
this.requestProcessor.onComplete();
149-
150-
if (isSslException(throwable)) {
151-
logger.debug("Connection Error", throwable);
152-
} else {
153-
154-
logger.error("Connection Error", throwable);
155-
}
156-
return close();
157-
})
160+
.onErrorResume(this::resumeError)
158161
.subscribe();
159162

160163
request
161-
.onErrorResume(throwable -> {
164+
.onErrorResume(this::resumeError)
165+
.doAfterTerminate(this::handleClose)
166+
.subscribe();
167+
}
162168

163-
if (isSslException(throwable)) {
164-
logger.debug("Connection Error", throwable);
165-
}
169+
private Mono<Void> resumeError(Throwable throwable) {
166170

167-
logger.error("Connection Error", throwable);
168-
return close();
169-
})
170-
.subscribe();
171+
handleConnectionError(throwable);
172+
this.requestProcessor.onComplete();
173+
174+
if (isSslException(throwable)) {
175+
logger.debug("Connection Error", throwable);
176+
} else {
177+
logger.error("Connection Error", throwable);
178+
}
179+
180+
return close();
171181
}
172182

173183
private static boolean isSslException(Throwable throwable) {
@@ -301,9 +311,10 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
301311
public Mono<Void> close() {
302312
return Mono.defer(() -> {
303313

314+
drainError(EXPECTED);
304315
if (this.isClosed.compareAndSet(false, true)) {
305316

306-
if (!this.connection.channel().isOpen() || this.processId == null) {
317+
if (!isConnected() || this.processId == null) {
307318
this.connection.dispose();
308319
return this.connection.onDispose();
309320
}
@@ -326,24 +337,33 @@ public Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
326337

327338
return Mono
328339
.<Flux<BackendMessage>>create(sink -> {
329-
if (this.isClosed.get()) {
330-
sink.error(new IllegalStateException("Cannot exchange messages because the connection is closed"));
331-
}
332340

333341
final AtomicInteger once = new AtomicInteger();
334342

335343
Flux.from(requests)
336344
.subscribe(message -> {
345+
346+
if (!isConnected()) {
347+
ReferenceCountUtil.safeRelease(message);
348+
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
349+
return;
350+
}
351+
337352
if (once.get() == 0 && once.compareAndSet(0, 1)) {
338353
synchronized (this) {
339354
this.responseReceivers.add(sink);
340355
this.requests.next(message);
341356
}
342-
return;
357+
} else {
358+
this.requests.next(message);
343359
}
344360

345-
this.requests.next(message);
346-
}, this.requests::error);
361+
}, this.requests::error, () -> {
362+
363+
if (!isConnected()) {
364+
sink.error(new PostgresConnectionClosedException("Cannot exchange messages because the connection is closed"));
365+
}
366+
});
347367

348368
})
349369
.flatMapMany(Function.identity());
@@ -380,6 +400,10 @@ public boolean isConnected() {
380400
return false;
381401
}
382402

403+
if (this.requestProcessor.isDisposed()) {
404+
return false;
405+
}
406+
383407
Channel channel = this.connection.channel();
384408
return channel.isOpen();
385409
}
@@ -399,26 +423,59 @@ private static String toString(List<Field> fields) {
399423
return joiner.toString();
400424
}
401425

402-
private static final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
426+
private void handleClose() {
427+
if (this.isClosed.compareAndSet(false, true)) {
428+
drainError(UNEXPECTED);
429+
} else {
430+
drainError(EXPECTED);
431+
}
432+
}
403433

404-
private final EmitterProcessor<FrontendMessage> requestProcessor;
434+
private void handleConnectionError(Throwable error) {
435+
drainError(() -> new PostgresConnectionException(error));
436+
}
405437

406-
private final Queue<MonoSink<Flux<BackendMessage>>> responseReceivers;
438+
private void drainError(Supplier<? extends Throwable> supplier) {
439+
MonoSink<Flux<BackendMessage>> receiver;
440+
441+
while ((receiver = this.responseReceivers.poll()) != null) {
442+
receiver.error(supplier.get());
443+
}
444+
}
407445

408-
private EnsureSubscribersCompleteChannelHandler(EmitterProcessor<FrontendMessage> requestProcessor, Queue<MonoSink<Flux<BackendMessage>>> responseReceivers) {
446+
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
447+
448+
private final EmitterProcessor<FrontendMessage> requestProcessor;
449+
450+
private EnsureSubscribersCompleteChannelHandler(EmitterProcessor<FrontendMessage> requestProcessor) {
409451
this.requestProcessor = requestProcessor;
410-
this.responseReceivers = responseReceivers;
452+
}
453+
454+
@Override
455+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
456+
super.channelInactive(ctx);
411457
}
412458

413459
@Override
414460
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
415461
super.channelUnregistered(ctx);
416462

417463
this.requestProcessor.onComplete();
464+
handleClose();
465+
}
466+
}
418467

419-
for (MonoSink<Flux<BackendMessage>> responseReceiver = this.responseReceivers.poll(); responseReceiver != null; responseReceiver = this.responseReceivers.poll()) {
420-
responseReceiver.success(Flux.empty());
421-
}
468+
static class PostgresConnectionClosedException extends R2dbcNonTransientResourceException {
469+
470+
public PostgresConnectionClosedException(String reason) {
471+
super(reason);
472+
}
473+
}
474+
475+
static class PostgresConnectionException extends R2dbcNonTransientResourceException {
476+
477+
public PostgresConnectionException(Throwable cause) {
478+
super(cause);
422479
}
423480
}
424481

src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientTest.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.r2dbc.postgresql.message.backend.DataRow;
2727
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2828
import io.r2dbc.postgresql.message.backend.RowDescription;
29+
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
2930
import io.r2dbc.postgresql.message.frontend.Query;
3031
import io.r2dbc.postgresql.util.PostgresqlServerExtension;
3132
import io.r2dbc.spi.R2dbcNonTransientResourceException;
@@ -36,28 +37,43 @@
3637
import org.junit.jupiter.api.Test;
3738
import org.junit.jupiter.api.TestInstance;
3839
import org.junit.jupiter.api.extension.RegisterExtension;
40+
import org.springframework.util.ReflectionUtils;
41+
import reactor.core.publisher.EmitterProcessor;
3942
import reactor.core.publisher.Flux;
4043
import reactor.core.publisher.Mono;
4144
import reactor.core.publisher.UnicastProcessor;
45+
import reactor.netty.Connection;
4246
import reactor.test.StepVerifier;
4347

4448
import javax.net.ssl.HostnameVerifier;
4549
import javax.net.ssl.SSLSession;
50+
import java.lang.reflect.Field;
4651
import java.time.Duration;
4752
import java.util.Arrays;
4853
import java.util.Collections;
54+
import java.util.List;
55+
import java.util.concurrent.CompletableFuture;
56+
import java.util.concurrent.ExecutionException;
57+
import java.util.concurrent.TimeUnit;
4958
import java.util.function.Consumer;
5059
import java.util.function.Function;
5160
import java.util.stream.IntStream;
5261

5362
import static org.assertj.core.api.Assertions.assertThat;
5463
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
64+
import static org.assertj.core.api.Assertions.fail;
5565

5666
final class ReactorNettyClientTest {
5767

5868
@RegisterExtension
5969
static final PostgresqlServerExtension SERVER = new PostgresqlServerExtension();
6070

71+
static final Field CONNECTION = ReflectionUtils.findField(ReactorNettyClient.class, "connection");
72+
73+
static {
74+
ReflectionUtils.makeAccessible(CONNECTION);
75+
}
76+
6177
private final ReactorNettyClient client = ReactorNettyClient.connect(SERVER.getHost(), SERVER.getPort())
6278
.delayUntil(client -> StartupMessageFlow
6379
.exchange(this.getClass().getName(), m -> new PasswordAuthenticationHandler(SERVER.getPassword(), SERVER.getUsername()), client, SERVER.getDatabase(), SERVER.getUsername(),
@@ -69,7 +85,65 @@ void close() {
6985
this.client.close()
7086
.thenMany(this.client.exchange(Mono.empty()))
7187
.as(StepVerifier::create)
72-
.verifyErrorSatisfies(t -> assertThat(t).isInstanceOf(IllegalStateException.class).hasMessage("Cannot exchange messages because the connection is closed"));
88+
.verifyErrorSatisfies(t -> assertThat(t).isInstanceOf(R2dbcNonTransientResourceException.class).hasMessage("Cannot exchange messages because the connection is closed"));
89+
}
90+
91+
@Test
92+
void disconnectedShouldRejectExchange() {
93+
94+
Connection connection = (Connection) ReflectionUtils.getField(CONNECTION, this.client);
95+
connection.channel().close().awaitUninterruptibly();
96+
97+
this.client.close()
98+
.thenMany(this.client.exchange(Mono.empty()))
99+
.as(StepVerifier::create)
100+
.verifyErrorSatisfies(t -> assertThat(t).isInstanceOf(R2dbcNonTransientResourceException.class).hasMessage("Cannot exchange messages because the connection is closed"));
101+
}
102+
103+
@Test
104+
void shouldCancelExchangeOnCloseFirstMessage() throws Exception {
105+
106+
Connection connection = (Connection) ReflectionUtils.getField(CONNECTION, this.client);
107+
108+
EmitterProcessor<FrontendMessage> messages = EmitterProcessor.create();
109+
Flux<BackendMessage> query = this.client.exchange(messages);
110+
CompletableFuture<List<BackendMessage>> future = query.collectList().toFuture();
111+
112+
connection.channel().eventLoop().execute(() -> {
113+
114+
connection.channel().close();
115+
messages.onNext(new Query("SELECT value FROM test"));
116+
});
117+
118+
try {
119+
future.get(5, TimeUnit.SECONDS);
120+
fail("Expected PostgresConnectionClosedException");
121+
} catch (ExecutionException e) {
122+
assertThat(e).hasCauseInstanceOf(ReactorNettyClient.PostgresConnectionClosedException.class).hasMessageContaining("Cannot exchange messages");
123+
}
124+
}
125+
126+
@Test
127+
void shouldCancelExchangeOnCloseInFlight() throws Exception {
128+
129+
Connection connection = (Connection) ReflectionUtils.getField(CONNECTION, this.client);
130+
131+
EmitterProcessor<FrontendMessage> messages = EmitterProcessor.create();
132+
Flux<BackendMessage> query = this.client.exchange(messages);
133+
CompletableFuture<List<BackendMessage>> future = query.doOnNext(ignore -> {
134+
connection.channel().close();
135+
messages.onNext(new Query("SELECT value FROM test"));
136+
137+
}).collectList().toFuture();
138+
139+
messages.onNext(new Query("SELECT value FROM test;SELECT value FROM test;"));
140+
141+
try {
142+
future.get(5, TimeUnit.SECONDS);
143+
fail("Expected PostgresConnectionClosedException");
144+
} catch (ExecutionException e) {
145+
assertThat(e).hasCauseInstanceOf(ReactorNettyClient.PostgresConnectionClosedException.class).hasMessageContaining("Connection closed");
146+
}
73147
}
74148

75149
@AfterEach

0 commit comments

Comments
 (0)