Skip to content

Commit 07cb34c

Browse files
authored
Rework the prepared statement cache and evict the statement from the … (#597)
* rework the prepared statement cache and evict the statement from the cache when it's closed - fixes #572 Signed-off-by: Billy Yuan <billy112487983@gmail.com> * prepared statement cache will only cache successful responses now Signed-off-by: Billy Yuan <billy112487983@gmail.com> * evict the invalid prepared statement when the postgres table is changed - fixes #619 Signed-off-by: Billy Yuan <billy112487983@gmail.com> * remove unnecessary lines Signed-off-by: Billy Yuan <billy112487983@gmail.com> * use sql string as the prepared statement cache entry removing identifier and some minors for naming Signed-off-by: Billy Yuan <billy112487983@gmail.com>
1 parent 28f6820 commit 07cb34c

File tree

13 files changed

+382
-115
lines changed

13 files changed

+382
-115
lines changed

vertx-pg-client/docker/postgres/resources/create-postgres.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,19 @@ CREATE TABLE mutable
347347
);
348348
-- mutable for insert,update,delete query testing
349349

350+
-- unstable table for table schema changing testing
351+
DROP TABLE IF EXISTS unstable;
352+
CREATE TABLE unstable
353+
(
354+
id integer NOT NULL,
355+
message varchar(2048) NOT NULL,
356+
PRIMARY KEY (id)
357+
);
358+
359+
INSERT INTO unstable (id, message)
360+
VALUES (1, 'fortune: No such file or directory');
361+
-- unstable table for table schema changing testing
362+
350363
-- table for test ANSI SQL data type codecs
351364
DROP TABLE IF EXISTS basicdatatype;
352365
CREATE TABLE basicdatatype

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedBatchQueryCommandCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class ExtendedBatchQueryCommandCodec<R> extends ExtendedQueryCommandBaseCodec<R,
3030

3131
@Override
3232
void encode(PgEncoder encoder) {
33+
super.encode(encoder);
3334
if (cmd.isSuspended()) {
3435
encoder.writeExecute(cmd.cursorId(), cmd.fetch());
3536
encoder.writeSync();

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryCommandBaseCodec.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,26 @@
1616
*/
1717
package io.vertx.pgclient.impl.codec;
1818

19+
import io.vertx.sqlclient.impl.codec.InvalidCachedStatementEvent;
1920
import io.vertx.sqlclient.impl.RowDesc;
2021
import io.vertx.sqlclient.impl.command.ExtendedQueryCommandBase;
2122

2223
abstract class ExtendedQueryCommandBaseCodec<R, C extends ExtendedQueryCommandBase<R>> extends QueryCommandBaseCodec<R, C> {
2324

25+
private PgEncoder encoder;
26+
27+
private static final String TABLE_SCHEMA_CHANGE_ERROR_MESSAGE_PATTERN = "bind message has \\d result formats but query has \\d columns";
28+
2429
ExtendedQueryCommandBaseCodec(C cmd) {
2530
super(cmd);
2631
decoder = new RowResultDecoder<>(cmd.collector(), ((PgPreparedStatement)cmd.preparedStatement()).rowDesc());
2732
}
2833

34+
@Override
35+
void encode(PgEncoder encoder) {
36+
this.encoder = encoder;
37+
}
38+
2939
@Override
3040
void handleRowDescription(PgRowDesc rowDescription) {
3141
decoder = new RowResultDecoder<>(cmd.collector(), rowDescription);
@@ -51,4 +61,12 @@ void handlePortalSuspended() {
5161
void handleBindComplete() {
5262
// Response to Bind
5363
}
64+
65+
@Override
66+
public void handleErrorResponse(ErrorResponse errorResponse) {
67+
if (cmd.preparedStatement().cacheable() && errorResponse.getMessage().matches(TABLE_SCHEMA_CHANGE_ERROR_MESSAGE_PATTERN)) {
68+
encoder.channelHandlerContext().fireChannelRead(new InvalidCachedStatementEvent(cmd.preparedStatement().sql()));
69+
}
70+
super.handleErrorResponse(errorResponse);
71+
}
5472
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/ExtendedQueryCommandCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class ExtendedQueryCommandCodec<R> extends ExtendedQueryCommandBaseCodec<R, Exte
2828

2929
@Override
3030
void encode(PgEncoder encoder) {
31+
super.encode(encoder);
3132
if (cmd.isSuspended()) {
3233
encoder.writeExecute(cmd.cursorId(), cmd.fetch());
3334
encoder.writeSync();

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgEncoder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,4 +440,8 @@ private void ensureBuffer() {
440440
long nextStatementName() {
441441
return psSeq.next();
442442
}
443+
444+
public ChannelHandlerContext channelHandlerContext() {
445+
return ctx;
446+
}
443447
}

vertx-pg-client/src/test/java/io/vertx/pgclient/PreparedStatementCachedTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,45 @@
1717

1818
package io.vertx.pgclient;
1919

20+
import io.vertx.ext.unit.Async;
21+
import io.vertx.ext.unit.TestContext;
22+
import io.vertx.sqlclient.Tuple;
23+
import org.junit.Test;
24+
2025
public class PreparedStatementCachedTest extends PreparedStatementTestBase {
2126

2227
@Override
2328
protected PgConnectOptions options() {
2429
return new PgConnectOptions(options).setCachePreparedStatements(true);
2530
}
2631

32+
@Test
33+
public void testOneShotPreparedQueryCacheRefreshOnTableSchemaChange(TestContext ctx) {
34+
Async async = ctx.async();
35+
PgConnection.connect(vertx, options(), ctx.asyncAssertSuccess(conn -> {
36+
conn.preparedQuery("SELECT * FROM unstable WHERE id=$1").execute(Tuple.of(1), ctx.asyncAssertSuccess(res1 -> {
37+
ctx.assertEquals(1, res1.size());
38+
Tuple row1 = res1.iterator().next();
39+
ctx.assertEquals(1, row1.getInteger(0));
40+
ctx.assertEquals("fortune: No such file or directory", row1.getString(1));
41+
42+
// change table schema
43+
conn.query("ALTER TABLE unstable DROP COLUMN message").execute(ctx.asyncAssertSuccess(dropColumn -> {
44+
// failure due to schema change
45+
conn.preparedQuery("SELECT * FROM unstable WHERE id=$1").execute(Tuple.of(1), ctx.asyncAssertFailure(failure -> {
46+
// recover because the cache is refreshed
47+
conn.preparedQuery("SELECT * FROM unstable WHERE id=$1").execute(Tuple.of(1), ctx.asyncAssertSuccess(res2 -> {
48+
ctx.assertEquals(1, res2.size());
49+
Tuple row2 = res2.iterator().next();
50+
ctx.assertEquals(1, row2.getInteger(0));
51+
ctx.assertEquals(null, row2.getString(1)); // the message column is removed
52+
conn.close();
53+
async.complete();
54+
}));
55+
}));
56+
}));
57+
}));
58+
}));
59+
}
60+
2761
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementCache.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SocketConnectionBase.java

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@
3030
import io.vertx.core.net.impl.NetSocketInternal;
3131
import io.vertx.core.logging.Logger;
3232
import io.vertx.core.logging.LoggerFactory;
33+
import io.vertx.sqlclient.impl.cache.PreparedStatementCache;
34+
import io.vertx.sqlclient.impl.codec.InvalidCachedStatementEvent;
3335
import io.vertx.sqlclient.impl.command.*;
3436

3537
import java.util.ArrayDeque;
36-
import java.util.Deque;
3738

3839
/**
3940
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
@@ -68,7 +69,7 @@ public SocketConnectionBase(NetSocketInternal socket,
6869
this.socket = socket;
6970
this.context = context;
7071
this.pipeliningLimit = pipeliningLimit;
71-
this.psCache = cachePreparedStatements ? new PreparedStatementCache(preparedStatementCacheSize, this) : null;
72+
this.psCache = cachePreparedStatements ? new PreparedStatementCache(this, preparedStatementCacheSize) : null;
7273
this.preparedStatementCacheSqlLimit = preparedStatementCacheSqlLimit;
7374
}
7475

@@ -162,30 +163,33 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
162163

163164
// Special handling for cache
164165
PreparedStatementCache psCache = this.psCache;
165-
if (psCache != null && cmd instanceof PrepareStatementCommand) {
166-
PrepareStatementCommand psCmd = (PrepareStatementCommand) cmd;
167-
if (!psCmd.cacheable()) {
168-
// we don't cache non one-shot preparedQuery
169-
} else if (psCmd.sql().length() > preparedStatementCacheSqlLimit) {
170-
// do not cache the statements
171-
} else {
172-
// TODO fix the auto-closing logic
173-
CachedPreparedStatement cached = psCache.get(psCmd.sql());
174-
Handler<AsyncResult<PreparedStatement>> orig = (Handler) handler;
175-
if (cached != null) {
176-
psCmd.handler = orig;
177-
cached.get(psCmd::complete);
178-
return;
166+
if (psCache != null) {
167+
// cache is enabled
168+
if (cmd instanceof PrepareStatementCommand) {
169+
PrepareStatementCommand psCmd = (PrepareStatementCommand) cmd;
170+
if (!psCmd.cacheable()) {
171+
// we don't cache non one-shot preparedQuery
172+
} else if (psCmd.sql().length() > preparedStatementCacheSqlLimit) {
173+
// do not cache the statements if it exceeds the sql length limit
179174
} else {
180-
if (psCache.size() >= psCache.getCapacity() && !psCache.isReady()) {
181-
// only if the prepared statement is ready then it can be evicted
175+
Handler<AsyncResult<PreparedStatement>> originalHandler = (Handler) handler;
176+
Handler<AsyncResult<PreparedStatement>> newHandler = psCache.appendStmtReq(psCmd.sql(), originalHandler);
177+
if (newHandler == null) {
178+
// we don't need to schedule it if the result is cached or the request has been sent
179+
return;
182180
} else {
183-
cached = new CachedPreparedStatement();
184-
cached.get(orig);
185-
psCache.put(psCmd.sql(), cached);
186-
handler = (Handler) cached;
181+
handler = (Handler) newHandler;
187182
}
188183
}
184+
} else if (cmd instanceof CloseStatementCommand) {
185+
CloseStatementCommand closeStmtCommand = (CloseStatementCommand) cmd;
186+
/*
187+
* We need to know how we handle the close statement command, this cmd might origin from PreparedStatement#close
188+
* or it's automatically sent by the cache once the stmt is evicted, we should clean up the cache for those closing cached prepared statements.
189+
*/
190+
if (closeStmtCommand.statement().cacheable()) {
191+
psCache.remove(closeStmtCommand.statement().sql());
192+
}
189193
}
190194
}
191195

@@ -201,29 +205,6 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
201205
}
202206
}
203207

204-
static class CachedPreparedStatement implements Handler<AsyncResult<PreparedStatement>> {
205-
206-
private final Deque<Handler<AsyncResult<PreparedStatement>>> waiters = new ArrayDeque<>();
207-
AsyncResult<PreparedStatement> resp;
208-
209-
void get(Handler<AsyncResult<PreparedStatement>> handler) {
210-
if (resp != null) {
211-
handler.handle(resp);
212-
} else {
213-
waiters.add(handler);
214-
}
215-
}
216-
217-
@Override
218-
public void handle(AsyncResult<PreparedStatement> event) {
219-
resp = event;
220-
Handler<AsyncResult<PreparedStatement>> waiter;
221-
while ((waiter = waiters.poll()) != null) {
222-
waiter.handle(resp);
223-
}
224-
}
225-
}
226-
227208
private void checkPending() {
228209
ChannelHandlerContext ctx = socket.channelHandlerContext();
229210
if (inflight < pipeliningLimit) {
@@ -242,6 +223,9 @@ protected void handleMessage(Object msg) {
242223
checkPending();
243224
CommandResponse resp =(CommandResponse) msg;
244225
resp.fire();
226+
} else if (msg instanceof InvalidCachedStatementEvent) {
227+
InvalidCachedStatementEvent event = (InvalidCachedStatementEvent) msg;
228+
removeCachedStatement(event.sql());
245229
}
246230
}
247231

@@ -251,6 +235,12 @@ protected void handleEvent(Object event) {
251235
}
252236
}
253237

238+
private void removeCachedStatement(String sql) {
239+
if (this.psCache != null) {
240+
this.psCache.remove(sql);
241+
}
242+
}
243+
254244
private void handleClosed(Void v) {
255245
handleClose(null);
256246
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
12+
package io.vertx.sqlclient.impl.cache;
13+
14+
import io.vertx.core.AsyncResult;
15+
import io.vertx.core.Handler;
16+
import io.vertx.sqlclient.impl.PreparedStatement;
17+
18+
import java.util.ArrayDeque;
19+
import java.util.Deque;
20+
21+
class InflightCachingStmtEntry implements Handler<AsyncResult<PreparedStatement>> {
22+
private final Deque<Handler<AsyncResult<PreparedStatement>>> waiters = new ArrayDeque<>();
23+
private final String sql;
24+
private final PreparedStatementCache psCache;
25+
26+
InflightCachingStmtEntry(String sql, PreparedStatementCache psCache) {
27+
this.sql = sql;
28+
this.psCache = psCache;
29+
}
30+
31+
void addWaiter(Handler<AsyncResult<PreparedStatement>> handler) {
32+
waiters.add(handler);
33+
}
34+
35+
@Override
36+
public void handle(AsyncResult<PreparedStatement> preparedStatementResult) {
37+
if (preparedStatementResult.succeeded()) {
38+
// put it in the cache since the response is ready
39+
// we only need to cache successful responses here
40+
psCache.cache().put(sql, preparedStatementResult);
41+
}
42+
psCache.inflight().remove(sql);
43+
Handler<AsyncResult<PreparedStatement>> waiter;
44+
while ((waiter = waiters.poll()) != null) {
45+
waiter.handle(preparedStatementResult);
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)