Skip to content

Implement SelectionQuery#getResultsCount #1937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ interface SelectionQuery<R> extends AbstractQuery {
*/
Uni<R> getSingleResultOrNull();

/**
* Determine the size of the query result list that would be
* returned by calling {@link #getResultList()} with no
* {@linkplain #getFirstResult() offset} or
* {@linkplain #getMaxResults() limit} applied to the query.
*
* @return the size of the list that would be returned
*/
@Incubating
Uni<Long> getResultCount();

/**
* Asynchronously execute this query, returning the query results
* as a {@link List}, via a {@link Uni}. If the query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public Uni<Long> getResultCount() {
return uni( delegate::getReactiveResultCount );
}

@Override
public Uni<List<R>> getResultList() {
return uni( delegate::getReactiveResultList );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public int getMaxResults() {
return delegate.getMaxResults();
}

@Override
public Uni<Long> getResultCount() {
return uni( delegate::getReactiveResultCount );
}

@Override
public Uni<List<R>> getResultList() {
return uni( delegate::getReactiveResultList );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ default CompletionStage<List<R>> getReactiveResultList() {

CompletionStage<R> getReactiveSingleResultOrNull();

CompletionStage<Long> getReactiveResultCount();

CompletionStage<R> reactiveUnique();

CompletionStage<Optional<R>> reactiveUniqueResultOptional();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.query.IllegalQueryOperationException;
import org.hibernate.query.hql.internal.QuerySplitter;
import org.hibernate.query.internal.DelegatingDomainQueryExecutionContext;
import org.hibernate.query.spi.DomainQueryExecutionContext;
import org.hibernate.query.spi.QueryInterpretationCache;
import org.hibernate.query.spi.QueryOptions;
import org.hibernate.query.sqm.internal.DomainParameterXref;
Expand All @@ -33,6 +35,7 @@
import org.hibernate.reactive.query.sqm.internal.AggregatedSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.internal.ConcreteSqmSelectReactiveQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.results.spi.ReactiveSingleResultConsumer;
import org.hibernate.sql.results.internal.TupleMetadata;

import jakarta.persistence.NoResultException;
Expand Down Expand Up @@ -146,6 +149,17 @@ public CompletionStage<R> getReactiveSingleResult() {
.exceptionally( this::convertException );
}

public CompletionStage<Long> getReactiveResultsCount(SqmSelectStatement<?> sqmStatement, DomainQueryExecutionContext domainQueryExecutionContext) {
final DelegatingDomainQueryExecutionContext context = new DelegatingDomainQueryExecutionContext( domainQueryExecutionContext ) {
@Override
public QueryOptions getQueryOptions() {
return QueryOptions.NONE;
}
};
return buildConcreteSelectQueryPlan( sqmStatement.createCountQuery(), Long.class, getQueryOptions() )
.reactiveExecuteQuery( context, new ReactiveSingleResultConsumer<>() );
}

private R reactiveSingleResult(List<R> list) {
if ( list.isEmpty() ) {
throw new NoResultException( String.format( "No result found for query [%s]", getQueryString() ) );
Expand Down Expand Up @@ -269,7 +283,7 @@ private ReactiveSelectQueryPlan<R> buildAggregatedSelectQueryPlan(SqmSelectState
return new AggregatedSelectReactiveQueryPlan<>( aggregatedQueryPlans );
}

private <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
public <T> ReactiveSelectQueryPlan<T> buildConcreteSelectQueryPlan(
SqmSelectStatement<?> concreteSqmStatement,
Class<T> resultType,
QueryOptions queryOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ private <T> T getNull() {
return null;
}

@Override
public long getResultCount() {
throw LOG.nonReactiveMethodCall( "getReactiveResultCount()" );
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
throw LOG.notYetImplemented();
}

private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSessionContractImplementor session) {
return new ReactiveAbstractSelectionQuery<>(
this::getQueryOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor;
import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer;
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
import org.hibernate.sql.ast.SqlAstTranslator;
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
import org.hibernate.sql.ast.spi.FromClauseAccess;
Expand All @@ -59,6 +60,7 @@
public class ConcreteSqmSelectReactiveQueryPlan<R> extends ConcreteSqmSelectQueryPlan<R>
implements ReactiveSelectQueryPlan<R> {

private final SqmInterpreter<Object, ReactiveResultsConsumer<Object, R>> executeQueryInterpreter;
private final SqmInterpreter<List<R>, Void> listInterpreter;
private final RowTransformer<R> rowTransformer;

Expand All @@ -80,6 +82,8 @@ public ConcreteSqmSelectReactiveQueryPlan(
this.rowTransformer = determineRowTransformer( sqm, resultType, tupleMetadata, queryOptions );
this.listInterpreter = (unused, executionContext, sqmInterpretation, jdbcParameterBindings) ->
listInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer );
this.executeQueryInterpreter = (resultsConsumer, executionContext, sqmInterpretation, jdbcParameterBindings) ->
executeQueryInterpreter( hql, domainParameterXref, executionContext, sqmInterpretation, jdbcParameterBindings, rowTransformer, resultsConsumer );
}

private static <R> CompletionStage<List<R>> listInterpreter(
Expand Down Expand Up @@ -110,6 +114,40 @@ private static <R> CompletionStage<List<R>> listInterpreter(
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
}

private static <R> CompletionStage<Object> executeQueryInterpreter(
String hql,
DomainParameterXref domainParameterXref,
DomainQueryExecutionContext executionContext,
CacheableSqmInterpretation sqmInterpretation,
JdbcParameterBindings jdbcParameterBindings,
RowTransformer<R> rowTransformer,
ReactiveResultsConsumer<Object, R> resultsConsumer) {
final ReactiveSharedSessionContractImplementor session = (ReactiveSharedSessionContractImplementor) executionContext.getSession();
final JdbcOperationQuerySelect jdbcSelect = sqmInterpretation.getJdbcSelect();
// I'm using a supplier so that the whenComplete at the end will catch any errors, like a finally-block
Supplier<SubselectFetch.RegistrationHandler> fetchHandlerSupplier = () -> SubselectFetch
.createRegistrationHandler( session.getPersistenceContext().getBatchFetchQueue(), sqmInterpretation.selectStatement, JdbcParametersList.empty(), jdbcParameterBindings );
return completedFuture( fetchHandlerSupplier )
.thenApply( Supplier::get )
.thenCompose( subSelectFetchKeyHandler -> session
.reactiveAutoFlushIfRequired( jdbcSelect.getAffectedTableNames() )
.thenCompose( required -> StandardReactiveSelectExecutor.INSTANCE
.executeQuery( jdbcSelect,
jdbcParameterBindings,
ConcreteSqmSelectQueryPlan.listInterpreterExecutionContext( hql, executionContext, jdbcSelect, subSelectFetchKeyHandler ),
rowTransformer,
null,
sql -> executionContext.getSession()
.getJdbcCoordinator()
.getStatementPreparer()
.prepareQueryStatement( sql, false, null ),
resultsConsumer
)
)
)
.whenComplete( (rs, t) -> domainParameterXref.clearExpansions() );
}

@Override
public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, DomainQueryExecutionContext executionContext) {
throw new UnsupportedOperationException();
Expand All @@ -119,10 +157,21 @@ public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, Doma
public CompletionStage<List<R>> reactivePerformList(DomainQueryExecutionContext executionContext) {
return executionContext.getQueryOptions().getEffectiveLimit().getMaxRowsJpa() == 0
? completedFuture( emptyList() )
: withCacheableSqmInterpretation( executionContext, listInterpreter );
: withCacheableSqmInterpretation( executionContext, null, listInterpreter );
}

@Override
public <T> CompletionStage<T> reactiveExecuteQuery(
DomainQueryExecutionContext executionContext,
ReactiveResultsConsumer<T, R> resultsConsumer) {
return withCacheableSqmInterpretation(
executionContext,
resultsConsumer,
(SqmInterpreter<T, ReactiveResultsConsumer<T, R>>) (SqmInterpreter) executeQueryInterpreter
);
}

private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, SqmInterpreter<T, X> interpreter) {
private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExecutionContext executionContext, X context, SqmInterpreter<T, X> interpreter) {
// NOTE : VERY IMPORTANT - intentional double-lock checking
// The other option would be to leverage `java.util.concurrent.locks.ReadWriteLock`
// to protect access. However, synchronized is much simpler here. We will verify
Expand Down Expand Up @@ -162,7 +211,7 @@ private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExec
jdbcParameterBindings = createJdbcParameterBindings( localCopy, executionContext );
}

return interpreter.interpret( null, executionContext, localCopy, jdbcParameterBindings );
return interpreter.interpret( context, executionContext, localCopy, jdbcParameterBindings );
}

private JdbcParameterBindings createJdbcParameterBindings(CacheableSqmInterpretation sqmInterpretation, DomainQueryExecutionContext executionContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ public CompletionStage<R> getReactiveSingleResult() {
return selectionQueryDelegate.getReactiveSingleResult();
}

@Override
public long getResultCount() {
throw LOG.nonReactiveMethodCall( "getReactiveResultCount()" );
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
return selectionQueryDelegate
.getReactiveResultsCount( ( (SqmSelectStatement<?>) getSqmStatement() ).createCountQuery(), this );
}

@Override
public CompletionStage<R> getReactiveSingleResultOrNull() {
return selectionQueryDelegate.getReactiveSingleResultOrNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ public R getSingleResultOrNull() {
return selectionQueryDelegate.getSingleResultOrNull();
}

@Override
public CompletionStage<Long> getReactiveResultCount() {
return selectionQueryDelegate
.getReactiveResultsCount( getSqmStatement().createCountQuery(), this );
}

@Override
public List<R> getResultList() {
return selectionQueryDelegate.getResultList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.hibernate.query.spi.ScrollableResultsImplementor;
import org.hibernate.query.spi.SelectQueryPlan;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.sql.results.spi.ReactiveResultsConsumer;
import org.hibernate.sql.results.spi.ResultsConsumer;

import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
Expand Down Expand Up @@ -44,7 +45,7 @@ default <T> T executeQuery(DomainQueryExecutionContext executionContext, Results
/**
* Execute the query
*/
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ResultsConsumer<T, R> resultsConsumer) {
default <T> CompletionStage<T> reactiveExecuteQuery(DomainQueryExecutionContext executionContext, ReactiveResultsConsumer<T, R> resultsConsumer) {
return failedFuture( new UnsupportedOperationException() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public void finishUpRow(final RowProcessingState rowProcessingState) {
}
}

public void startLoading(final RowProcessingState rowProcessingState) {
for ( int i = initializers.length - 1; i >= 0; i-- ) {
initializers[i].startLoading( rowProcessingState );
}
}

public CompletionStage<Void> initializeInstance(final ReactiveRowProcessingState rowProcessingState) {
return loop( initializers, initializer -> {
if ( initializer instanceof ReactiveInitializer ) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.sql.results.spi;

import java.util.concurrent.CompletionStage;

import org.hibernate.Incubating;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
import org.hibernate.sql.results.jdbc.internal.JdbcValuesSourceProcessingStateStandardImpl;
import org.hibernate.sql.results.jdbc.spi.JdbcValuesSourceProcessingOptions;

@Incubating
public class ReactiveSingleResultConsumer<T> implements ReactiveResultsConsumer<T, T> {

@Override
public CompletionStage<T> consume(
ReactiveValuesResultSet jdbcValues,
SharedSessionContractImplementor session,
JdbcValuesSourceProcessingOptions processingOptions,
JdbcValuesSourceProcessingStateStandardImpl jdbcValuesSourceProcessingState,
ReactiveRowProcessingState rowProcessingState,
ReactiveRowReader<T> rowReader) {
rowReader.getReactiveInitializersList().startLoading( rowProcessingState );
return rowProcessingState.next()
.thenCompose( hasNext -> rowReader
.reactiveReadRow( rowProcessingState, processingOptions )
.thenApply( result -> {
rowProcessingState.finishRowProcessing( true );
rowReader.finishUp( jdbcValuesSourceProcessingState );
jdbcValuesSourceProcessingState.finishUp( false );
return result;
} )
);
}

@Override
public boolean canResultsBeCached() {
return false;
}

}
Loading
Loading