Skip to content

Commit c674ea3

Browse files
committed
[#1548] Implement reactive PersistentTable*Strategy
1 parent 0e6deac commit c674ea3

File tree

6 files changed

+469
-48
lines changed

6 files changed

+469
-48
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveSqmMultiTableMutationStrategyProvider.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
import org.hibernate.query.sqm.mutation.internal.cte.CteMutationStrategy;
1414
import org.hibernate.query.sqm.mutation.internal.temptable.LocalTemporaryTableInsertStrategy;
1515
import org.hibernate.query.sqm.mutation.internal.temptable.LocalTemporaryTableMutationStrategy;
16+
import org.hibernate.query.sqm.mutation.internal.temptable.PersistentTableInsertStrategy;
17+
import org.hibernate.query.sqm.mutation.internal.temptable.PersistentTableMutationStrategy;
1618
import org.hibernate.query.sqm.mutation.spi.SqmMultiTableInsertStrategy;
1719
import org.hibernate.query.sqm.mutation.spi.SqmMultiTableMutationStrategy;
1820
import org.hibernate.query.sqm.mutation.spi.SqmMultiTableMutationStrategyProvider;
1921
import org.hibernate.reactive.query.sqm.mutation.internal.cte.ReactiveCteInsertStrategy;
2022
import org.hibernate.reactive.query.sqm.mutation.internal.cte.ReactiveCteMutationStrategy;
2123
import org.hibernate.reactive.query.sqm.mutation.internal.temptable.ReactiveLocalTemporaryTableInsertStrategy;
2224
import org.hibernate.reactive.query.sqm.mutation.internal.temptable.ReactiveLocalTemporaryTableMutationStrategy;
25+
import org.hibernate.reactive.query.sqm.mutation.internal.temptable.ReactivePersistentTableInsertStrategy;
26+
import org.hibernate.reactive.query.sqm.mutation.internal.temptable.ReactivePersistentTableMutationStrategy;
2327

2428
public class ReactiveSqmMultiTableMutationStrategyProvider implements SqmMultiTableMutationStrategyProvider {
2529

@@ -35,6 +39,9 @@ public SqmMultiTableMutationStrategy createMutationStrategy(
3539
if ( mutationStrategy instanceof LocalTemporaryTableMutationStrategy ) {
3640
return new ReactiveLocalTemporaryTableMutationStrategy( (LocalTemporaryTableMutationStrategy) mutationStrategy );
3741
}
42+
if ( mutationStrategy instanceof PersistentTableMutationStrategy ) {
43+
return new ReactivePersistentTableMutationStrategy( (PersistentTableMutationStrategy) mutationStrategy );
44+
}
3845
return mutationStrategy;
3946
}
4047

@@ -59,6 +66,9 @@ public SqmMultiTableInsertStrategy createInsertStrategy(
5966
if ( insertStrategy instanceof LocalTemporaryTableInsertStrategy ) {
6067
return new ReactiveLocalTemporaryTableInsertStrategy( (LocalTemporaryTableInsertStrategy) insertStrategy );
6168
}
69+
if ( insertStrategy instanceof PersistentTableInsertStrategy ) {
70+
return new ReactivePersistentTableInsertStrategy( (PersistentTableInsertStrategy) insertStrategy );
71+
}
6272
return insertStrategy;
6373
}
6474

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/mutation/internal/temptable/ReactiveExecuteWithTemporaryTableHelper.java

Lines changed: 21 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.hibernate.query.sqm.mutation.internal.temptable.BeforeUseAction;
3131
import org.hibernate.reactive.logging.impl.Log;
3232
import org.hibernate.reactive.logging.impl.LoggerFactory;
33+
import org.hibernate.reactive.query.sqm.mutation.internal.temptable.ReactiveTemporaryTableHelper.TemporaryTableCreationWork;
3334
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
3435
import org.hibernate.reactive.sql.exec.internal.StandardReactiveJdbcMutationExecutor;
3536
import org.hibernate.spi.NavigablePath;
@@ -50,6 +51,7 @@
5051
import org.hibernate.sql.exec.spi.JdbcParameterBindings;
5152
import org.hibernate.sql.results.internal.SqlSelectionImpl;
5253

54+
import static org.hibernate.reactive.query.sqm.mutation.internal.temptable.ReactiveTemporaryTableHelper.cleanTemporaryTableRows;
5355
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
5456

5557
/**
@@ -297,22 +299,12 @@ public static CompletionStage<Void> performBeforeTemporaryTableUseActions(
297299
final SessionFactoryImplementor factory = executionContext.getSession().getFactory();
298300
final Dialect dialect = factory.getJdbcServices().getDialect();
299301
if ( dialect.getTemporaryTableBeforeUseAction() == BeforeUseAction.CREATE ) {
300-
final ReactiveTemporaryTableHelper.TemporaryTableCreationWork temporaryTableCreationWork = new ReactiveTemporaryTableHelper
301-
.TemporaryTableCreationWork( temporaryTable, factory );
302-
302+
final TemporaryTableCreationWork temporaryTableCreationWork = new TemporaryTableCreationWork( temporaryTable, factory );
303303
final TempTableDdlTransactionHandling ddlTransactionHandling = dialect.getTemporaryTableDdlTransactionHandling();
304304
if ( ddlTransactionHandling == TempTableDdlTransactionHandling.NONE ) {
305305
return temporaryTableCreationWork.reactiveExecute( ( (ReactiveConnectionSupplier) executionContext.getSession() ).getReactiveConnection() );
306306
}
307-
else {
308-
throw LOG.notYetImplemented();
309-
// final IsolationDelegate isolationDelegate = executionContext.getSession()
310-
// .getJdbcCoordinator()
311-
// .getJdbcSessionOwner()
312-
// .getTransactionCoordinator()
313-
// .createIsolationDelegate();
314-
// isolationDelegate.delegateWork( temporaryTableCreationWork, ddlTransactionHandling == TempTableDdlTransactionHandling.ISOLATE_AND_TRANSACT );
315-
}
307+
throw LOG.notYetImplemented();
316308
}
317309
return voidFuture();
318310
}
@@ -326,39 +318,29 @@ public static CompletionStage<Void> performAfterTemporaryTableUseActions(
326318
final Dialect dialect = factory.getJdbcServices().getDialect();
327319
switch ( afterUseAction ) {
328320
case CLEAN:
329-
return ReactiveTemporaryTableHelper.cleanTemporaryTableRows(
330-
temporaryTable,
331-
dialect.getTemporaryTableExporter(),
332-
sessionUidAccess,
333-
executionContext.getSession()
334-
);
321+
return cleanTemporaryTableRows( temporaryTable, dialect.getTemporaryTableExporter(), sessionUidAccess, executionContext.getSession() );
335322
case DROP:
336-
final ReactiveTemporaryTableHelper.TemporaryTableDropWork temporaryTableDropWork = new ReactiveTemporaryTableHelper.TemporaryTableDropWork(
337-
temporaryTable,
338-
factory
339-
);
340-
341-
final TempTableDdlTransactionHandling ddlTransactionHandling = dialect.getTemporaryTableDdlTransactionHandling();
342-
if ( ddlTransactionHandling == TempTableDdlTransactionHandling.NONE ) {
343-
return temporaryTableDropWork.reactiveExecute( ( (ReactiveConnectionSupplier) executionContext.getSession() ).getReactiveConnection() );
344-
}
345-
else {
346-
throw LOG.notYetImplemented();
347-
// final IsolationDelegate isolationDelegate = executionContext.getSession()
348-
// .getJdbcCoordinator()
349-
// .getJdbcSessionOwner()
350-
// .getTransactionCoordinator()
351-
// .createIsolationDelegate();
352-
// isolationDelegate.delegateWork(
353-
// temporaryTableDropWork,
354-
// ddlTransactionHandling == TempTableDdlTransactionHandling.ISOLATE_AND_TRANSACT
355-
// );
356-
}
323+
return dropAction( temporaryTable, executionContext, factory, dialect );
357324
default:
358325
return voidFuture();
359326
}
360327
}
361328

329+
private static CompletionStage<Void> dropAction(
330+
TemporaryTable temporaryTable,
331+
ExecutionContext executionContext,
332+
SessionFactoryImplementor factory,
333+
Dialect dialect) {
334+
final TempTableDdlTransactionHandling ddlTransactionHandling = dialect.getTemporaryTableDdlTransactionHandling();
335+
if ( ddlTransactionHandling == TempTableDdlTransactionHandling.NONE ) {
336+
return new ReactiveTemporaryTableHelper
337+
.TemporaryTableDropWork( temporaryTable, factory )
338+
.reactiveExecute( ( (ReactiveConnectionSupplier) executionContext.getSession() ).getReactiveConnection() );
339+
}
340+
341+
throw LOG.notYetImplemented();
342+
}
343+
362344
private static void doNothing(Integer integer, PreparedStatement preparedStatement) {
363345
}
364346
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.query.sqm.mutation.internal.temptable;
7+
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.CompletionStage;
10+
11+
import org.hibernate.engine.jdbc.connections.spi.JdbcConnectionAccess;
12+
import org.hibernate.engine.spi.SessionFactoryImplementor;
13+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
14+
import org.hibernate.metamodel.mapping.internal.MappingModelCreationProcess;
15+
import org.hibernate.query.spi.DomainQueryExecutionContext;
16+
import org.hibernate.query.sqm.internal.DomainParameterXref;
17+
import org.hibernate.query.sqm.mutation.internal.temptable.PersistentTableInsertStrategy;
18+
import org.hibernate.query.sqm.tree.insert.SqmInsertStatement;
19+
import org.hibernate.reactive.query.sqm.mutation.spi.ReactiveSqmMultiTableInsertStrategy;
20+
21+
public class ReactivePersistentTableInsertStrategy extends PersistentTableInsertStrategy
22+
implements ReactivePersistentTableStrategy, ReactiveSqmMultiTableInsertStrategy {
23+
24+
private final CompletableFuture<Void> tableCreatedStage = new CompletableFuture();
25+
26+
private final CompletableFuture<Void> tableDroppedStage = new CompletableFuture();
27+
28+
private boolean prepared;
29+
30+
private boolean dropIdTables;
31+
32+
public ReactivePersistentTableInsertStrategy(PersistentTableInsertStrategy strategy) {
33+
super( strategy.getTemporaryTable(), strategy.getSessionFactory() );
34+
}
35+
36+
private static String sessionIdentifier(SharedSessionContractImplementor session) {
37+
return session.getSessionIdentifier().toString();
38+
}
39+
40+
@Override
41+
public void prepare(
42+
MappingModelCreationProcess mappingModelCreationProcess,
43+
JdbcConnectionAccess connectionAccess) {
44+
prepare( mappingModelCreationProcess, connectionAccess, tableCreatedStage );
45+
}
46+
47+
@Override
48+
public void release(SessionFactoryImplementor sessionFactory, JdbcConnectionAccess connectionAccess) {
49+
release( sessionFactory, connectionAccess, tableDroppedStage );
50+
}
51+
52+
@Override
53+
public CompletionStage<Integer> reactiveExecuteInsert(
54+
SqmInsertStatement<?> sqmInsertStatement,
55+
DomainParameterXref domainParameterXref,
56+
DomainQueryExecutionContext context) {
57+
return tableCreatedStage.thenCompose( v -> new ReactiveTableBasedInsertHandler(
58+
sqmInsertStatement,
59+
domainParameterXref,
60+
getTemporaryTable(),
61+
getSessionFactory().getJdbcServices().getDialect().getTemporaryTableAfterUseAction(),
62+
ReactivePersistentTableInsertStrategy::sessionIdentifier,
63+
getSessionFactory()
64+
).reactiveExecute( context ) );
65+
}
66+
67+
68+
@Override
69+
public boolean isPrepared() {
70+
return prepared;
71+
}
72+
73+
@Override
74+
public void setPrepared(boolean prepared) {
75+
this.prepared = prepared;
76+
}
77+
78+
@Override
79+
public boolean isDropIdTables() {
80+
return dropIdTables;
81+
}
82+
83+
@Override
84+
public void setDropIdTables(boolean dropIdTables) {
85+
this.dropIdTables = dropIdTables;
86+
}
87+
88+
@Override
89+
public CompletionStage<Void> getDropTableActionStage() {
90+
return tableDroppedStage;
91+
}
92+
93+
@Override
94+
public CompletionStage<Void> getCreateTableActionStage() {
95+
return tableCreatedStage;
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.query.sqm.mutation.internal.temptable;
7+
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.CompletionStage;
10+
11+
import org.hibernate.engine.jdbc.connections.spi.JdbcConnectionAccess;
12+
import org.hibernate.engine.spi.SessionFactoryImplementor;
13+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
14+
import org.hibernate.metamodel.mapping.internal.MappingModelCreationProcess;
15+
import org.hibernate.query.spi.DomainQueryExecutionContext;
16+
import org.hibernate.query.sqm.internal.DomainParameterXref;
17+
import org.hibernate.query.sqm.mutation.internal.temptable.PersistentTableMutationStrategy;
18+
import org.hibernate.query.sqm.tree.delete.SqmDeleteStatement;
19+
import org.hibernate.query.sqm.tree.update.SqmUpdateStatement;
20+
import org.hibernate.reactive.query.sqm.mutation.spi.ReactiveSqmMultiTableMutationStrategy;
21+
22+
23+
public class ReactivePersistentTableMutationStrategy extends PersistentTableMutationStrategy
24+
implements ReactivePersistentTableStrategy, ReactiveSqmMultiTableMutationStrategy {
25+
26+
private final CompletableFuture<Void> tableCreatedStage = new CompletableFuture();
27+
28+
private final CompletableFuture<Void> tableDroppedStage = new CompletableFuture();
29+
30+
private boolean prepared;
31+
32+
private boolean dropIdTables;
33+
34+
public ReactivePersistentTableMutationStrategy(PersistentTableMutationStrategy original) {
35+
super( original.getTemporaryTable(), original.getSessionFactory() );
36+
}
37+
38+
private static String sessionIdentifier(SharedSessionContractImplementor session) {
39+
return session.getSessionIdentifier().toString();
40+
}
41+
42+
@Override
43+
public void prepare(
44+
MappingModelCreationProcess mappingModelCreationProcess,
45+
JdbcConnectionAccess connectionAccess) {
46+
prepare( mappingModelCreationProcess, connectionAccess, tableCreatedStage );
47+
}
48+
49+
@Override
50+
public void release(SessionFactoryImplementor sessionFactory, JdbcConnectionAccess connectionAccess) {
51+
release( sessionFactory, connectionAccess, tableDroppedStage );
52+
}
53+
54+
@Override
55+
public CompletionStage<Integer> reactiveExecuteUpdate(
56+
SqmUpdateStatement<?> sqmUpdateStatement,
57+
DomainParameterXref domainParameterXref,
58+
DomainQueryExecutionContext context) {
59+
return tableCreatedStage
60+
.thenCompose( v -> new ReactiveTableBasedUpdateHandler(
61+
sqmUpdateStatement,
62+
domainParameterXref,
63+
getTemporaryTable(),
64+
getSessionFactory().getJdbcServices().getDialect().getTemporaryTableAfterUseAction(),
65+
ReactivePersistentTableMutationStrategy::sessionIdentifier,
66+
getSessionFactory()
67+
).reactiveExecute( context ) );
68+
}
69+
70+
@Override
71+
public CompletionStage<Integer> reactiveExecuteDelete(
72+
SqmDeleteStatement<?> sqmDeleteStatement,
73+
DomainParameterXref domainParameterXref,
74+
DomainQueryExecutionContext context) {
75+
return tableCreatedStage
76+
.thenCompose( v -> new ReactiveTableBasedDeleteHandler(
77+
sqmDeleteStatement,
78+
domainParameterXref,
79+
getTemporaryTable(),
80+
getSessionFactory().getJdbcServices().getDialect().getTemporaryTableAfterUseAction(),
81+
ReactivePersistentTableMutationStrategy::sessionIdentifier,
82+
getSessionFactory()
83+
).reactiveExecute( context ) );
84+
}
85+
86+
@Override
87+
public CompletionStage<Void> getDropTableActionStage() {
88+
return tableDroppedStage;
89+
}
90+
91+
@Override
92+
public CompletionStage<Void> getCreateTableActionStage() {
93+
return tableCreatedStage;
94+
}
95+
96+
@Override
97+
public boolean isPrepared() {
98+
return prepared;
99+
}
100+
101+
@Override
102+
public void setPrepared(boolean prepared) {
103+
this.prepared = prepared;
104+
}
105+
106+
@Override
107+
public boolean isDropIdTables() {
108+
return dropIdTables;
109+
}
110+
111+
@Override
112+
public void setDropIdTables(boolean dropIdTables) {
113+
this.dropIdTables = dropIdTables;
114+
}
115+
}

0 commit comments

Comments
 (0)