Skip to content

Commit cb6c926

Browse files
committed
[#1436] Close connection when uni is cancelled
1 parent cb1eaf1 commit cb6c926

File tree

2 files changed

+47
-20
lines changed

2 files changed

+47
-20
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
*/
66
package org.hibernate.reactive.mutiny.impl;
77

8-
import io.smallrye.mutiny.Uni;
9-
import jakarta.persistence.criteria.CriteriaBuilder;
10-
import jakarta.persistence.metamodel.Metamodel;
8+
import java.lang.invoke.MethodHandles;
9+
import java.util.Objects;
10+
import java.util.concurrent.CompletionStage;
11+
import java.util.function.BiFunction;
12+
import java.util.function.Function;
13+
import java.util.function.Supplier;
14+
1115
import org.hibernate.Cache;
1216
import org.hibernate.internal.SessionCreationOptions;
1317
import org.hibernate.internal.SessionFactoryImpl;
@@ -25,12 +29,9 @@
2529
import org.hibernate.service.ServiceRegistry;
2630
import org.hibernate.stat.Statistics;
2731

28-
import java.lang.invoke.MethodHandles;
29-
import java.util.Objects;
30-
import java.util.concurrent.CompletionStage;
31-
import java.util.function.BiFunction;
32-
import java.util.function.Function;
33-
import java.util.function.Supplier;
32+
import io.smallrye.mutiny.Uni;
33+
import jakarta.persistence.criteria.CriteriaBuilder;
34+
import jakarta.persistence.metamodel.Metamodel;
3435

3536
import static org.hibernate.reactive.common.InternalStateAssertions.assertUseOnEventLoop;
3637

@@ -106,7 +107,12 @@ public Uni<Mutiny.Session> openSession(String tenantId) {
106107
*/
107108
private <S> Uni<S> create(ReactiveConnection connection, Supplier<S> supplier) {
108109
return Uni.createFrom().item( supplier )
109-
.onFailure().call( () -> Uni.createFrom().completionStage( connection.close() ) );
110+
.onCancellation().call( () -> close( connection ) )
111+
.onFailure().call( () -> close( connection ) );
112+
}
113+
114+
private static Uni<Void> close(ReactiveConnection connection) {
115+
return Uni.createFrom().completionStage( connection.close() );
110116
}
111117

112118
@Override
@@ -209,8 +215,8 @@ private<S extends Mutiny.Closeable, T> Uni<T> withSession(
209215
return sessionUni.chain( session -> Uni.createFrom().voidItem()
210216
.invoke( () -> context.put( contextKey, session ) )
211217
.chain( () -> work.apply( session ) )
212-
.eventually( () -> context.remove( contextKey ) )
213-
.eventually(session::close)
218+
.onTermination().invoke( () -> context.remove( contextKey ) )
219+
.onTermination().call( session::close )
214220
);
215221
}
216222

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.CompletionStage;
10+
import java.util.function.Consumer;
911

1012
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
1113
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
1214
import org.hibernate.reactive.pool.ReactiveConnection;
1315
import org.hibernate.reactive.pool.ReactiveConnectionPool;
1416

17+
import io.vertx.core.Future;
1518
import io.vertx.sqlclient.Pool;
1619
import io.vertx.sqlclient.SqlConnection;
1720

@@ -41,13 +44,13 @@ public abstract class SqlClientPool implements ReactiveConnectionPool {
4144

4245
/**
4346
* @return a Hibernate {@link SqlStatementLogger} for logging SQL
44-
* statements as they are executed
47+
* statements as they are executed
4548
*/
4649
protected abstract SqlStatementLogger getSqlStatementLogger();
4750

4851
/**
4952
* @return a Hibernate {@link SqlExceptionHelper} for converting
50-
* exceptions
53+
* exceptions
5154
*/
5255
protected abstract SqlExceptionHelper getSqlExceptionHelper();
5356

@@ -58,9 +61,7 @@ public abstract class SqlClientPool implements ReactiveConnectionPool {
5861
* subclasses which support multitenancy.
5962
*
6063
* @param tenantId the id of the tenant
61-
*
6264
* @throws UnsupportedOperationException if multitenancy is not supported
63-
*
6465
* @see ReactiveConnectionPool#getConnection(String)
6566
*/
6667
protected Pool getTenantPool(String tenantId) {
@@ -88,13 +89,33 @@ public CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExc
8889
}
8990

9091
private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool) {
91-
return pool.getConnection()
92-
.toCompletionStage().thenApply( this::newConnection );
92+
return completionStage( pool.getConnection().map( this::newConnection ), ReactiveConnection::close );
9393
}
9494

9595
private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, SqlExceptionHelper sqlExceptionHelper) {
96-
return pool.getConnection()
97-
.toCompletionStage().thenApply( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) );
96+
return completionStage(
97+
pool.getConnection().map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ),
98+
ReactiveConnection::close
99+
);
100+
}
101+
102+
/**
103+
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
104+
*/
105+
private <T> CompletionStage<T> completionStage(Future<T> future, Consumer<T> onCancellation) {
106+
CompletableFuture<T> completableFuture = new CompletableFuture<>();
107+
future.onComplete( ar -> {
108+
if ( ar.succeeded() ) {
109+
if ( completableFuture.isCancelled() ) {
110+
onCancellation.accept( ar.result() );
111+
}
112+
completableFuture.complete( ar.result() );
113+
}
114+
else {
115+
completableFuture.completeExceptionally( ar.cause() );
116+
}
117+
} );
118+
return completableFuture;
98119
}
99120

100121
private SqlClientConnection newConnection(SqlConnection connection) {

0 commit comments

Comments
 (0)