diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java index ef2271d3..8d2deae6 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java @@ -2509,14 +2509,26 @@ public void scanUpdateFails() { } @Test - public void scanNotTruncated() { + public void scanNotTruncatedOldSpliterator() { int maxPageSizeBiggerThatReal = 11_000; db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach( i -> db.projects().save(new Project(new Project.Id("id_" + i), "name")) )); - List result = db.scan().withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll()); + List result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll()); + assertEquals(maxPageSizeBiggerThatReal, result.size()); + } + + @Test + public void scanNotTruncatedNewSpliterator() { + int maxPageSizeBiggerThatReal = 11_000; + + db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach( + i -> db.projects().save(new Project(new Project.Id("id_" + i), "name")) + )); + + List result = db.scan().useNewSpliterator(true).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll()); assertEquals(maxPageSizeBiggerThatReal, result.size()); } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index dc7d79a1..014edc9a 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -298,8 +298,8 @@ public List execute(Statement statement result = doCall(statement.toDebugString(params), () -> { if (options.isScan()) { return options.getScanOptions().isUseNewSpliterator() - ? doExecuteScanQueryList(statement, params) - : doExecuteScanQueryLegacy(statement, params); + ? listScanQueryNew(statement, params) + : listScanQueryLegacy(statement, params); } else { return doExecuteDataQuery(statement, params); } @@ -371,7 +371,7 @@ private List doExecuteDataQuery(Statement List doExecuteScanQueryLegacy(Statement statement, PARAMS params) { + private List listScanQueryLegacy(Statement statement, PARAMS params) { ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() .withRequestTimeout(options.getScanOptions().getTimeout()) .setMode(ExecuteScanQuerySettings.Mode.EXEC) @@ -396,9 +396,9 @@ private List doExecuteScanQueryLegacy(Statement List doExecuteScanQueryList(Statement statement, PARAMS params) { + private List listScanQueryNew(Statement statement, PARAMS params) { List result = new ArrayList<>(); - try (Stream stream = executeScanQuery(statement, params)) { + try (Stream stream = streamScanQueryNew(statement, params)) { stream.forEach(r -> { if (result.size() >= options.getScanOptions().getMaxSize()) { throw new ResultTruncatedException( @@ -418,6 +418,12 @@ public Stream executeScanQuery(Statement Stream streamScanQueryNew(Statement statement, PARAMS params) { ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() .withRequestTimeout(options.getScanOptions().getTimeout()) .setMode(ExecuteScanQuerySettings.Mode.EXEC) @@ -437,6 +443,36 @@ public Stream executeScanQuery(Statement Stream streamScanQueryLegacy(Statement statement, PARAMS params) { + ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder() + .withRequestTimeout(options.getScanOptions().getTimeout()) + .setMode(ExecuteScanQuerySettings.Mode.EXEC) + .build(); + + String yql = getYql(statement); + Params sdkParams = getSdkParams(statement, params); + + try { + YdbLegacySpliterator spliterator = new YdbLegacySpliterator<>(false, action -> + doCall(statement.toDebugString(params), () -> { + Status status = YdbOperations.safeJoin( + session.executeScanQuery( + yql, sdkParams, settings, + rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(action) + ), + options.getScanOptions().getTimeout().plusMinutes(5) + ); + validate("SCAN_QUERY: " + yql, status.getCode(), status.toString()); + }) + ); + return spliterator.makeStream(); + } catch (RepositoryException e) { + throw e; + } catch (Exception e) { + throw new UnexpectedException("Could not perform scan query", e); + } + } + private QueryStatsCollectionMode getSdkStatsMode() { var queryStats = options.getQueryStats(); return queryStats == null diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java index 817a1540..08918484 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java @@ -13,7 +13,6 @@ import lombok.AllArgsConstructor; import lombok.Setter; import lombok.SneakyThrows; -import lombok.Value; import lombok.experimental.Delegate; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -74,6 +73,7 @@ import tech.ydb.yoj.repository.ydb.sample.model.HintInt64Range; import tech.ydb.yoj.repository.ydb.sample.model.HintTablePreset; import tech.ydb.yoj.repository.ydb.sample.model.HintUniform; +import tech.ydb.yoj.repository.ydb.statement.FindAllYqlStatement; import tech.ydb.yoj.repository.ydb.statement.FindStatement; import tech.ydb.yoj.repository.ydb.statement.YqlStatement; import tech.ydb.yoj.repository.ydb.table.YdbTable; @@ -195,6 +195,18 @@ public void throwConversionExceptionOnSerializationProblem() { @Test public void readYqlListAndMap() { + record GroupByResult( + String id, + List items, + Map map, + + @Column(flatten = false) + GroupByResult.Struct struct + ) { + record Struct(String name) { + } + } + WithUnflattenableField entity = new WithUnflattenableField( new WithUnflattenableField.Id("id_yql_list"), new WithUnflattenableField.Unflattenable("Hello, world!", 100_500) @@ -203,7 +215,7 @@ public void readYqlListAndMap() { db.tx(() -> { EntitySchema schema = EntitySchema.of(WithUnflattenableField.class); var tableDescriptor = TableDescriptor.from(schema); - List result = ((YdbRepositoryTransaction) Tx.Current.get().getRepositoryTransaction()) + List result = ydbRepositoryTransaction() .execute(new YqlStatement<>(tableDescriptor, schema, ObjectSchema.of(GroupByResult.class)) { @Override public String getQuery(String tablespace) { @@ -230,20 +242,6 @@ public QueryType getQueryType() { }); } - @Value - static class GroupByResult { - String id; - List items; - Map map; - @Column(flatten = false) - Struct struct; - - @Value - static class Struct { - String name; - } - } - @Test public void readViewFromCache() { TypeFreak tf1 = newTypeFreak(0, "AAA1", "bbb"); @@ -893,7 +891,7 @@ public void creatingRepositoryDoesNotConnect() { public void ydbTransactionCompatibility() { db.tx(() -> { // No db tx or session yet! - var sdkTx = ((YdbRepositoryTransaction) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction(); + var sdkTx = ydbRepositoryTransaction().toSdkTransaction(); assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId); assertThat(sdkTx.getId()).isNull(); assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW); @@ -901,7 +899,7 @@ public void ydbTransactionCompatibility() { // Perform any read - session and tx ID appear db.projects().countAll(); - sdkTx = ((YdbRepositoryTransaction) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction(); + sdkTx = ydbRepositoryTransaction().toSdkTransaction(); assertThat(sdkTx.getSessionId()).isNotNull(); assertThat(sdkTx.getId()).isNotNull(); assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW); @@ -919,7 +917,7 @@ public void ydbTransactionCompatibility() { db.readOnly().withStatementIsolationLevel(isolationLevel).run(() -> { // No db tx or session yet! - var sdkTx = ((YdbRepositoryTransaction) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction(); + var sdkTx = ydbRepositoryTransaction().toSdkTransaction(); assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId); assertThat(sdkTx.getId()).isNull(); assertThat(sdkTx.getTxMode()).isEqualTo(txMode); @@ -927,7 +925,7 @@ public void ydbTransactionCompatibility() { // Perform any read - session and tx ID appear db.projects().countAll(); - sdkTx = ((YdbRepositoryTransaction) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction(); + sdkTx = ydbRepositoryTransaction().toSdkTransaction(); assertThat(sdkTx.getSessionId()).isNotNull(); // Read transactions might have no ID or might have an ID, depending on your YDB version (that's what YDB returns, folks!) assertThat(sdkTx.getTxMode()).isEqualTo(txMode); @@ -1019,6 +1017,54 @@ public void queryStatsCollectionMode() { assertThat(found).hasSize(4); } + @Test + public void streamingScanNotTruncatedOldSpliterator() { + int maxPageSizeBiggerThatReal = 11_000; + + db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach( + i -> db.projects().save(new Project(new Project.Id("id_" + i), "name")) + )); + + List result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> { + var schema = EntitySchema.of(Project.class); + var desc = TableDescriptor.from(schema); + var statement = new FindAllYqlStatement<>(desc, schema, schema); + + var projectIds = new ArrayList(); + try (var stream = ydbRepositoryTransaction().executeScanQuery(statement, null)) { + stream.forEach(p -> projectIds.add(p.getId())); + } + return projectIds; + }); + assertEquals(maxPageSizeBiggerThatReal, result.size()); + } + + @Test + public void streamingScanNotTruncatedNewSpliterator() { + int maxPageSizeBiggerThatReal = 11_000; + + db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach( + i -> db.projects().save(new Project(new Project.Id("id_" + i), "name")) + )); + + List result = db.scan().useNewSpliterator(true).withMaxSize(maxPageSizeBiggerThatReal).run(() -> { + var schema = EntitySchema.of(Project.class); + var desc = TableDescriptor.from(schema); + var statement = new FindAllYqlStatement<>(desc, schema, schema); + + var projectIds = new ArrayList(); + try (var stream = ydbRepositoryTransaction().executeScanQuery(statement, null)) { + stream.forEach(p -> projectIds.add(p.getId())); + } + return projectIds; + }); + assertEquals(maxPageSizeBiggerThatReal, result.size()); + } + + private static YdbRepositoryTransaction ydbRepositoryTransaction() { + return (YdbRepositoryTransaction) Tx.Current.get().getRepositoryTransaction(); + } + @AllArgsConstructor private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase { @Delegate diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/DelegatingTxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/DelegatingTxManager.java index 2e93c088..21b18f67 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/DelegatingTxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/DelegatingTxManager.java @@ -210,6 +210,11 @@ public ScanBuilder withTimeout(Duration timeout) { return new ScanBuilderImpl(delegate.withTimeout(timeout)); } + @Override + public ScanBuilder useNewSpliterator(boolean useNewSpliterator) { + return new ScanBuilderImpl(delegate.useNewSpliterator(useNewSpliterator)); + } + @Override public T run(Supplier supplier) throws RetryableException { return doRunTx(() -> this.delegate.run(wrapTxBody(supplier))); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java index 1726656a..25ba29b6 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java @@ -372,6 +372,11 @@ public ScanBuilder withTimeout(Duration timeout) { return withOptions(options.withTimeout(timeout)); } + @Override + public ScanBuilder useNewSpliterator(boolean useNewSpliterator) { + return withOptions(options.withUseNewSpliterator(useNewSpliterator)); + } + @Override public T run(Supplier supplier) throws RetryableException { TxOptions txOptions = StdTxManager.this.options diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java index 96685634..fec05bf0 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java @@ -1,6 +1,7 @@ package tech.ydb.yoj.repository.db; import lombok.NonNull; +import tech.ydb.yoj.ExperimentalApi; import tech.ydb.yoj.repository.db.cache.TransactionLog; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; import tech.ydb.yoj.repository.db.exception.OptimisticLockException; @@ -199,6 +200,15 @@ interface ScanBuilder { ScanBuilder withTimeout(Duration timeout); + /** + * Specifies whether the new {@code Spliterator} implementation is used for streaming scan query results. + * The new implementation better conforms to the {@code Spliterator} contract and consumes less memory. + *

Note that using the new implementation currently has a negative performance impact, for more information refer to + * GitHub Issue #42. + */ + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") + ScanBuilder useNewSpliterator(boolean useNewSpliterator); + T run(Supplier supplier); default void run(Runnable runnable) {