5
5
*/
6
6
package org .hibernate .reactive .engine .jdbc .mutation .internal ;
7
7
8
+ import static org .hibernate .reactive .engine .jdbc .ResultsCheckerUtil .checkResults ;
9
+ import static org .hibernate .reactive .util .impl .CompletionStages .completedFuture ;
10
+ import static org .hibernate .sql .model .ModelMutationLogging .MODEL_MUTATION_LOGGER ;
11
+ import static org .hibernate .sql .model .ModelMutationLogging .MODEL_MUTATION_LOGGER_TRACE_ENABLED ;
12
+ import java .util .concurrent .CompletionStage ;
13
+ import java .util .concurrent .atomic .AtomicReference ;
8
14
import org .hibernate .engine .jdbc .mutation .OperationResultChecker ;
15
+ import org .hibernate .engine .jdbc .mutation .ParameterUsage ;
9
16
import org .hibernate .engine .jdbc .mutation .TableInclusionChecker ;
17
+ import org .hibernate .engine .jdbc .mutation .group .PreparedStatementDetails ;
10
18
import org .hibernate .engine .jdbc .mutation .internal .MutationExecutorPostInsert ;
11
19
import org .hibernate .engine .spi .SharedSessionContractImplementor ;
20
+ import org .hibernate .persister .entity .mutation .EntityTableMapping ;
21
+ import org .hibernate .reactive .adaptor .impl .PrepareStatementDetailsAdaptor ;
22
+ import org .hibernate .reactive .adaptor .impl .PreparedStatementAdaptor ;
12
23
import org .hibernate .reactive .engine .jdbc .env .internal .ReactiveMutationExecutor ;
24
+ import org .hibernate .reactive .id .insert .ReactiveInsertGeneratedIdentifierDelegate ;
25
+ import org .hibernate .reactive .pool .ReactiveConnection ;
26
+ import org .hibernate .reactive .session .ReactiveConnectionSupplier ;
13
27
import org .hibernate .sql .model .MutationOperationGroup ;
14
28
import org .hibernate .sql .model .ValuesAnalysis ;
15
29
@@ -21,12 +35,105 @@ public ReactiveMutationExecutorPostInsert(
21
35
super ( mutationOperationGroup , session );
22
36
}
23
37
24
- public Object execute (
25
- Object modelReference ,
26
- ValuesAnalysis valuesAnalysis ,
38
+ @ Override
39
+ public CompletionStage <Object > executeReactive (Object modelReference , ValuesAnalysis valuesAnalysis ,
40
+ TableInclusionChecker inclusionChecker ,
41
+ OperationResultChecker resultChecker ,
42
+ SharedSessionContractImplementor session ) {
43
+ return ( (ReactiveInsertGeneratedIdentifierDelegate ) mutationTarget .getIdentityInsertDelegate () )
44
+ .reactivePerformInsert (
45
+ identityInsertStatementDetails ,
46
+ getJdbcValueBindings (),
47
+ modelReference ,
48
+ session
49
+ )
50
+ .thenApply ( this ::logId )
51
+ .thenCompose (id -> {
52
+ if (secondaryTablesStatementGroup == null ) {
53
+ return completedFuture (id );
54
+ }
55
+ AtomicReference <CompletionStage <Object >> res = new AtomicReference <>(completedFuture (id ));
56
+ secondaryTablesStatementGroup .forEachStatement ((tableName , statementDetails ) -> {
57
+ res .set (res .get ().thenCompose (i -> reactiveExecuteWithId (i , tableName , statementDetails , inclusionChecker , resultChecker , session )));
58
+ });
59
+ return res .get ();
60
+ });
61
+ }
62
+
63
+ private Object logId (Object identifier ) {
64
+ if ( MODEL_MUTATION_LOGGER_TRACE_ENABLED ) {
65
+ MODEL_MUTATION_LOGGER
66
+ .tracef ( "Post-insert generated value : `%s` (%s)" , identifier , mutationTarget .getNavigableRole ().getFullPath () );
67
+ }
68
+ return identifier ;
69
+ }
70
+
71
+ private CompletionStage <Object > reactiveExecuteWithId (
72
+ Object id ,
73
+ String tableName ,
74
+ PreparedStatementDetails statementDetails ,
27
75
TableInclusionChecker inclusionChecker ,
28
76
OperationResultChecker resultChecker ,
29
77
SharedSessionContractImplementor session ) {
30
- throw LOG .nonReactiveMethodCall ( "executeReactive" );
78
+
79
+ if ( statementDetails == null ) {
80
+ return completedFuture (id );
81
+ }
82
+
83
+ final EntityTableMapping tableDetails = (EntityTableMapping ) statementDetails .getMutatingTableDetails ();
84
+ assert !tableDetails .isIdentifierTable ();
85
+
86
+ if ( inclusionChecker != null && !inclusionChecker .include ( tableDetails ) ) {
87
+ if ( MODEL_MUTATION_LOGGER_TRACE_ENABLED ) {
88
+ MODEL_MUTATION_LOGGER .tracef (
89
+ "Skipping execution of secondary insert : %s" ,
90
+ tableDetails .getTableName ()
91
+ );
92
+ }
93
+ return completedFuture (id );
94
+ }
95
+
96
+
97
+ tableDetails .getKeyMapping ().breakDownKeyJdbcValues (
98
+ id ,
99
+ (jdbcValue , columnMapping ) -> {
100
+ valueBindings .bindValue (
101
+ jdbcValue ,
102
+ tableName ,
103
+ columnMapping .getColumnName (),
104
+ ParameterUsage .SET
105
+ );
106
+ },
107
+ session
108
+ );
109
+
110
+ session .getJdbcServices ().getSqlStatementLogger ().logStatement ( statementDetails .getSqlString () );
111
+
112
+ Object [] params = PreparedStatementAdaptor .bind ( statement -> {
113
+ PreparedStatementDetails details = new PrepareStatementDetailsAdaptor ( statementDetails , statement , session .getJdbcServices () );
114
+ //noinspection resource
115
+ details .resolveStatement ();
116
+ valueBindings .beforeStatement ( details );
117
+ } );
118
+
119
+ ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier ) session ).getReactiveConnection ();
120
+ String sqlString = statementDetails .getSqlString ();
121
+ return reactiveConnection
122
+ .update ( sqlString , params )
123
+ .thenApply ( affectedRowCount -> {
124
+ if ( affectedRowCount == 0 && tableDetails .isOptional () ) {
125
+ // the optional table did not have a row
126
+ return completedFuture (id );
127
+ }
128
+ checkResults ( session , statementDetails , resultChecker , affectedRowCount , -1 );
129
+ return id ;
130
+ } ).whenComplete ( (unused , throwable ) -> {
131
+ if ( statementDetails .getStatement () != null ) {
132
+ statementDetails .releaseStatement ( session );
133
+ }
134
+ valueBindings .afterStatement ( tableDetails );
135
+ } );
31
136
}
137
+
138
+
32
139
}
0 commit comments