Skip to content

Commit d854b19

Browse files
committed
Move result error handling into PostgresqlResult
ErrorResponse are now handled inside the result instead of the outer level. Previously, errors were handled in the stream that created result objects. This approach could cause a result stream to be created and the error not being propagated to the stream but rather terminate the top-level stream leaving the sub-stream unhandled. [resolves #182]
1 parent c8f1ae7 commit d854b19

7 files changed

+141
-15
lines changed

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,8 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
185185
.flatMapMany(name -> ExtendedQueryMessageFlow
186186
.execute(Flux.fromIterable(this.bindings.bindings), this.client, this.portalNameSupplier, name, this.forceBinary))
187187
.filter(RESULT_FRAME_FILTER)
188-
.handle(factory::handleErrorResponse)
189188
.windowUntil(CloseComplete.class::isInstance)
190-
.map(messages -> PostgresqlResult.toResult(this.codecs, messages));
189+
.map(messages -> PostgresqlResult.toResult(this.codecs, messages, factory));
191190
}
192191

193192
private int getIndex(String identifier) {

src/main/java/io/r2dbc/postgresql/PostgresqlResult.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,23 @@ final class PostgresqlResult implements io.r2dbc.postgresql.api.PostgresqlResult
4646

4747
private final Flux<BackendMessage> messages;
4848

49+
private final ExceptionFactory factory;
50+
4951
private volatile PostgresqlRowMetadata metadata;
5052

5153
private volatile RowDescription rowDescription;
5254

53-
PostgresqlResult(Codecs codecs, Flux<BackendMessage> messages) {
55+
PostgresqlResult(Codecs codecs, Flux<BackendMessage> messages, ExceptionFactory factory) {
5456
this.codecs = Assert.requireNonNull(codecs, "codecs must not be null");
5557
this.messages = Assert.requireNonNull(messages, "messages must not be null");
58+
this.factory = Assert.requireNonNull(factory, "factory must not be null");
5659
}
5760

5861
@Override
5962
public Mono<Integer> getRowsUpdated() {
6063

6164
return this.messages
65+
.handle(this.factory::handleErrorResponse)
6266
.doOnNext(message -> {
6367
if (message instanceof DataRow) {
6468
((DataRow) message).release();
@@ -81,6 +85,7 @@ public <T> Flux<T> map(BiFunction<Row, RowMetadata, ? extends T> f) {
8185
Assert.requireNonNull(f, "f must not be null");
8286

8387
return this.messages.takeUntil(TAKE_UNTIL)
88+
.handle(this.factory::handleErrorResponse)
8489
.handle((message, sink) -> {
8590

8691
if (message instanceof RowDescription) {
@@ -109,11 +114,12 @@ public String toString() {
109114
'}';
110115
}
111116

112-
static PostgresqlResult toResult(Codecs codecs, Flux<BackendMessage> messages) {
117+
static PostgresqlResult toResult(Codecs codecs, Flux<BackendMessage> messages, ExceptionFactory factory) {
113118
Assert.requireNonNull(codecs, "codecs must not be null");
114119
Assert.requireNonNull(messages, "messages must not be null");
120+
Assert.requireNonNull(factory, "factory must not be null");
115121

116-
return new PostgresqlResult(codecs, messages);
122+
return new PostgresqlResult(codecs, messages, factory);
117123
}
118124

119125
}

src/main/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatement.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,8 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
125125
ExceptionFactory factory = ExceptionFactory.withSql(sql);
126126
return SimpleQueryMessageFlow
127127
.exchange(this.client, sql)
128-
.handle(factory::handleErrorResponse)
129128
.windowUntil(WINDOW_UNTIL)
130-
.map(dataRow -> PostgresqlResult.toResult(this.codecs, dataRow));
129+
.map(dataRow -> PostgresqlResult.toResult(this.codecs, dataRow, factory));
131130
}
132131

133132
}

src/test/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatementTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.r2dbc.postgresql.message.backend.CommandComplete;
2929
import io.r2dbc.postgresql.message.backend.ErrorResponse;
3030
import io.r2dbc.postgresql.message.backend.NoData;
31+
import io.r2dbc.postgresql.message.backend.RowDescription;
3132
import io.r2dbc.postgresql.message.frontend.Bind;
3233
import io.r2dbc.postgresql.message.frontend.Close;
3334
import io.r2dbc.postgresql.message.frontend.Describe;
@@ -42,6 +43,7 @@
4243

4344
import java.time.Duration;
4445
import java.util.Arrays;
46+
import java.util.Collection;
4547
import java.util.Collections;
4648
import java.util.LinkedList;
4749

@@ -213,6 +215,34 @@ void executeEmpty() {
213215
.withMessage("No parameters have been bound");
214216
}
215217

218+
@Test
219+
void executeErrorAfterBind() {
220+
Client client = TestClient.builder()
221+
.expectRequest(
222+
new Bind("B_0", Collections.singletonList(FORMAT_BINARY), Collections.singletonList(TEST.buffer(4).writeInt(100)), Collections.emptyList(), "test-name"),
223+
new Describe("B_0", ExecutionType.PORTAL),
224+
new Execute("B_0", 0),
225+
new Close("B_0", ExecutionType.PORTAL),
226+
Sync.INSTANCE)
227+
.thenRespond(BindComplete.INSTANCE, new RowDescription(Collections.emptyList()), new ErrorResponse(Collections.emptyList()))
228+
.build();
229+
230+
MockCodecs codecs = MockCodecs.builder()
231+
.encoding(100, new Parameter(FORMAT_BINARY, INT4.getObjectId(), Flux.just(TEST.buffer(4).writeInt(100))))
232+
.build();
233+
234+
PortalNameSupplier portalNameSupplier = new LinkedList<>(Arrays.asList("B_0", "B_1"))::remove;
235+
236+
when(this.statementCache.getName(any(), any())).thenReturn(Mono.just("test-name"));
237+
238+
new ExtendedQueryPostgresqlStatement(client, codecs, portalNameSupplier, "test-query-$1", this.statementCache, false)
239+
.bind("$1", 100)
240+
.execute()
241+
.flatMap(PostgresqlResult::getRowsUpdated)
242+
.as(StepVerifier::create)
243+
.verifyError(R2dbcNonTransientResourceException.class);
244+
}
245+
216246
@Test
217247
void executeErrorResponseRows() {
218248
Client client = TestClient.builder()
@@ -292,6 +322,7 @@ void executeErrorResponse() {
292322
new ExtendedQueryPostgresqlStatement(client, codecs, portalNameSupplier, "test-query-$1", this.statementCache, false)
293323
.bind("$1", 100)
294324
.execute()
325+
.flatMap(PostgresqlResult::getRowsUpdated)
295326
.as(StepVerifier::create)
296327
.verifyError(R2dbcNonTransientResourceException.class);
297328
}

src/test/java/io/r2dbc/postgresql/PostgresqlResultTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.r2dbc.postgresql.message.backend.RowDescription;
2424
import org.junit.jupiter.api.Test;
2525
import reactor.core.publisher.Flux;
26-
import reactor.core.publisher.Mono;
2726
import reactor.test.StepVerifier;
2827

2928
import java.util.Collections;
@@ -34,19 +33,19 @@ final class PostgresqlResultTest {
3433

3534
@Test
3635
void constructorNoCodec() {
37-
assertThatIllegalArgumentException().isThrownBy(() -> new PostgresqlResult(null, Flux.empty()))
36+
assertThatIllegalArgumentException().isThrownBy(() -> new PostgresqlResult(null, Flux.empty(), ExceptionFactory.INSTANCE))
3837
.withMessage("codecs must not be null");
3938
}
4039

4140
@Test
4241
void constructorNoRowMetadata() {
43-
assertThatIllegalArgumentException().isThrownBy(() -> new PostgresqlResult(MockCodecs.empty(), null))
42+
assertThatIllegalArgumentException().isThrownBy(() -> new PostgresqlResult(MockCodecs.empty(), null, ExceptionFactory.INSTANCE))
4443
.withMessage("messages must not be null");
4544
}
4645

4746
@Test
4847
void toResultCommandComplete() {
49-
PostgresqlResult result = PostgresqlResult.toResult(MockCodecs.empty(), Flux.just(new CommandComplete("test", null, 1)));
48+
PostgresqlResult result = PostgresqlResult.toResult(MockCodecs.empty(), Flux.just(new CommandComplete("test", null, 1)), ExceptionFactory.INSTANCE);
5049

5150
result.map((row, rowMetadata) -> row)
5251
.as(StepVerifier::create)
@@ -60,7 +59,7 @@ void toResultCommandComplete() {
6059

6160
@Test
6261
void toResultEmptyQueryResponse() {
63-
PostgresqlResult result = PostgresqlResult.toResult(MockCodecs.empty(), Flux.just(EmptyQueryResponse.INSTANCE));
62+
PostgresqlResult result = PostgresqlResult.toResult(MockCodecs.empty(), Flux.just(EmptyQueryResponse.INSTANCE), ExceptionFactory.INSTANCE);
6463

6564
result.map((row, rowMetadata) -> row)
6665
.as(StepVerifier::create)
@@ -73,20 +72,20 @@ void toResultEmptyQueryResponse() {
7372

7473
@Test
7574
void toResultNoCodecs() {
76-
assertThatIllegalArgumentException().isThrownBy(() -> PostgresqlResult.toResult(null, Flux.empty()))
75+
assertThatIllegalArgumentException().isThrownBy(() -> PostgresqlResult.toResult(null, Flux.empty(), ExceptionFactory.INSTANCE))
7776
.withMessage("codecs must not be null");
7877
}
7978

8079
@Test
8180
void toResultNoMessages() {
82-
assertThatIllegalArgumentException().isThrownBy(() -> PostgresqlResult.toResult(MockCodecs.empty(), null))
81+
assertThatIllegalArgumentException().isThrownBy(() -> PostgresqlResult.toResult(MockCodecs.empty(), null, ExceptionFactory.INSTANCE))
8382
.withMessage("messages must not be null");
8483
}
8584

8685
@Test
8786
void toResultRowDescription() {
8887
PostgresqlResult result = PostgresqlResult.toResult(MockCodecs.empty(), Flux.just(new RowDescription(Collections.emptyList()), new DataRow(), new CommandComplete
89-
("test", null, null)));
88+
("test", null, null)), ExceptionFactory.INSTANCE);
9089

9190
result.map((row, rowMetadata) -> row)
9291
.as(StepVerifier::create)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.util.PostgresqlServerExtension;
20+
import io.r2dbc.spi.Connection;
21+
import io.r2dbc.spi.ConnectionFactories;
22+
import io.r2dbc.spi.ConnectionFactory;
23+
import io.r2dbc.spi.ConnectionFactoryOptions;
24+
import io.r2dbc.spi.R2dbcDataIntegrityViolationException;
25+
import io.r2dbc.spi.Result;
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.RegisterExtension;
30+
import reactor.core.publisher.Flux;
31+
import reactor.core.publisher.Mono;
32+
import reactor.test.StepVerifier;
33+
34+
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER;
35+
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
36+
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
37+
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
38+
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
39+
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
40+
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
41+
42+
final class PostgresqlStatementErrorsIntegrationTests {
43+
44+
@RegisterExtension
45+
static final PostgresqlServerExtension SERVER = new PostgresqlServerExtension();
46+
47+
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
48+
.option(DRIVER, POSTGRESQL_DRIVER)
49+
.option(DATABASE, SERVER.getDatabase())
50+
.option(HOST, SERVER.getHost())
51+
.option(PORT, SERVER.getPort())
52+
.option(PASSWORD, SERVER.getPassword())
53+
.option(USER, SERVER.getUsername())
54+
.build());
55+
56+
Connection connection;
57+
58+
@BeforeEach
59+
void setUp() {
60+
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
61+
connection = Mono.from(connectionFactory.create()).block();
62+
}
63+
64+
65+
@AfterEach
66+
void tearDown() {
67+
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
68+
Mono.from(connection.close()).block();
69+
}
70+
71+
@Test
72+
void shouldReportDataIntegrityViolationUsingSimpleFlow() {
73+
74+
SERVER.getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY)");
75+
76+
Flux<Integer> insert = Flux.from(connection.createStatement("INSERT INTO test (id) VALUES (1) RETURNING *").execute()).flatMap(Result::getRowsUpdated);
77+
78+
insert.thenMany(insert).as(StepVerifier::create).verifyError(R2dbcDataIntegrityViolationException.class);
79+
}
80+
81+
@Test
82+
void shouldReportDataIntegrityViolationUsingExtendedFlow() {
83+
84+
SERVER.getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY)");
85+
86+
Flux<Integer> insert = Flux.from(connection.createStatement("INSERT INTO test (id) VALUES ($1) RETURNING *").bind("$1", 1).execute()).flatMap(Result::getRowsUpdated);
87+
88+
insert.thenMany(insert).as(StepVerifier::create).verifyError(R2dbcDataIntegrityViolationException.class);
89+
}
90+
91+
}

src/test/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatementTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ void executeErrorResponse() {
174174

175175
new SimpleQueryPostgresqlStatement(client, MockCodecs.empty(), "test-query")
176176
.execute()
177+
.flatMap(PostgresqlResult::getRowsUpdated)
177178
.as(StepVerifier::create)
178179
.verifyError(R2dbcNonTransientResourceException.class);
179180
}

0 commit comments

Comments
 (0)