@@ -103,27 +103,25 @@ public EventLoopContext apply(ContextInternal contextInternal) {
103
103
104
104
private final PoolConnector <PooledConnection > connector = new PoolConnector <PooledConnection >() {
105
105
@ Override
106
- public void connect (EventLoopContext context , PoolConnector . Listener listener , Handler < AsyncResult < ConnectResult < PooledConnection >>> handler ) {
106
+ public Future < ConnectResult < PooledConnection >> connect (EventLoopContext context , Listener listener ) {
107
107
Future <SqlConnection > future = connectionProvider .apply (context );
108
- future .onComplete (ar -> {
109
- if (ar .succeeded ()) {
110
- SqlConnectionBase res = (SqlConnectionBase ) ar .result ();
111
- Connection conn = res .unwrap ();
112
- if (conn .isValid ()) {
113
- PooledConnection pooled = new PooledConnection (res .factory (), conn , listener );
114
- conn .init (pooled );
115
- Handler <PooledConnection > connectionHandler = hook .get ();
116
- if (connectionHandler != null ) {
117
- pooled .poolResultHandler = handler ;
118
- connectionHandler .handle (pooled );
119
- } else {
120
- handler .handle (Future .succeededFuture (new ConnectResult <>(pooled , pipeliningLimit , 0 )));
121
- }
108
+ return future .compose (res -> {
109
+ SqlConnectionBase connBase = (SqlConnectionBase ) res ;
110
+ Connection conn = connBase .unwrap ();
111
+ if (conn .isValid ()) {
112
+ PooledConnection pooled = new PooledConnection (connBase .factory (), conn , listener );
113
+ conn .init (pooled );
114
+ Handler <PooledConnection > connectionHandler = hook .get ();
115
+ if (connectionHandler != null ) {
116
+ Promise <ConnectResult <PooledConnection >> p = Promise .promise ();
117
+ pooled .poolCallback = p ;
118
+ connectionHandler .handle (pooled );
119
+ return p .future ();
122
120
} else {
123
- handler . handle ( Future .failedFuture ( ConnectionBase . CLOSED_EXCEPTION ));
121
+ return Future .succeededFuture ( new ConnectResult <>( pooled , pipeliningLimit , 0 ));
124
122
}
125
123
} else {
126
- handler . handle ( Future .failedFuture (ar . cause ()) );
124
+ return Future .failedFuture (ConnectionBase . CLOSED_EXCEPTION );
127
125
}
128
126
});
129
127
}
@@ -261,7 +259,7 @@ public class PooledConnection implements Connection, Connection.Holder {
261
259
private final Connection conn ;
262
260
private final PoolConnector .Listener listener ;
263
261
private Holder holder ;
264
- private Handler < AsyncResult < ConnectResult <PooledConnection >>> poolResultHandler ;
262
+ private Promise < ConnectResult <PooledConnection >> poolCallback ;
265
263
private Lease <PooledConnection > lease ;
266
264
public long expirationTimestamp ;
267
265
@@ -332,11 +330,11 @@ private void doClose(Holder holder, Promise<Void> promise) {
332
330
promise .fail (msg );
333
331
} else {
334
332
this .holder = null ;
335
- Handler < AsyncResult < ConnectResult <PooledConnection >>> resultHandler = poolResultHandler ;
333
+ Promise < ConnectResult <PooledConnection >> resultHandler = poolCallback ;
336
334
if (resultHandler != null ) {
337
- poolResultHandler = null ;
335
+ poolCallback = null ;
338
336
promise .complete ();
339
- resultHandler .handle ( Future . succeededFuture ( new ConnectResult <>(this , pipeliningLimit , 0 ) ));
337
+ resultHandler .complete ( new ConnectResult <>(this , pipeliningLimit , 0 ));
340
338
return ;
341
339
}
342
340
if (beforeRecycle == null ) {
@@ -360,10 +358,10 @@ public void handleClosed() {
360
358
if (holder != null ) {
361
359
holder .handleClosed ();
362
360
}
363
- Handler < AsyncResult < ConnectResult <PooledConnection >>> resultHandler = poolResultHandler ;
361
+ Promise < ConnectResult <PooledConnection >> resultHandler = poolCallback ;
364
362
if (resultHandler != null ) {
365
- poolResultHandler = null ;
366
- resultHandler .handle ( Future . failedFuture ( ConnectionBase .CLOSED_EXCEPTION ) );
363
+ poolCallback = null ;
364
+ resultHandler .fail ( ConnectionBase .CLOSED_EXCEPTION );
367
365
}
368
366
listener .onRemove ();
369
367
}
0 commit comments