Skip to content

Commit e531268

Browse files
committed
Revert "Remove callbase usage in implementation"
This reverts commit 53b09ae.
1 parent d34a274 commit e531268

File tree

8 files changed

+23
-25
lines changed

8 files changed

+23
-25
lines changed

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import io.netty.channel.ChannelPipeline;
2121
import io.vertx.core.AsyncResult;
22-
import io.vertx.core.Future;
2322
import io.vertx.core.Handler;
2423
import io.vertx.core.Promise;
2524
import io.vertx.core.buffer.Buffer;
@@ -100,8 +99,8 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
10099
}
101100
}
102101

103-
public Future<Void> upgradeToSsl() {
104-
return socket.upgradeToSsl();
102+
public void upgradeToSsl(Handler<AsyncResult<Void>> completionHandler) {
103+
socket.upgradeToSsl(completionHandler);
105104
}
106105

107106
@Override

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeCommandCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private void handleInitialHandshake(ByteBuf payload) {
143143
encoder.clientCapabilitiesFlag |= CLIENT_SSL;
144144
sendSslRequest();
145145

146-
encoder.socketConnection.upgradeToSsl().onComplete(upgrade -> {
146+
encoder.socketConnection.upgradeToSsl(upgrade -> {
147147
if (upgrade.succeeded()) {
148148
doSendHandshakeResponseMessage(serverAuthPluginName, cmd.authenticationPlugin(), authPluginData, serverCapabilitiesFlags);
149149
} else {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/InitiateSslHandler.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5959
byteBuf.release();
6060
switch (b) {
6161
case PgProtocolConstants.MESSAGE_TYPE_SSL_YES: {
62-
conn
63-
.socket()
64-
.upgradeToSsl()
65-
.onComplete(ar -> {
66-
if (ar.succeeded()) {
67-
ctx.pipeline().remove(this);
68-
upgradePromise.complete();
69-
} else {
62+
Handler handler = o -> {
63+
if (o instanceof AsyncResult) {
64+
AsyncResult res = (AsyncResult) o;
65+
if (res.failed()) {
7066
// Connection close will fail the promise
67+
return;
7168
}
72-
});
69+
}
70+
ctx.pipeline().remove(this);
71+
upgradePromise.complete();
72+
};
73+
conn.socket().upgradeToSsl(handler);
7374
break;
7475
}
7576
case PgProtocolConstants.MESSAGE_TYPE_SSL_NO: {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void sendCancelRequestMessage(int processId, int secretKey, Handler<AsyncResult<
8585
buffer.appendInt(processId);
8686
buffer.appendInt(secretKey);
8787

88-
socket.write(buffer).onComplete(ar -> {
88+
socket.write(buffer, ar -> {
8989
if (ar.succeeded()) {
9090
// directly close this connection
9191
if (status == Status.CONNECTED) {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private void tryConnect(long delayMillis, Handler<AsyncResult<Void>> handler) {
168168
}
169169

170170
private void doConnect(Handler<AsyncResult<Void>> completionHandler) {
171-
PgConnection.connect(vertx, options).onComplete(ar -> handleConnectResult(completionHandler, ar));
171+
PgConnection.connect(vertx, options, ar -> handleConnectResult(completionHandler, ar));
172172
}
173173

174174
private synchronized void handleConnectResult(Handler<AsyncResult<Void>> completionHandler, AsyncResult<PgConnection> ar1) {
@@ -191,7 +191,7 @@ private synchronized void handleConnectResult(Handler<AsyncResult<Void>> complet
191191
return channel.quotedName;
192192
})
193193
.collect(Collectors.joining(";LISTEN ", "LISTEN ", ""));
194-
conn.query(sql).execute().onComplete(ar2 -> {
194+
conn.query(sql).execute(ar2 -> {
195195
if (ar2.failed()) {
196196
log.error("Cannot LISTEN to channels", ar2.cause());
197197
conn.close();
@@ -224,7 +224,7 @@ void add(ChannelImpl sub) {
224224
if (conn != null) {
225225
subscribed = true;
226226
String sql = "LISTEN " + quotedName;
227-
conn.query(sql).execute().onComplete(ar -> {
227+
conn.query(sql).execute(ar -> {
228228
if (ar.succeeded()) {
229229
Handler<Void> handler = sub.subscribeHandler;
230230
if (handler != null) {
@@ -243,7 +243,7 @@ void remove(ChannelImpl sub) {
243243
if (subs.isEmpty()) {
244244
channels.remove(name, this);
245245
if (conn != null) {
246-
conn.query("UNLISTEN " + quotedName).execute().onComplete(ar -> {
246+
conn.query("UNLISTEN " + quotedName).execute(ar -> {
247247
if (ar.failed()) {
248248
log.error("Cannot UNLISTEN channel " + name, ar.cause());
249249
}

vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ public void execute(I parameters, Handler<AsyncResult<R>> handler) {
7070

7171
queryMapper
7272
.apply(client.preparedQuery(sqlTemplate.getSql()))
73-
.execute(tupleMapper.apply(parameters))
74-
.onComplete(handler);
73+
.execute(tupleMapper.apply(parameters), handler);
7574
}
7675

7776
@Override
@@ -87,8 +86,7 @@ public void executeBatch(List<I> batch, Handler<AsyncResult<R>> handler) {
8786
.executeBatch(batch
8887
.stream()
8988
.map(tupleMapper)
90-
.collect(Collectors.toList()))
91-
.onComplete(handler);
89+
.collect(Collectors.toList()), handler);
9290
}
9391

9492
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public Future<SqlConnection> getConnection() {
175175
.compose(v -> context.failedFuture(err), failure -> context.failedFuture(err));
176176
}
177177
}))
178-
.onComplete(ar -> conn.close().onComplete(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
178+
.onComplete(ar -> conn.close(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
179179
}
180180

181181
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public RowStream<Row> handler(Handler<Row> handler) {
9090
return this;
9191
}
9292
}
93-
c.read(fetch).onComplete(this);
93+
c.read(fetch, this);
9494
return this;
9595
}
9696

@@ -206,7 +206,7 @@ private void checkPending() {
206206
break;
207207
} else if (cursor.hasMore()) {
208208
readInProgress = true;
209-
cursor.read(fetch).onComplete(this);
209+
cursor.read(fetch, this);
210210
break;
211211
} else {
212212
cursor.close();

0 commit comments

Comments
 (0)