Skip to content

Commit 98a435d

Browse files
committed
Remove callbase usage in implementation
1 parent 0934c80 commit 98a435d

File tree

8 files changed

+25
-23
lines changed

8 files changed

+25
-23
lines changed

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLSocketConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import io.netty.channel.ChannelPipeline;
2121
import io.vertx.core.AsyncResult;
22+
import io.vertx.core.Future;
2223
import io.vertx.core.Handler;
2324
import io.vertx.core.Promise;
2425
import io.vertx.core.buffer.Buffer;
@@ -99,8 +100,8 @@ protected <R> void doSchedule(CommandBase<R> cmd, Handler<AsyncResult<R>> handle
99100
}
100101
}
101102

102-
public void upgradeToSsl(Handler<AsyncResult<Void>> completionHandler) {
103-
socket.upgradeToSsl(completionHandler);
103+
public Future<Void> upgradeToSsl() {
104+
return socket.upgradeToSsl();
104105
}
105106

106107
@Override

vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/InitialHandshakeCommandCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private void handleInitialHandshake(ByteBuf payload) {
143143
encoder.clientCapabilitiesFlag |= CLIENT_SSL;
144144
sendSslRequest();
145145

146-
encoder.socketConnection.upgradeToSsl(upgrade -> {
146+
encoder.socketConnection.upgradeToSsl().onComplete(upgrade -> {
147147
if (upgrade.succeeded()) {
148148
doSendHandshakeResponseMessage(serverAuthPluginName, cmd.authenticationPlugin(), authPluginData, serverCapabilitiesFlags);
149149
} else {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/InitiateSslHandler.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5959
byteBuf.release();
6060
switch (b) {
6161
case PgProtocolConstants.MESSAGE_TYPE_SSL_YES: {
62-
Handler handler = o -> {
63-
if (o instanceof AsyncResult) {
64-
AsyncResult res = (AsyncResult) o;
65-
if (res.failed()) {
62+
conn
63+
.socket()
64+
.upgradeToSsl()
65+
.onComplete(ar -> {
66+
if (ar.succeeded()) {
67+
ctx.pipeline().remove(this);
68+
upgradePromise.complete();
69+
} else {
6670
// Connection close will fail the promise
67-
return;
6871
}
69-
}
70-
ctx.pipeline().remove(this);
71-
upgradePromise.complete();
72-
};
73-
conn.socket().upgradeToSsl(handler);
72+
});
7473
break;
7574
}
7675
case PgProtocolConstants.MESSAGE_TYPE_SSL_NO: {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void sendCancelRequestMessage(int processId, int secretKey, Handler<AsyncResult<
8585
buffer.appendInt(processId);
8686
buffer.appendInt(secretKey);
8787

88-
socket.write(buffer, ar -> {
88+
socket.write(buffer).onComplete(ar -> {
8989
if (ar.succeeded()) {
9090
// directly close this connection
9191
if (status == Status.CONNECTED) {

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/pubsub/PgSubscriberImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private void tryConnect(long delayMillis, Handler<AsyncResult<Void>> handler) {
168168
}
169169

170170
private void doConnect(Handler<AsyncResult<Void>> completionHandler) {
171-
PgConnection.connect(vertx, options, ar -> handleConnectResult(completionHandler, ar));
171+
PgConnection.connect(vertx, options).onComplete(ar -> handleConnectResult(completionHandler, ar));
172172
}
173173

174174
private synchronized void handleConnectResult(Handler<AsyncResult<Void>> completionHandler, AsyncResult<PgConnection> ar1) {
@@ -191,7 +191,7 @@ private synchronized void handleConnectResult(Handler<AsyncResult<Void>> complet
191191
return channel.quotedName;
192192
})
193193
.collect(Collectors.joining(";LISTEN ", "LISTEN ", ""));
194-
conn.query(sql).execute(ar2 -> {
194+
conn.query(sql).execute().onComplete(ar2 -> {
195195
if (ar2.failed()) {
196196
log.error("Cannot LISTEN to channels", ar2.cause());
197197
conn.close();
@@ -224,7 +224,7 @@ void add(ChannelImpl sub) {
224224
if (conn != null) {
225225
subscribed = true;
226226
String sql = "LISTEN " + quotedName;
227-
conn.query(sql).execute(ar -> {
227+
conn.query(sql).execute().onComplete(ar -> {
228228
if (ar.succeeded()) {
229229
Handler<Void> handler = sub.subscribeHandler;
230230
if (handler != null) {
@@ -243,7 +243,7 @@ void remove(ChannelImpl sub) {
243243
if (subs.isEmpty()) {
244244
channels.remove(name, this);
245245
if (conn != null) {
246-
conn.query("UNLISTEN " + quotedName).execute(ar -> {
246+
conn.query("UNLISTEN " + quotedName).execute().onComplete(ar -> {
247247
if (ar.failed()) {
248248
log.error("Cannot UNLISTEN channel " + name, ar.cause());
249249
}

vertx-sql-client-templates/src/main/java/io/vertx/sqlclient/templates/impl/SqlTemplateImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public void execute(I parameters, Handler<AsyncResult<R>> handler) {
7070

7171
queryMapper
7272
.apply(client.preparedQuery(sqlTemplate.getSql()))
73-
.execute(tupleMapper.apply(parameters), handler);
73+
.execute(tupleMapper.apply(parameters))
74+
.onComplete(handler);
7475
}
7576

7677
@Override
@@ -86,7 +87,8 @@ public void executeBatch(List<I> batch, Handler<AsyncResult<R>> handler) {
8687
.executeBatch(batch
8788
.stream()
8889
.map(tupleMapper)
89-
.collect(Collectors.toList()), handler);
90+
.collect(Collectors.toList()))
91+
.onComplete(handler);
9092
}
9193

9294
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public Future<SqlConnection> getConnection() {
175175
.compose(v -> context.failedFuture(err), failure -> context.failedFuture(err));
176176
}
177177
}))
178-
.onComplete(ar -> conn.close(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
178+
.onComplete(ar -> conn.close().onComplete(v -> context.removeLocal(PROPAGATABLE_CONNECTION))));
179179
}
180180

181181
@Override

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public RowStream<Row> handler(Handler<Row> handler) {
9090
return this;
9191
}
9292
}
93-
c.read(fetch, this);
93+
c.read(fetch).onComplete(this);
9494
return this;
9595
}
9696

@@ -206,7 +206,7 @@ private void checkPending() {
206206
break;
207207
} else if (cursor.hasMore()) {
208208
readInProgress = true;
209-
cursor.read(fetch, this);
209+
cursor.read(fetch).onComplete(this);
210210
break;
211211
} else {
212212
cursor.close();

0 commit comments

Comments
 (0)