24
24
import io .vertx .core .impl .VertxInternal ;
25
25
import io .vertx .sqlclient .Pool ;
26
26
import io .vertx .sqlclient .PoolOptions ;
27
- import io .vertx .sqlclient .SqlClient ;
28
27
import io .vertx .sqlclient .SqlConnection ;
29
28
import io .vertx .sqlclient .impl .command .CommandBase ;
30
29
import io .vertx .core .AsyncResult ;
33
32
import io .vertx .sqlclient .impl .pool .ConnectionPool ;
34
33
import io .vertx .sqlclient .impl .tracing .QueryTracer ;
35
34
36
- import java .util .function .Function ;
37
-
38
35
/**
39
36
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
40
37
* @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
@@ -75,11 +72,6 @@ protected <T> Promise<T> promise(Handler<AsyncResult<T>> handler) {
75
72
*/
76
73
public abstract void connect (Handler <AsyncResult <Connection >> completionHandler );
77
74
78
- private void acquire (Handler <AsyncResult <Connection >> completionHandler ) {
79
- pool .acquire (completionHandler );
80
- }
81
-
82
-
83
75
@ Override
84
76
public void getConnection (Handler <AsyncResult <SqlConnection >> handler ) {
85
77
Future <SqlConnection > fut = getConnection ();
@@ -100,31 +92,6 @@ public Future<SqlConnection> getConnection() {
100
92
});
101
93
}
102
94
103
- @ Override
104
- public <T > void withTransaction (Function <SqlClient , Future <T >> function , Handler <AsyncResult <T >> handler ) {
105
- Future <T > res = withTransaction (function );
106
- if (handler != null ) {
107
- res .onComplete (handler );
108
- }
109
- }
110
-
111
- @ Override
112
- public <T > Future <T > withTransaction (Function <SqlClient , Future <T >> function ) {
113
- return getConnection ()
114
- .flatMap (conn -> conn
115
- .begin ()
116
- .flatMap (tx -> function
117
- .apply (conn )
118
- .compose (
119
- res -> tx
120
- .commit ()
121
- .flatMap (v -> Future .succeededFuture (res )),
122
- err -> tx
123
- .rollback ()
124
- .flatMap (v -> Future .failedFuture (err ))))
125
- .onComplete (ar -> conn .close ()));
126
- }
127
-
128
95
@ Override
129
96
public <R > void schedule (CommandBase <R > cmd , Promise <R > promise ) {
130
97
acquire (new CommandWaiter () {
@@ -141,7 +108,11 @@ protected void onFailure(Throwable cause) {
141
108
});
142
109
}
143
110
144
- private abstract class CommandWaiter implements Connection .Holder , Handler <AsyncResult <Connection >> {
111
+ private void acquire (Handler <AsyncResult <Connection >> completionHandler ) {
112
+ pool .acquire (completionHandler );
113
+ }
114
+
115
+ private static abstract class CommandWaiter implements Connection .Holder , Handler <AsyncResult <Connection >> {
145
116
146
117
protected abstract void onSuccess (Connection conn );
147
118
0 commit comments