Skip to content

Commit 1433887

Browse files
committed
Introduce new multi-threaded stress test for the Id generation based on sequences
1 parent 6c6d7da commit 1433887

File tree

1 file changed

+304
-0
lines changed

1 file changed

+304
-0
lines changed
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive;
7+
8+
import io.vertx.core.AbstractVerticle;
9+
import io.vertx.core.DeploymentOptions;
10+
import io.vertx.core.Promise;
11+
import io.vertx.core.Vertx;
12+
import io.vertx.core.VertxOptions;
13+
import io.vertx.ext.unit.Async;
14+
import io.vertx.ext.unit.TestContext;
15+
import io.vertx.ext.unit.junit.VertxUnitRunner;
16+
import jakarta.persistence.Entity;
17+
import jakarta.persistence.GeneratedValue;
18+
import jakarta.persistence.Id;
19+
import jakarta.persistence.Table;
20+
21+
import org.hibernate.SessionFactory;
22+
import org.hibernate.boot.registry.StandardServiceRegistry;
23+
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
24+
import org.hibernate.cfg.Configuration;
25+
import org.hibernate.reactive.id.impl.ReactiveGeneratorWrapper;
26+
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
27+
import org.hibernate.reactive.provider.Settings;
28+
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
29+
import org.hibernate.reactive.session.impl.ReactiveSessionFactoryImpl;
30+
import org.hibernate.reactive.stage.Stage;
31+
import org.hibernate.reactive.stage.impl.StageSessionImpl;
32+
import org.hibernate.reactive.testing.DatabaseSelectionRule;
33+
import org.hibernate.reactive.util.impl.CompletionStages;
34+
import org.hibernate.reactive.vertx.VertxInstance;
35+
36+
import org.junit.AfterClass;
37+
import org.junit.BeforeClass;
38+
import org.junit.Rule;
39+
import org.junit.Test;
40+
import org.junit.runner.RunWith;
41+
42+
import java.util.ArrayList;
43+
import java.util.BitSet;
44+
import java.util.List;
45+
import java.util.concurrent.CompletionStage;
46+
import java.util.concurrent.ConcurrentHashMap;
47+
import java.util.concurrent.ConcurrentMap;
48+
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.TimeUnit;
50+
51+
import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.SQLSERVER;
52+
53+
/**
54+
* This is a multi-threaded stress test, intentionally consuming some time.
55+
* The purpose is to verify that the sequence optimizer used by Hibernate Reactive
56+
* is indeed able to generate unique IDs backed by the database sequences, while
57+
* running multiple operations in different threads and on multiple Vert.x eventloops.
58+
* A typical reactive application will not require multiple threads, but we
59+
* specifically want to test for the case in which the single ID source is being
60+
* shared across multiple threads and eventloops.
61+
*/
62+
@RunWith(VertxUnitRunner.class)
63+
public class MultithreadedIdentityGenerationTest {
64+
65+
/* The number of threads should be higher than the default size of the connection pool so that
66+
* this test is also effective in detecting problems with resource starvation.
67+
*/
68+
private static final int N_THREADS = 48;
69+
private static final int IDS_GENERATED_PER_THREAD = 10000;
70+
71+
//Should finish much sooner, but generating this amount of IDs could be slow on some CIs
72+
private static final int TIMEOUT_MINUTES = 10;
73+
74+
private static final boolean LOG_SQL = false;
75+
private static final Latch startLatch = new Latch( "start", N_THREADS );
76+
private static final Latch endLatch = new Latch( "end", N_THREADS );
77+
78+
private static Stage.SessionFactory stageSessionFactory;
79+
private static Vertx vertx;
80+
private static SessionFactory sessionFactory;
81+
82+
@Rule // Currently failing for unrelated reasons on SQL Server https://github.com/hibernate/hibernate-reactive/issues/1609
83+
public DatabaseSelectionRule dbRule = DatabaseSelectionRule.skipTestsFor( SQLSERVER );
84+
85+
@BeforeClass
86+
public static void setupSessionFactory() {
87+
final VertxOptions vertxOptions = new VertxOptions();
88+
vertxOptions.setEventLoopPoolSize( N_THREADS );
89+
//We relax the blocked thread checks as we'll actually use latches to block them
90+
//intentionally for the purpose of the test; functionally this isn't required
91+
//but it's useful as self-test in the design of this, to ensure that the way
92+
//things are setup are indeed being run in multiple, separate threads.
93+
vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES );
94+
vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES );
95+
vertx = Vertx.vertx( vertxOptions );
96+
Configuration configuration = new Configuration();
97+
configuration.addAnnotatedClass( EntityWithGeneratedId.class );
98+
BaseReactiveTest.setDefaultProperties( configuration );
99+
configuration.setProperty( Settings.SHOW_SQL, String.valueOf( LOG_SQL ) );
100+
StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder()
101+
.applySettings( configuration.getProperties() )
102+
//Inject our custom vert.x instance:
103+
.addService( VertxInstance.class, () -> vertx );
104+
StandardServiceRegistry registry = builder.build();
105+
sessionFactory = configuration.buildSessionFactory( registry );
106+
stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class );
107+
}
108+
109+
@AfterClass
110+
public static void closeSessionFactory() {
111+
stageSessionFactory.close();
112+
}
113+
114+
private ReactiveGeneratorWrapper getIdGenerator() {
115+
final ReactiveSessionFactoryImpl hibernateSessionFactory = (ReactiveSessionFactoryImpl) sessionFactory;
116+
final ReactiveGeneratorWrapper identifierGenerator = (ReactiveGeneratorWrapper) hibernateSessionFactory.getIdentifierGenerator(
117+
"org.hibernate.reactive.MultithreadedIdentityGenerationTest$EntityWithGeneratedId" );
118+
return identifierGenerator;
119+
}
120+
121+
@Test(timeout = ( 1000 * 60 * 10 ))//10 minutes timeout
122+
public void testIdentityGenerator(TestContext context) {
123+
final Async async = context.async();
124+
final ReactiveGeneratorWrapper idGenerator = getIdGenerator();
125+
context.assertNotNull( idGenerator );
126+
127+
final DeploymentOptions deploymentOptions = new DeploymentOptions();
128+
deploymentOptions.setInstances( N_THREADS );
129+
130+
ResultsCollector allResults = new ResultsCollector();
131+
132+
vertx
133+
.deployVerticle( () -> new IdGenVerticle( idGenerator, allResults ), deploymentOptions )
134+
.onSuccess( res -> {
135+
endLatch.waitForEveryone();
136+
if ( allResultsAreUnique( allResults ) ) {
137+
async.complete();
138+
}
139+
else {
140+
context.fail( "Non unique numbers detected" );
141+
}
142+
} )
143+
.onFailure( context::fail )
144+
.eventually( unused -> vertx.close() );
145+
}
146+
147+
private boolean allResultsAreUnique(ResultsCollector allResults) {
148+
//Add 50 per thread to the total amount of generated ids to allow for gaps
149+
//in the hi/lo partitioning (not likely to be necessary)
150+
final int expectedSize = N_THREADS * ( IDS_GENERATED_PER_THREAD + 50 );
151+
BitSet resultsSeen = new BitSet( expectedSize );
152+
boolean failed = false;
153+
for ( List<Long> partialResult : allResults.resultsByThread.values() ) {
154+
for ( Long aLong : partialResult ) {
155+
final int intValue = aLong.intValue();
156+
final boolean existing = resultsSeen.get( intValue );
157+
if ( existing ) {
158+
System.out.println( "Duplicate ID detected: " + intValue );
159+
failed = true;
160+
}
161+
resultsSeen.set( intValue );
162+
}
163+
}
164+
return !failed;
165+
}
166+
167+
private static class IdGenVerticle extends AbstractVerticle {
168+
169+
private final ReactiveGeneratorWrapper idGenerator;
170+
private final ResultsCollector allResults;
171+
private final ArrayList<Long> generatedIds = new ArrayList<>( IDS_GENERATED_PER_THREAD );
172+
173+
public IdGenVerticle(ReactiveGeneratorWrapper idGenerator, ResultsCollector allResults) {
174+
this.idGenerator = idGenerator;
175+
this.allResults = allResults;
176+
}
177+
178+
@Override
179+
public void start(Promise<Void> startPromise) {
180+
try {
181+
startLatch.reached();
182+
startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism
183+
final String initialThreadName = Thread.currentThread().getName();
184+
stageSessionFactory.withSession(
185+
s -> generateMultipleIds( idGenerator, s, generatedIds )
186+
)
187+
.whenComplete( (o, throwable) -> {
188+
endLatch.reached();
189+
if ( throwable != null ) {
190+
startPromise.fail( throwable );
191+
}
192+
else {
193+
if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) {
194+
startPromise.fail( "Thread switch detected!" );
195+
}
196+
else {
197+
allResults.deliverResulst( generatedIds );
198+
startPromise.complete();
199+
}
200+
}
201+
} );
202+
}
203+
catch (RuntimeException e) {
204+
startPromise.fail( e );
205+
}
206+
}
207+
208+
@Override
209+
public void stop() {
210+
prettyOut( "Verticle stopped " + super.toString() );
211+
}
212+
}
213+
214+
private static class ResultsCollector {
215+
216+
private final ConcurrentMap<String,List<Long>> resultsByThread = new ConcurrentHashMap<>();
217+
218+
public void deliverResulst(List<Long> generatedIds) {
219+
final String threadName = Thread.currentThread().getName();
220+
resultsByThread.put( threadName, generatedIds );
221+
}
222+
}
223+
224+
private static CompletionStage<Void> generateMultipleIds(
225+
ReactiveGeneratorWrapper idGenerator,
226+
Stage.Session s,
227+
ArrayList<Long> collector) {
228+
return CompletionStages.loop( 0, IDS_GENERATED_PER_THREAD, index -> generateIds( idGenerator, s, collector ) );
229+
}
230+
231+
private static CompletionStage<Void> generateIds(
232+
ReactiveGeneratorWrapper idGenerator,
233+
Stage.Session s,
234+
ArrayList<Long> collector) {
235+
final Thread beforeOperationThread = Thread.currentThread();
236+
return idGenerator.generate( ( (StageSessionImpl) s )
237+
.unwrap( ReactiveConnectionSupplier.class ), new EntityWithGeneratedId() )
238+
.thenAccept( o -> {
239+
if ( beforeOperationThread != Thread.currentThread() ) {
240+
throw new IllegalStateException( "Detected an unexpected switch of carrier threads!" );
241+
}
242+
collector.add( (Long) o );
243+
} );
244+
}
245+
246+
/**
247+
* Trivial entity using a Sequence for Id generation
248+
*/
249+
@Entity
250+
@Table(name="Entity")
251+
private static class EntityWithGeneratedId {
252+
@Id
253+
@GeneratedValue
254+
Long id;
255+
256+
String name;
257+
258+
public EntityWithGeneratedId() {
259+
}
260+
}
261+
262+
/**
263+
* Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design
264+
*/
265+
private static final class Latch {
266+
private final String label;
267+
private final CountDownLatch countDownLatch;
268+
269+
public Latch(String label, int membersCount) {
270+
this.label = label;
271+
this.countDownLatch = new CountDownLatch( membersCount );
272+
}
273+
274+
public void reached() {
275+
final long count = countDownLatch.getCount();
276+
countDownLatch.countDown();
277+
prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) );
278+
}
279+
280+
public void waitForEveryone() {
281+
try {
282+
countDownLatch.await( TIMEOUT_MINUTES, TimeUnit.MINUTES );
283+
prettyOut( "Everyone has now breached '" + label + "'" );
284+
}
285+
catch ( InterruptedException e ) {
286+
e.printStackTrace();
287+
}
288+
}
289+
}
290+
291+
private static void prettyOut(final String message) {
292+
final String threadName = Thread.currentThread().getName();
293+
final long l = System.currentTimeMillis();
294+
final long seconds = ( l / 1000 ) - initialSecond;
295+
//We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision
296+
//as it's not very relevant to see exactly how long each stage took (it's actually distracting)
297+
//but it's more useful to group things coarsely when some lock or timeout introduces a significant
298+
//divide between some operations (when a starvation or timeout happens it takes some seconds).
299+
System.out.println( seconds + " - " + threadName + ": " + message );
300+
}
301+
302+
private static final long initialSecond = ( System.currentTimeMillis() / 1000 );
303+
304+
}

0 commit comments

Comments
 (0)