From b54ce5ac7db273a85c369f2be3a6f14ba08f83b1 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Thu, 6 Mar 2025 18:13:51 +0100 Subject: [PATCH 1/4] [#2139] Refactoring of ReactiveStatelessDefaultBatchSizeTest * Rename it to ReactiveStatelessWithBatchTest * Clean up unit tests --- ...ReactiveStatelessDefaultBatchSizeTest.java | 622 ------------------ .../ReactiveStatelessWithBatchTest.java | 422 ++++++++++++ 2 files changed, 422 insertions(+), 622 deletions(-) delete mode 100644 hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessDefaultBatchSizeTest.java create mode 100644 hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessDefaultBatchSizeTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessDefaultBatchSizeTest.java deleted file mode 100644 index c4d74261e..000000000 --- a/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessDefaultBatchSizeTest.java +++ /dev/null @@ -1,622 +0,0 @@ -/* Hibernate, Relational Persistence for Idiomatic Java - * - * SPDX-License-Identifier: Apache-2.0 - * Copyright: Red Hat Inc. and Hibernate Authors - */ -package org.hibernate; - -import org.hibernate.boot.registry.StandardServiceRegistryBuilder; -import org.hibernate.cfg.AvailableSettings; -import org.hibernate.cfg.Configuration; -import org.hibernate.reactive.BaseReactiveTest; -import org.hibernate.reactive.stage.Stage; -import org.hibernate.reactive.testing.SqlStatementTracker; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import io.vertx.junit5.Timeout; -import io.vertx.junit5.VertxTestContext; -import jakarta.persistence.Entity; -import jakarta.persistence.Id; -import jakarta.persistence.Table; -import java.util.List; -import java.util.Objects; -import java.util.Set; - -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * The test aims to check that methods accepting the batch size as parameter e.g. {@link Stage.StatelessSession#insert(int, Object...)} - * work when {@link AvailableSettings.STATEMENT_BATCH_SIZE} hasn't been set. - */ -@Timeout(value = 10, timeUnit = MINUTES) -public class ReactiveStatelessDefaultBatchSizeTest extends BaseReactiveTest { - private static SqlStatementTracker sqlTracker; - - private static final String PIG_ONE_NAME = "One"; - private static final String PIG_TWO_NAME = "Two"; - private static final String PIG_THREE_NAME = "Three"; - private static final String PIG_FOUR_NAME = "Four"; - private static final String PIG_FIVE_NAME = "Five"; - private static final String PIG_SIX_NAME = "Six"; - - private static final GuineaPig PIG_ONE = new GuineaPig( 11, PIG_ONE_NAME ); - private static final GuineaPig PIG_TWO = new GuineaPig( 22, PIG_TWO_NAME ); - private static final GuineaPig PIG_THREE = new GuineaPig( 33, PIG_THREE_NAME ); - private static final GuineaPig PIG_FOUR = new GuineaPig( 44, PIG_FOUR_NAME ); - private static final GuineaPig PIG_FIVE = new GuineaPig( 55, PIG_FIVE_NAME ); - private static final GuineaPig PIG_SIX = new GuineaPig( 66, PIG_SIX_NAME ); - - private static final GuineaPig[] PIGS = { PIG_ONE, PIG_TWO, PIG_THREE, PIG_FOUR, PIG_FIVE, PIG_SIX, }; - - @Override - protected Set> annotatedEntities() { - return Set.of( GuineaPig.class ); - } - - @Override - protected Configuration constructConfiguration() { - Configuration configuration = super.constructConfiguration(); - - // Construct a tracker that collects query statements via the SqlStatementLogger framework. - // Pass in configuration properties to hand off any actual logging properties - sqlTracker = new SqlStatementTracker( - ReactiveStatelessDefaultBatchSizeTest::filter, - configuration.getProperties() - ); - return configuration; - } - - @BeforeEach - public void clearTracker() { - sqlTracker.clear(); - } - - @Override - protected void addServices(StandardServiceRegistryBuilder builder) { - sqlTracker.registerService( builder ); - } - - private static boolean filter(String s) { - String[] accepted = { "insert ", "update ", "delete " }; - for ( String valid : accepted ) { - if ( s.toLowerCase().startsWith( valid ) ) { - return true; - } - } - return false; - } - - @Test - public void testMutinyBatchingInsert(VertxTestContext context) { - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) - .invoke( () -> { - // We expect only one insert query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "insert into pig \\(name,id\\) values (.*)" ); - } ) - ); - } - - @Test - public void testMutinyBatchingInsertMultiple(VertxTestContext context) { - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s.insertMultiple( List.of( PIGS ) ) ) - .invoke( () -> { - // We expect only one insert query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "insert into pig \\(name,id\\) values (.*)" ); - } ) - .invoke( v -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - .invoke( pigs -> assertThat( pigs ).hasSize( PIGS.length ) ) - ) - ) - ); - } - - @Test - public void testMutinyBatchingInsertAllNoBatchSizeParameter(VertxTestContext context) { - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s.insertAll( PIGS ) ) - .invoke( () -> { - // We expect only one insert query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "insert into pig \\(name,id\\) values (.*)" ); - } ) - .invoke( v -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - .invoke( pigs -> assertThat( pigs ).hasSize( PIGS.length ) ) - ) - ) - ); - } - - @Test - public void testStageBatchingInsert(VertxTestContext context) { - test( context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> { - // We expect only one insert query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "insert into pig \\(name,id\\) values (.*)" ); - } ) - .thenAccept( v -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - .thenAccept( pigs -> assertThat( pigs ).hasSize( PIGS.length ) ) - ) - ) - ); - } - - @Test - public void testStageBatchingInsertMultiple(VertxTestContext context) { - test( context, getSessionFactory().withStatelessTransaction( s -> s.insertMultiple( List.of(PIGS) ) ) - .thenAccept( v -> { - // We expect only one insert query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "insert into pig \\(name,id\\) values (.*)" ); - } ) - .thenAccept( v -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - .thenAccept( pigs -> assertThat( pigs ).hasSize( PIGS.length ) ) - ) - ) - ); - } - - @Test - public void testStageBatchingInsertNoBatchSizeParameter(VertxTestContext context) { - test( context, getSessionFactory().withStatelessTransaction( s -> s.insert( PIGS ) ) - .thenAccept( v -> { - // We expect only one insert query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "insert into pig \\(name,id\\) values (.*)" ); - } ) - .thenAccept( v -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - .thenAccept( pigs -> assertThat( pigs ).hasSize( PIGS.length ) ) - ) - ) - ); - } - - @Test - public void testMutinyBatchingDelete(VertxTestContext context) { - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) - .invoke( sqlTracker::clear ) - .chain( v -> getMutinySessionFactory().withStatelessTransaction(s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ).getResultList() - ) - .invoke( pigs -> sqlTracker.clear() ) - .chain( pigs -> getMutinySessionFactory().withStatelessTransaction( - s -> - s.deleteAll( 10, pigs.subList( 0, 2 ).toArray() ) - ) - ) - .invoke( () -> { - // We expect only one delete query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "delete from pig where id=.*" ); - } ) - .chain( () -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - ) - .invoke( guineaPigs -> assertThat( guineaPigs.size() ).isEqualTo( 4 ) ) - ) ) - ); - } - - @Test - public void testMutinyBatchingDeleteMultiple(VertxTestContext context) { - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) - .invoke( sqlTracker::clear ) - .chain( v -> getMutinySessionFactory().withStatelessTransaction(s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ).getResultList() - ) - .invoke( pigs -> sqlTracker.clear() ) - .chain( pigs -> getMutinySessionFactory().withStatelessTransaction( - s -> s.deleteMultiple( pigs.subList( 0, 2 ) ) ) - ) - .invoke( () -> { - // We expect only one delete query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "delete from pig where id=.*" ); - } ) - .chain( () -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - ) - .invoke( guineaPigs -> assertThat( guineaPigs.size() ).isEqualTo( 4 ) ) - ) ) - ); - } - - @Test - public void testMutinyBatchingDeleteAllNoBatchSizeParameter(VertxTestContext context) { - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s.insertAll( PIGS ) ) - .invoke( sqlTracker::clear ) - .chain( v -> getMutinySessionFactory().withStatelessTransaction(s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ).getResultList() - ) - .invoke( pigs -> sqlTracker.clear() ) - .chain( pigs -> getMutinySessionFactory().withStatelessTransaction( - s -> s.deleteAll( pigs.subList( 0, 2 ).toArray() ) ) - ) - .invoke( () -> { - // We expect only one delete query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "delete from pig where id=.*" ); - } ) - .chain( () -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - ) - .invoke( guineaPigs -> assertThat( guineaPigs.size() ).isEqualTo( 4 ) ) - ) ) - ); - } - - @Test - public void testStageBatchingDelete(VertxTestContext context) { - test( context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> sqlTracker.clear() ) - .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ).getResultList() - .thenCompose( pigs -> { - sqlTracker.clear(); - return s.delete( 10, pigs.subList( 0, 2 ).toArray() ); - } - ) ) - .thenAccept( vo -> { - // We expect only one delete query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "delete from pig where id=.*" ); - } ) - .thenCompose( vo -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() - ) - .thenAccept( guineaPigs -> assertThat( guineaPigs.size() ).isEqualTo( 4 ) ) - ) ) - ); - } - - @Test - public void testStageBatchingDeleteMultiple(VertxTestContext context) { - test( context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> sqlTracker.clear() ) - .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ).getResultList() - .thenCompose( pigs -> { - sqlTracker.clear(); - return s.deleteMultiple( pigs.subList( 0, 2 ) ); - } - ) ) - .thenAccept( vo -> { - // We expect only one delete query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "delete from pig where id=.*" ); - } ) - .thenCompose( vo -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() ) - .thenAccept( guineaPigs -> assertThat( guineaPigs.size() ).isEqualTo( 4 ) ) - ) ) - ); - } - - @Test - public void testStageBatchingDeleteNoBatchSizeParameter(VertxTestContext context) { - test( context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> sqlTracker.clear() ) - .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ).getResultList() - .thenCompose( pigs -> { - sqlTracker.clear(); - return s.delete( pigs.subList( 0, 2 ).toArray() ); - } - ) ) - .thenAccept( vo -> { - // We expect only one delete query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) - .matches( "delete from pig where id=.*" ); - } ) - .thenCompose( vo -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p", GuineaPig.class ) - .getResultList() ) - .thenAccept( guineaPigs -> assertThat( guineaPigs.size() ).isEqualTo( 4 ) ) - ) ) - ); - } - - @Test - public void testMutinyBatchingUpdate(VertxTestContext context) { - final String pigOneUpdatedName = "One updated"; - final String pigTwoUpdatedName = "Two updated"; - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s .insertAll( 10, PIGS )) - .invoke( sqlTracker::clear ) - .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) - .getResultList() - .invoke( pigs -> sqlTracker.clear() ) - .chain( pigs -> { - GuineaPig guineaPigOne = pigs.get( 0 ); - guineaPigOne.setName( pigOneUpdatedName ); - GuineaPig guineaPigTwo = pigs.get( 1 ); - guineaPigTwo.setName( pigTwoUpdatedName ); - return s.updateAll( 10, new GuineaPig[] { guineaPigOne, guineaPigTwo } ); - } ) - ) ) - .invoke( () -> { - // We expect only one update query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "update pig set name=.* where id=.*" ); - } ) - .chain( () -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by id", GuineaPig.class ) - .getResultList() - .invoke( guineaPigs -> { - checkPigsAreCorrectlyUpdated( guineaPigs, pigOneUpdatedName, pigTwoUpdatedName ); - } ) ) ) - ); - } - - @Test - public void testMutinyBatchingUpdateMultiple(VertxTestContext context) { - final String pigOneUpdatedName = "One updated"; - final String pigTwoUpdatedName = "Two updated"; - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s .insertAll( 10, PIGS )) - .invoke( sqlTracker::clear ) - .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) - .getResultList() - .invoke( pigs -> sqlTracker.clear() ) - .chain( pigs -> { - GuineaPig guineaPigOne = pigs.get( 0 ); - guineaPigOne.setName( pigOneUpdatedName ); - GuineaPig guineaPigTwo = pigs.get( 1 ); - guineaPigTwo.setName( pigTwoUpdatedName ); - return s.updateMultiple( List.of( guineaPigOne, guineaPigTwo ) ); - } ) - ) ) - .invoke( () -> { - // We expect only one update query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "update pig set name=.* where id=.*" ); - } ) - .chain( () -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by id", GuineaPig.class ) - .getResultList() - .invoke( guineaPigs -> { - checkPigsAreCorrectlyUpdated( guineaPigs, pigOneUpdatedName, pigTwoUpdatedName ); - } ) ) ) - ); - } - - @Test - public void testMutinyBatchingUpdateAllNoBatchSizeParameter(VertxTestContext context) { - final String pigOneUpdatedName = "One updated"; - final String pigTwoUpdatedName = "Two updated"; - test( context, getMutinySessionFactory().withStatelessTransaction( s -> s .insertAll( 10, PIGS )) - .invoke( sqlTracker::clear ) - .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) - .getResultList() - .invoke( pigs -> sqlTracker.clear() ) - .chain( pigs -> { - GuineaPig guineaPigOne = pigs.get( 0 ); - guineaPigOne.setName( pigOneUpdatedName ); - GuineaPig guineaPigTwo = pigs.get( 1 ); - guineaPigTwo.setName( pigTwoUpdatedName ); - return s.updateAll( guineaPigOne, guineaPigTwo ); - } ) - ) ) - .invoke( () -> { - // We expect only one update query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "update pig set name=.* where id=.*" ); - } ) - .chain( () -> getMutinySessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by id", GuineaPig.class ) - .getResultList() - .invoke( guineaPigs -> { - checkPigsAreCorrectlyUpdated( guineaPigs, pigOneUpdatedName, pigTwoUpdatedName ); - } ) ) ) - ); - } - - @Test - public void testStageBatchingUpdate(VertxTestContext context) { - final String pigOneUpdatedName = "One updated"; - final String pigTwoUpdatedName = "Two updated"; - test(context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> sqlTracker.clear() ) - .thenCompose( v -> getSessionFactory().withStatelessTransaction(s -> s - .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) - .getResultList() - .thenApply( pigs -> { - sqlTracker.clear(); - GuineaPig guineaPigOne = pigs.get( 0 ); - guineaPigOne.setName( pigOneUpdatedName ); - GuineaPig guineaPigTwo = pigs.get( 1 ); - guineaPigTwo.setName( pigTwoUpdatedName ); - return s.update( 10, new GuineaPig[] { guineaPigOne, guineaPigTwo } ); - } ) - ) - .thenAccept( vo -> { - // We expect only one update query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "update pig set name=.* where id=.*" ); - } ) - .thenCompose( vo -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by id", GuineaPig.class ) - .getResultList() - .thenAccept( guineaPigs -> - checkPigsAreCorrectlyUpdated( guineaPigs, pigOneUpdatedName, pigTwoUpdatedName ) - ) - ) ) ) - ); - } - - @Test - public void testStageBatchingUpdateMultiple(VertxTestContext context) { - final String pigOneUpdatedName = "One updated"; - final String pigTwoUpdatedName = "Two updated"; - test(context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> sqlTracker.clear() ) - .thenCompose( v -> getSessionFactory().withStatelessTransaction(s -> s - .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) - .getResultList() - .thenApply( pigs -> { - sqlTracker.clear(); - GuineaPig guineaPigOne = pigs.get( 0 ); - guineaPigOne.setName( pigOneUpdatedName ); - GuineaPig guineaPigTwo = pigs.get( 1 ); - guineaPigTwo.setName( pigTwoUpdatedName ); - return s.updateMultiple( List.of( guineaPigOne, guineaPigTwo ) ); - } ) - ) - .thenAccept( vo -> { - // We expect only one update query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "update pig set name=.* where id=.*" ); - } ) - .thenCompose( vo -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by id", GuineaPig.class ) - .getResultList() - .thenAccept( guineaPigs -> - checkPigsAreCorrectlyUpdated( guineaPigs, pigOneUpdatedName, pigTwoUpdatedName ) - ) - ) ) ) - ); - } - - @Test - public void testStageBatchingUpdateNoBatchSizeParameter(VertxTestContext context) { - final String pigOneUpdatedName = "One updated"; - final String pigTwoUpdatedName = "Two updated"; - test(context, getSessionFactory().withStatelessTransaction( s -> s.insert( 10, PIGS ) ) - .thenAccept( v -> sqlTracker.clear() ) - .thenCompose( v -> getSessionFactory().withStatelessTransaction(s -> s - .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) - .getResultList() - .thenApply( pigs -> { - sqlTracker.clear(); - GuineaPig guineaPigOne = pigs.get( 0 ); - guineaPigOne.setName( pigOneUpdatedName ); - GuineaPig guineaPigTwo = pigs.get( 1 ); - guineaPigTwo.setName( pigTwoUpdatedName ); - return s.update( guineaPigOne, guineaPigTwo ); - } ) - ) - .thenAccept( vo -> { - // We expect only one update query - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - // Parameters are different for different dbs, so we cannot do an exact match - assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).matches( "update pig set name=.* where id=.*" ); - } ) - .thenCompose( vo -> getSessionFactory().withStatelessTransaction( s -> s - .createQuery( "select p from GuineaPig p order by id", GuineaPig.class ) - .getResultList() - .thenAccept( guineaPigs -> - checkPigsAreCorrectlyUpdated( guineaPigs, pigOneUpdatedName, pigTwoUpdatedName ) - ) - ) ) ) - ); - } - - private static void checkPigsAreCorrectlyUpdated(List guineaPigs, String pigOneUpdatedName, String pigTwoUpdatedName) { - assertThat( guineaPigs.get( 0 ).getName() ).isEqualTo( pigOneUpdatedName ); - assertThat( guineaPigs.get( 1 ).getName() ).isEqualTo( pigTwoUpdatedName ); - assertThat( guineaPigs.get( 2 ).getName() ).isEqualTo( PIG_THREE_NAME ); - assertThat( guineaPigs.get( 3 ).getName() ).isEqualTo( PIG_FOUR_NAME ); - assertThat( guineaPigs.get( 4 ).getName() ).isEqualTo( PIG_FIVE_NAME ); - assertThat( guineaPigs.get( 5 ).getName() ).isEqualTo( PIG_SIX_NAME ); - } - - @Entity(name = "GuineaPig") - @Table(name = "pig") - public static class GuineaPig { - @Id - private Integer id; - private String name; - - public GuineaPig() { - } - - public GuineaPig(Integer id, String name) { - this.id = id; - this.name = name; - } - - public Integer getId() { - return id; - } - - public void setId(Integer id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @Override - public String toString() { - return id + ": " + name; - } - - @Override - public boolean equals(Object o) { - if ( this == o ) { - return true; - } - if ( o == null || getClass() != o.getClass() ) { - return false; - } - GuineaPig guineaPig = (GuineaPig) o; - return Objects.equals( name, guineaPig.name ); - } - - @Override - public int hashCode() { - return Objects.hash( name ); - } - } -} diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java new file mode 100644 index 000000000..f934090b8 --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java @@ -0,0 +1,422 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate; + +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; +import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.BaseReactiveTest; +import org.hibernate.reactive.testing.SqlStatementTracker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Uni; +import io.vertx.junit5.Timeout; +import io.vertx.junit5.VertxTestContext; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletionStage; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test the stateless session actually execute the operations in batch. + */ +@Timeout(value = 10, timeUnit = MINUTES) +public class ReactiveStatelessWithBatchTest extends BaseReactiveTest { + private static SqlStatementTracker sqlTracker; + + private static final Object[] PIGS = { + new GuineaPig( 11, "One" ), + new GuineaPig( 22, "Two" ), + new GuineaPig( 33, "Three" ), + new GuineaPig( 44, "Four" ), + new GuineaPig( 55, "Five" ), + new GuineaPig( 66, "Six" ) + }; + + private static final Object[] PIGS_AFTER_DELETE = List.of( PIGS ) + .subList( 2, PIGS.length ) + .toArray(); + + private static final Object[] PIGS_AFTER_UPDATE = { + new GuineaPig( 11, "One updated" ), + new GuineaPig( 22, "Two updated" ), + new GuineaPig( 33, "Three" ), + new GuineaPig( 44, "Four" ), + new GuineaPig( 55, "Five" ), + new GuineaPig( 66, "Six" ) + }; + + @Override + protected Set> annotatedEntities() { + return Set.of( GuineaPig.class ); + } + + @Override + protected Configuration constructConfiguration() { + Configuration configuration = super.constructConfiguration(); + + // Construct a tracker that collects query statements via the SqlStatementLogger framework. + // Pass in configuration properties to hand off any actual logging properties + sqlTracker = new SqlStatementTracker( + ReactiveStatelessWithBatchTest::filter, + configuration.getProperties() + ); + return configuration; + } + + @BeforeEach + public void clearTracker() { + sqlTracker.clear(); + } + + @Override + protected void addServices(StandardServiceRegistryBuilder builder) { + sqlTracker.registerService( builder ); + } + + private static boolean filter(String s) { + String[] accepted = { "insert ", "update ", "delete " }; + for ( String valid : accepted ) { + if ( s.toLowerCase().startsWith( valid ) ) { + return true; + } + } + return false; + } + + @Test + public void testMutinyBatchingInsert(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) + .invoke( () -> assertSqlLogTracker( "insert into pig \\(name,id\\) values (.*)" ) ) + .chain( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS ) ) ) + ); + } + + @Test + public void testMutinyBatchingInsertMultiple(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertMultiple( List.of( PIGS ) ) ) + .invoke( () -> assertSqlLogTracker( "insert into pig \\(name,id\\) values (.*)" ) ) + .chain( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS ) ) ) + ); + } + + @Test + public void testMutinyBatchingInsertAllNoBatchSizeParameter(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( PIGS ) ) + .invoke( () -> assertSqlLogTracker( "insert into pig \\(name,id\\) values (.*)" ) ) + .chain( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS ) ) ) + ); + } + + @Test + public void testStageBatchingInsert(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenAccept( v -> assertSqlLogTracker( "insert into pig \\(name,id\\) values (.*)" ) ) + .thenCompose( v -> assertExpectedResult( PIGS ) ) + ); + } + + @Test + public void testStageBatchingInsertMultiple(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insertMultiple( List.of( PIGS ) ) ) + .thenAccept( v -> assertSqlLogTracker( "insert into pig \\(name,id\\) values (.*)" ) ) + .thenCompose( v -> assertExpectedResult( PIGS ) ) + ); + } + + @Test + public void testStageBatchingInsertNoBatchSizeParameter(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( PIGS ) ) + .thenAccept( v -> assertSqlLogTracker( "insert into pig \\(name,id\\) values (.*)" ) ) + .thenCompose( v -> assertExpectedResult( PIGS ) ) + ); + } + + @Test + public void testMutinyBatchingDelete(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) + .invoke( sqlTracker::clear ) + .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p", GuineaPig.class ).getResultList() + ) ) + .chain( pigs -> getMutinySessionFactory().withStatelessTransaction( s -> s + .deleteAll( 10, pigs.subList( 0, 2 ).toArray() ) + ) ) + .invoke( () -> assertSqlLogTracker( "delete from pig where id=.*" ) ) + .call( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS_AFTER_DELETE ) ) ) + ); + } + + @Test + public void testMutinyBatchingDeleteMultiple(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) + .invoke( sqlTracker::clear ) + .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p", GuineaPig.class ).getResultList() + ) ) + .chain( pigs -> getMutinySessionFactory().withStatelessTransaction( s -> s + .deleteMultiple( pigs.subList( 0, 2 ) ) ) + ) + .invoke( () -> assertSqlLogTracker( "delete from pig where id=.*" ) ) + .call( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS_AFTER_DELETE ) ) ) + ); + } + + @Test + public void testMutinyBatchingDeleteAllNoBatchSizeParameter(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( PIGS ) ) + .invoke( sqlTracker::clear ) + .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p", GuineaPig.class ).getResultList() + ) ) + .chain( pigs -> getMutinySessionFactory().withStatelessTransaction( s -> s + .deleteAll( pigs.subList( 0, 2 ).toArray() ) + ) ) + .invoke( () -> assertSqlLogTracker( "delete from pig where id=.*" ) ) + .call( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS_AFTER_DELETE ) ) ) + ); + } + + @Test + public void testStageBatchingDelete(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenRun( sqlTracker::clear ) + .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p", GuineaPig.class ).getResultList() + .thenCompose( pigs -> s.delete( 10, pigs.subList( 0, 2 ).toArray() ) ) + ) ) + .thenAccept( v -> assertSqlLogTracker( "delete from pig where id=.*" ) ) + .thenCompose( v -> assertExpectedResult( PIGS_AFTER_DELETE ) ) + ); + } + + @Test + public void testStageBatchingDeleteMultiple(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenRun( sqlTracker::clear ) + .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p", GuineaPig.class ).getResultList() + .thenCompose( pigs -> s.deleteMultiple( pigs.subList( 0, 2 ) ) ) + ) ) + .thenAccept( v -> assertSqlLogTracker( "delete from pig where id=.*" ) ) + .thenCompose( v -> assertExpectedResult( PIGS_AFTER_DELETE ) ) + ); + } + + @Test + public void testStageBatchingDeleteNoBatchSizeParameter(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenRun( sqlTracker::clear ) + .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p", GuineaPig.class ).getResultList() + .thenCompose( pigs -> s.delete( pigs.subList( 0, 2 ).toArray() ) ) + ) ) + .thenAccept( v -> assertSqlLogTracker( "delete from pig where id=.*" ) ) + .thenCompose( v -> assertExpectedResult( PIGS_AFTER_DELETE ) ) + ); + } + + @Test + public void testMutinyBatchingUpdate(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) + .invoke( sqlTracker::clear ) + .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p order by p.id", GuineaPig.class ) + .getResultList() + .chain( pigs -> { + pigs.get( 0 ).setName( "One updated" ); + pigs.get( 1 ).setName( "Two updated" ); + return s.updateAll( 10, pigs.toArray() ); + } ) + ) ) + .invoke( () -> assertSqlLogTracker( "update pig set name=.* where id=.*" ) ) + .call( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS_AFTER_UPDATE ) ) ) + ); + } + + @Test + public void testMutinyBatchingUpdateMultiple(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) + .invoke( sqlTracker::clear ) + .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p order by p.id", GuineaPig.class ) + .getResultList() + .chain( pigs -> { + pigs.get( 0 ).setName( "One updated" ); + pigs.get( 1 ).setName( "Two updated" ); + return s.updateMultiple( pigs.subList( 0, 2 ) ); + } ) ) + ) + .invoke( () -> assertSqlLogTracker( "update pig set name=.* where id=.*" ) ) + .call( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS_AFTER_UPDATE ) ) ) + ); + } + + @Test + public void testMutinyBatchingUpdateAllNoBatchSizeParameter(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.insertAll( 10, PIGS ) ) + .invoke( sqlTracker::clear ) + .chain( v -> getMutinySessionFactory().withStatelessTransaction( s -> s + .createQuery( "select p from GuineaPig p order by p.id", GuineaPig.class ) + .getResultList() + .chain( pigs -> { + pigs.get( 0 ).setName( "One updated" ); + pigs.get( 1 ).setName( "Two updated" ); + return s.updateAll( pigs.toArray() ); + } ) ) + ) + .invoke( () -> assertSqlLogTracker( "update pig set name=.* where id=.*" ) ) + .call( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS_AFTER_UPDATE ) ) ) + ); + } + + @Test + public void testStageBatchingUpdate(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenAccept( v -> sqlTracker.clear() ) + .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p order by p.id", GuineaPig.class ) + .getResultList() + .thenApply( pigs -> { + pigs.get( 0 ).setName( "One updated" ); + pigs.get( 1 ).setName( "Two updated" ); + return s.update( 10, pigs.toArray() ); + } ) + ) ) + .thenAccept( v -> assertSqlLogTracker( "update pig set name=.* where id=.*" ) ) + .thenCompose( v -> assertExpectedResult( PIGS_AFTER_UPDATE ) ) + ); + } + + @Test + public void testStageBatchingUpdateMultiple(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenRun( sqlTracker::clear ) + .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p order by p.id", GuineaPig.class ) + .getResultList() + .thenApply( pigs -> { + pigs.get( 0 ).setName( "One updated" ); + pigs.get( 1 ).setName( "Two updated" ); + return s.updateMultiple( pigs ); + } ) + ) ) + .thenAccept( v -> assertSqlLogTracker( "update pig set name=.* where id=.*" ) ) + .thenCompose( v -> assertExpectedResult( PIGS_AFTER_UPDATE ) ) + ); + } + + @Test + public void testStageBatchingUpdateNoBatchSizeParameter(VertxTestContext context) { + test(context, getSessionFactory() + .withStatelessTransaction( s -> s.insert( 10, PIGS ) ) + .thenRun( sqlTracker::clear ) + .thenCompose( v -> getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p order by p.id", GuineaPig.class ) + .getResultList() + .thenApply( pigs -> { + pigs.get( 0 ).setName( "One updated" ); + pigs.get( 1 ).setName( "Two updated" ); + return s.update( pigs.get( 0 ), pigs.get( 1 ) ); + } ) + ) ) + .thenAccept( v -> assertSqlLogTracker( "update pig set name=.* where id=.*" ) ) + .thenCompose( v -> assertExpectedResult( PIGS_AFTER_UPDATE ) ) + ); + } + + private CompletionStage assertExpectedResult(Object[] expected) { + return getSessionFactory().withStatelessTransaction( s -> s + .createQuery( "from GuineaPig p order by id", Object.class ) + .getResultList() + .thenAccept( pigs -> assertThat( pigs ).containsExactly( expected ) ) ); + } + + private static void assertSqlLogTracker(String queryRegex) { + // We expect only one query for each batched operations + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + // Parameters are different for different dbs, so the regex must keep that in consideration + assertThat( sqlTracker.getLoggedQueries() ).allMatch( s -> s.matches( queryRegex ) ); + } + + @Entity(name = "GuineaPig") + @Table(name = "pig") + public static class GuineaPig { + @Id + private Integer id; + private String name; + + public GuineaPig() { + } + + public GuineaPig(Integer id, String name) { + this.id = id; + this.name = name; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return id + ": " + name; + } + + @Override + public boolean equals(Object o) { + if ( this == o ) { + return true; + } + if ( o == null || getClass() != o.getClass() ) { + return false; + } + GuineaPig guineaPig = (GuineaPig) o; + return Objects.equals( name, guineaPig.name ); + } + + @Override + public int hashCode() { + return Objects.hash( name ); + } + } +} From 089a609840efabb915e3a556eb792091e6169909 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Fri, 7 Mar 2025 10:48:17 +0100 Subject: [PATCH 2/4] [#2139] Add batching upsert to the StatelessSession --- .../org/hibernate/reactive/mutiny/Mutiny.java | 36 +++++++++++++++++++ .../impl/MutinyStatelessSessionImpl.java | 15 ++++++++ .../session/ReactiveStatelessSession.java | 2 ++ .../impl/ReactiveStatelessSessionImpl.java | 10 ++++++ .../org/hibernate/reactive/stage/Stage.java | 36 +++++++++++++++++++ .../stage/impl/StageStatelessSessionImpl.java | 23 +++++++++--- 6 files changed, 118 insertions(+), 4 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java index a648a7bd7..59edbb875 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java @@ -1908,6 +1908,42 @@ default Uni get(Class entityClass, Object id, LockModeType lockModeTyp @Incubating Uni upsert(String entityName, Object entity); + /** + * Use a SQL {@code merge into} statement to perform + * an upsert on multiple rows using the size of the given array + * as batch size. + * + * @param entities the entities to upsert + * + * @see org.hibernate.StatelessSession#upsert(Object) + */ + @Incubating + Uni upsertAll(Object... entities); + + /** + * Use a SQL {@code merge into} statement to perform + * an upsert on multiple rows using the specified batch size. + * + * @param batchSize the batch size + * @param entities the list of entities to upsert + * + * @see org.hibernate.StatelessSession#upsert(Object) + */ + @Incubating + Uni upsertAll(int batchSize, Object... entities); + + /** + * Use a SQL {@code merge into} statement to perform + * an upsert on multiple rows using the size of the given list + * as batch size. + * + * @param entities the entities to upsert + * + * @see org.hibernate.StatelessSession#upsert(Object) + */ + @Incubating + Uni upsertMultiple(List entities); + /** * Refresh the entity instance state from the database. * diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java index a80d79750..d6e8e984e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java @@ -206,6 +206,21 @@ public Uni upsert(String entityName, Object entity) { return uni( () -> delegate.reactiveUpsert( entityName, entity ) ); } + @Override + public Uni upsertAll(Object... entities) { + return uni( () -> delegate.reactiveUpsertAll( entities.length, entities ) ); + } + + @Override + public Uni upsertAll(int batchSize, Object... entities) { + return uni( () -> delegate.reactiveUpsertAll( batchSize, entities ) ); + } + + @Override + public Uni upsertMultiple(List entities) { + return uni( () -> delegate.reactiveUpsertAll( entities.size(), entities.toArray() ) ); + } + @Override public Uni refreshAll(Object... entities) { return uni( () -> delegate.reactiveRefreshAll( entities.length, entities ) ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveStatelessSession.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveStatelessSession.java index 33e34917b..e6d2aa1ee 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveStatelessSession.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveStatelessSession.java @@ -48,6 +48,8 @@ public interface ReactiveStatelessSession extends ReactiveQueryProducer, Reactiv CompletionStage reactiveUpsert(String entityName, Object entity); + CompletionStage reactiveUpsertAll(int batchSize, Object... entities); + CompletionStage reactiveRefresh(Object entity); CompletionStage reactiveRefresh(String entityName, Object entity); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java index 53b928903..0c018d789 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java @@ -542,6 +542,16 @@ public CompletionStage reactiveUpsert(String entityName, Object entity) { .mergeReactive( id, state, null, false, null, oldVersion, entity, null, this ); } + @Override + public CompletionStage reactiveUpsertAll(int batchSize, Object... entities) { + final Integer jdbcBatchSize = batchingHelperSession.getJdbcBatchSize(); + batchingHelperSession.setJdbcBatchSize( batchSize ); + final ReactiveConnection connection = batchingConnection( batchSize ); + return loop( entities, batchingHelperSession::reactiveUpsert ) + .thenCompose( v -> connection.executeBatch() ) + .whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) ); + } + @Override public CompletionStage reactiveInsertAll(Object... entities) { return loop( entities, batchingHelperSession::reactiveInsert ) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java index 074f63e58..b490f2fb3 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java @@ -2021,6 +2021,42 @@ default CompletionStage refresh(Object entity, LockModeType lockModeType) */ CompletionStage upsert(String entityName, Object entity); + /** + * Use a SQL {@code merge into} statement to perform + * an upsert on multiple rows using the size of the given array + * as batch size. + * + * @param entities the entities to upsert + * + * @see org.hibernate.StatelessSession#upsert(Object) + */ + @Incubating + CompletionStage upsertAll(Object... entities); + + /** + * Use a SQL {@code merge into} statement to perform + * an upsert on multiple rows using the specified batch size. + * + * @param batchSize the batch size + * @param entities the list of entities to upsert + * + * @see org.hibernate.StatelessSession#upsert(Object) + */ + @Incubating + CompletionStage upsertAll(int batchSize, Object... entities); + + /** + * Use a SQL {@code merge into} statement to perform + * an upsert on multiple rows using the size of the given list + * as batch size. + * + * @param entities the entities to upsert + * + * @see org.hibernate.StatelessSession#upsert(Object) + */ + @Incubating + CompletionStage upsertMultiple(List entities); + /** * Asynchronously fetch an association that's configured for lazy loading. * diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java index c97be1a94..aadd76285 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java @@ -5,10 +5,6 @@ */ package org.hibernate.reactive.stage.impl; -import jakarta.persistence.EntityGraph; -import jakarta.persistence.criteria.CriteriaDelete; -import jakarta.persistence.criteria.CriteriaQuery; -import jakarta.persistence.criteria.CriteriaUpdate; import org.hibernate.LockMode; import org.hibernate.graph.spi.RootGraphImplementor; import org.hibernate.query.criteria.JpaCriteriaInsert; @@ -20,6 +16,10 @@ import org.hibernate.reactive.stage.Stage.Query; import org.hibernate.reactive.stage.Stage.SelectionQuery; +import jakarta.persistence.EntityGraph; +import jakarta.persistence.criteria.CriteriaDelete; +import jakarta.persistence.criteria.CriteriaQuery; +import jakarta.persistence.criteria.CriteriaUpdate; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -155,6 +155,21 @@ public CompletionStage upsert(String entityName, Object entity) { return delegate.reactiveUpsert( entityName, entity ); } + @Override + public CompletionStage upsertAll(Object... entities) { + return delegate.reactiveUpsertAll( entities.length, entities ); + } + + @Override + public CompletionStage upsertAll(int batchSize, Object... entities) { + return delegate.reactiveUpsertAll( batchSize, entities ); + } + + @Override + public CompletionStage upsertMultiple(List entities) { + return delegate.reactiveUpsertAll( entities.size(), entities.toArray() ); + } + @Override public CompletionStage fetch(T association) { return delegate.reactiveFetch( association, false ); From 2149d571d64dba2036176c4aff4f9c387018a882 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Fri, 7 Mar 2025 10:49:16 +0100 Subject: [PATCH 3/4] [#2139] Add test for upsert with batching --- .../ReactiveStatelessWithBatchTest.java | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java index f934090b8..563fad40b 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/ReactiveStatelessWithBatchTest.java @@ -8,6 +8,7 @@ import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.hibernate.cfg.Configuration; import org.hibernate.reactive.BaseReactiveTest; +import org.hibernate.reactive.annotations.EnabledFor; import org.hibernate.reactive.testing.SqlStatementTracker; import org.junit.jupiter.api.BeforeEach; @@ -26,6 +27,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static org.assertj.core.api.Assertions.assertThat; +import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.POSTGRESQL; /** * Test the stateless session actually execute the operations in batch. @@ -85,7 +87,7 @@ protected void addServices(StandardServiceRegistryBuilder builder) { } private static boolean filter(String s) { - String[] accepted = { "insert ", "update ", "delete " }; + String[] accepted = { "merge ", "insert ", "update ", "delete " }; for ( String valid : accepted ) { if ( s.toLowerCase().startsWith( valid ) ) { return true; @@ -94,6 +96,66 @@ private static boolean filter(String s) { return false; } + @Test + @EnabledFor(POSTGRESQL) + public void testMutinyMergeUpsertAll(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.upsertAll( PIGS ) ) + .invoke( () -> assertSqlLogTracker( "merge into pig as t using (.*)" ) ) + .chain( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS ) ) ) + ); + } + + @Test + @EnabledFor(POSTGRESQL) + public void testMutinyMergeUpsertAllWithBatchSize(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.upsertAll( 10, PIGS ) ) + .invoke( () -> assertSqlLogTracker( "merge into pig as t using (.*)" ) ) + .chain( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS ) ) ) + ); + } + + @Test + @EnabledFor(POSTGRESQL) + public void testMutinyMergeUpsertMultiple(VertxTestContext context) { + test( context, getMutinySessionFactory() + .withStatelessTransaction( s -> s.upsertMultiple( List.of( PIGS ) ) ) + .invoke( () -> assertSqlLogTracker( "merge into pig as t using (.*)" ) ) + .chain( () -> Uni.createFrom().completionStage( assertExpectedResult( PIGS ) ) ) + ); + } + + @Test + @EnabledFor(POSTGRESQL) + public void testStageMergeUpsertAll(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.upsertAll( PIGS ) ) + .thenRun( () -> assertSqlLogTracker( "merge into pig as t using (.*)" ) ) + .thenCompose( v -> assertExpectedResult( PIGS ) ) + ); + } + + @Test + @EnabledFor(POSTGRESQL) + public void testStageMergeUpsertAllWithBatchSize(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.upsertAll( 10, PIGS ) ) + .thenRun(() -> assertSqlLogTracker( "merge into pig as t using (.*)" ) ) + .thenCompose( v -> assertExpectedResult( PIGS ) ) + ); + } + + @Test + @EnabledFor(POSTGRESQL) + public void testStageMergeUpsertMultiple(VertxTestContext context) { + test( context, getSessionFactory() + .withStatelessTransaction( s -> s.upsertMultiple( List.of( PIGS ) ) ) + .thenRun( () -> assertSqlLogTracker( "merge into pig as t using (.*)" ) ) + .thenCompose( v -> assertExpectedResult( PIGS ) ) + ); + } + @Test public void testMutinyBatchingInsert(VertxTestContext context) { test( context, getMutinySessionFactory() From 28e9d602ffadc39b5b052b249d3d60776cfe2320 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Fri, 7 Mar 2025 11:04:34 +0100 Subject: [PATCH 4/4] [#2139] Explain why we set the default batch size to 0 --- .../session/impl/ReactiveStatelessSessionImpl.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java index 0c018d789..4941dda50 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java @@ -150,8 +150,11 @@ private ReactiveStatelessSessionImpl( PersistenceContext persistenceContext) { super( factory, options ); this.persistenceContext = persistenceContext; - // Setting batch size to 0 because `StatelessSession` does not consider - // the value of `hibernate.jdbc.batch_size` + // StatelessSession should not allow JDBC batching, because that would change + // its "immediate synchronous execution" model into something more like transactional + // write-behind and be confusing. For this reason, the default batch size is always set to 0. + // When a user calls the CRUD operations for batching, we set the batch size to the same number of + // objects to process, therefore, there is no write-behind behavior. reactiveConnection = new BatchingConnection( connection, 0 ); batchingHelperSession = this; influencers = new LoadQueryInfluencers( factory );