Skip to content

Commit 603e323

Browse files
committed
Add support for level 7 proxies (PgBouncer with pool_mode=transaction) in Postgres client.
1 parent 32939cb commit 603e323

File tree

11 files changed

+224
-7
lines changed

11 files changed

+224
-7
lines changed

vertx-pg-client/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
<version>${jmh.version}</version>
115115
<scope>test</scope>
116116
</dependency>
117+
117118
</dependencies>
118119

119120
<build>

vertx-pg-client/src/main/asciidoc/index.adoc

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The client is reactive and non-blocking, allowing to handle many database connec
2020
* SSL/TLS
2121
* Unix domain socket
2222
* HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy support
23+
* Proxy (level 4 and 7) support
2324
2425
== Usage
2526

@@ -564,12 +565,32 @@ All https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION[
564565

565566
More information can be found in the http://vertx.io/docs/vertx-core/java/#ssl[Vert.x documentation].
566567

567-
== Using a proxy
568+
== Using a level 4 proxy
568569

569-
You can also configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5 proxy.
570+
You can configure the client to use an HTTP/1.x CONNECT, SOCKS4a or SOCKS5 level 4 proxy.
570571

571572
More information can be found in the http://vertx.io/docs/vertx-core/java/#_using_a_proxy_for_client_connections[Vert.x documentation].
572573

574+
== Using a level 7 proxy
575+
576+
Level 7 proxies can load balance queries on several connections to the actual database. When it happens, the client can be confused by the lack of session affinity and unwanted errors can happen like _ERROR: unnamed prepared statement does not exist (26000)_.
577+
578+
Supported proxies:
579+
580+
- https://www.pgbouncer.org[PgBouncer] configured with `_pool_mode=transaction`.
581+
582+
You can configure the client to interact differently to the proxy:
583+
584+
[source,$lang]
585+
----
586+
{@link examples.PgClientExamples#pgBouncer}
587+
----
588+
589+
When doing so, prepared statement cannot be cached and therefore
590+
591+
- prepared statement caching must be disabled
592+
- explicit prepared statement can only live within the scope of a transaction, it means you can use cursors but the prepared statement for the cursor must be created and destroyed within the scope of a transaction
593+
573594
== Advanced pool configuration
574595

575596
include::pool_config.adoc[]

vertx-pg-client/src/main/generated/io/vertx/pgclient/PgConnectOptionsConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
3030
obj.setSslMode(io.vertx.pgclient.SslMode.valueOf((String)member.getValue()));
3131
}
3232
break;
33+
case "useLayer7Proxy":
34+
if (member.getValue() instanceof Boolean) {
35+
obj.setUseLayer7Proxy((Boolean)member.getValue());
36+
}
37+
break;
3338
}
3439
}
3540
}
@@ -43,5 +48,6 @@ public static void toJson(PgConnectOptions obj, java.util.Map<String, Object> js
4348
if (obj.getSslMode() != null) {
4449
json.put("sslMode", obj.getSslMode().name());
4550
}
51+
json.put("useLayer7Proxy", obj.getUseLayer7Proxy());
4652
}
4753
}

vertx-pg-client/src/main/java/examples/PgClientExamples.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,4 +732,8 @@ public void batchReturning(SqlClient client) {
732732
}
733733
});
734734
}
735+
736+
public void pgBouncer(PgConnectOptions connectOptions) {
737+
connectOptions.setUseLayer7Proxy(true);
738+
}
735739
}

vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnectOptions.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package io.vertx.pgclient;
1919

2020
import io.vertx.codegen.annotations.GenIgnore;
21+
import io.vertx.codegen.annotations.Unstable;
2122
import io.vertx.core.tracing.TracingPolicy;
2223
import io.vertx.pgclient.impl.PgConnectionUriParser;
2324
import io.vertx.codegen.annotations.DataObject;
@@ -111,6 +112,7 @@ public static PgConnectOptions fromEnv() {
111112
public static final String DEFAULT_PASSWORD = "pass";
112113
public static final int DEFAULT_PIPELINING_LIMIT = 256;
113114
public static final SslMode DEFAULT_SSLMODE = SslMode.DISABLE;
115+
public static final boolean DEFAULT_USE_LAYER_7_PROXY = false;
114116
public static final Map<String, String> DEFAULT_PROPERTIES;
115117

116118
static {
@@ -124,6 +126,7 @@ public static PgConnectOptions fromEnv() {
124126

125127
private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT;
126128
private SslMode sslMode = DEFAULT_SSLMODE;
129+
private boolean useLayer7Proxy = DEFAULT_USE_LAYER_7_PROXY;
127130

128131
public PgConnectOptions() {
129132
super();
@@ -235,6 +238,27 @@ public PgConnectOptions setSslMode(SslMode sslmode) {
235238
return this;
236239
}
237240

241+
/**
242+
* @return whether the client interacts with a layer 7 proxy instead of a server
243+
*/
244+
@Unstable
245+
public boolean getUseLayer7Proxy() {
246+
return useLayer7Proxy;
247+
}
248+
249+
/**
250+
* Set the client to use a layer 7 (application) proxy compatible protocol, set to {@code true} when the client
251+
* interacts with a layer 7 proxy like PgBouncer instead of a server. Prepared statement caching must be disabled.
252+
*
253+
* @param useLayer7Proxy whether to use a layer 7 proxy instead of a server
254+
* @return a reference to this, so the API can be used fluently
255+
*/
256+
@Unstable
257+
public PgConnectOptions setUseLayer7Proxy(boolean useLayer7Proxy) {
258+
this.useLayer7Proxy = useLayer7Proxy;
259+
return this;
260+
}
261+
238262
@Override
239263
public PgConnectOptions setSendBufferSize(int sendBufferSize) {
240264
return (PgConnectOptions)super.setSendBufferSize(sendBufferSize);

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,21 @@ public class PgConnectionFactory extends ConnectionFactoryBase {
4545

4646
private SslMode sslMode;
4747
private int pipeliningLimit;
48+
private boolean useLayer7Proxy;
4849

4950
public PgConnectionFactory(VertxInternal context, PgConnectOptions options) {
5051
super(context, options);
52+
if (options.getCachePreparedStatements() && options.getUseLayer7Proxy()) {
53+
throw new IllegalArgumentException("Prepared statement caching must be disabled when using proxy mode");
54+
}
5155
}
5256

5357
@Override
5458
protected void initializeConfiguration(SqlConnectOptions connectOptions) {
5559
PgConnectOptions options = (PgConnectOptions) connectOptions;
5660
this.pipeliningLimit = options.getPipeliningLimit();
5761
this.sslMode = options.isUsingDomainSocket() ? SslMode.DISABLE : options.getSslMode();
62+
this.useLayer7Proxy = options.getUseLayer7Proxy();
5863

5964
// check ssl mode here
6065
switch (sslMode) {
@@ -162,6 +167,6 @@ public Future<SqlConnection> connect(Context context) {
162167
}
163168

164169
private PgSocketConnection newSocketConnection(EventLoopContext context, NetSocketInternal socket) {
165-
return new PgSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
170+
return new PgSocketConnection(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context);
166171
}
167172
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
public class PgSocketConnection extends SocketConnectionBase {
5151

5252
private PgCodec codec;
53+
private final boolean useLayer7Proxy;
5354
public int processId;
5455
public int secretKey;
5556
public PgDatabaseMetadata dbMetaData;
@@ -59,13 +60,15 @@ public PgSocketConnection(NetSocketInternal socket,
5960
int preparedStatementCacheSize,
6061
Predicate<String> preparedStatementCacheSqlFilter,
6162
int pipeliningLimit,
63+
boolean useLayer7Proxy,
6264
EventLoopContext context) {
6365
super(socket, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
66+
this.useLayer7Proxy = useLayer7Proxy;
6467
}
6568

6669
@Override
6770
public void init() {
68-
codec = new PgCodec();
71+
codec = new PgCodec(useLayer7Proxy);
6972
ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
7073
pipeline.addBefore("handler", "codec", codec);
7174
super.init();

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryCommandCodec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,18 @@ void encode(PgEncoder encoder) {
4848
completionHandler.handle(CommandResponse.failure("Can not execute batch query with 0 sets of batch parameters."));
4949
return;
5050
} else {
51+
if (encoder.useLayer7Proxy) {
52+
encoder.writeParse(ps.sql, ps.bind.statement, new DataType[0]);
53+
}
5154
for (TupleInternal param : cmd.paramsList()) {
5255
encoder.writeBind(ps.bind, cmd.cursorId(), param);
5356
encoder.writeExecute(cmd.cursorId(), cmd.fetch());
5457
}
5558
}
5659
} else {
60+
if (encoder.useLayer7Proxy && ps.bind.statement.length == 1) {
61+
encoder.writeParse(ps.sql, ps.bind.statement, new DataType[0]);
62+
}
5763
encoder.writeBind(ps.bind, cmd.cursorId(), cmd.params());
5864
encoder.writeExecute(cmd.cursorId(), cmd.fetch());
5965
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public class PgCodec extends CombinedChannelDuplexHandler<PgDecoder, PgEncoder>
2929

3030
private final ArrayDeque<PgCommandCodec<?, ?>> inflight = new ArrayDeque<>();
3131

32-
public PgCodec() {
32+
public PgCodec(boolean useLayer7Proxy) {
3333
PgDecoder decoder = new PgDecoder(inflight);
34-
PgEncoder encoder = new PgEncoder(inflight);
34+
PgEncoder encoder = new PgEncoder(useLayer7Proxy, inflight);
3535
init(decoder, encoder);
3636
}
3737

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ final class PgEncoder extends ChannelOutboundHandlerAdapter {
6262
private static final byte SYNC = 'S';
6363

6464
private final ArrayDeque<PgCommandCodec<?, ?>> inflight;
65+
final boolean useLayer7Proxy;
6566
private ChannelHandlerContext ctx;
6667
private ByteBuf out;
6768
private final HexSequence psSeq = new HexSequence(); // used for generating named prepared statement name
6869
boolean closeSent;
6970

70-
PgEncoder(ArrayDeque<PgCommandCodec<?, ?>> inflight) {
71+
PgEncoder(boolean useLayer7Proxy, ArrayDeque<PgCommandCodec<?, ?>> inflight) {
72+
this.useLayer7Proxy = useLayer7Proxy;
7173
this.inflight = inflight;
7274
}
7375

0 commit comments

Comments
 (0)