Skip to content

#150: Support choosing spliterator implementation for streaming scan queries #152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2509,14 +2509,26 @@ public void scanUpdateFails() {
}

@Test
public void scanNotTruncated() {
public void scanNotTruncatedOldSpliterator() {
int maxPageSizeBiggerThatReal = 11_000;

db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
));

List<Project> result = db.scan().withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
List<Project> result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
assertEquals(maxPageSizeBiggerThatReal, result.size());
}

/**
 * Saves 11,000 projects, then reads them all back in a scan transaction that uses the new
 * spliterator ({@code useNewSpliterator(true)}) with the max size set to the number of saved rows.
 * Every saved project is expected to be returned.
 */
@Test
public void scanNotTruncatedNewSpliterator() {
    final int projectCount = 11_000;

    // Populate the table in a single transaction.
    db.tx(() -> {
        for (int n = 0; n < projectCount; n++) {
            db.projects().save(new Project(new Project.Id("id_" + n), "name"));
        }
    });

    List<Project> found = db.scan()
            .useNewSpliterator(true)
            .withMaxSize(projectCount)
            .run(() -> db.projects().findAll());

    assertEquals(projectCount, found.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
result = doCall(statement.toDebugString(params), () -> {
if (options.isScan()) {
return options.getScanOptions().isUseNewSpliterator()
? doExecuteScanQueryList(statement, params)
: doExecuteScanQueryLegacy(statement, params);
? listScanQueryNew(statement, params)
: listScanQueryLegacy(statement, params);
} else {
return doExecuteDataQuery(statement, params);
}
Expand Down Expand Up @@ -371,7 +371,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
return new ResultSetConverter(resultSet).stream(statement::readResult).collect(toList());
}

private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
private <PARAMS, RESULT> List<RESULT> listScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(options.getScanOptions().getTimeout())
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
Expand All @@ -396,9 +396,9 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS,
return result;
}

private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, RESULT> statement, PARAMS params) {
private <PARAMS, RESULT> List<RESULT> listScanQueryNew(Statement<PARAMS, RESULT> statement, PARAMS params) {
List<RESULT> result = new ArrayList<>();
try (Stream<RESULT> stream = executeScanQuery(statement, params)) {
try (Stream<RESULT> stream = streamScanQueryNew(statement, params)) {
stream.forEach(r -> {
if (result.size() >= options.getScanOptions().getMaxSize()) {
throw new ResultTruncatedException(
Expand All @@ -418,6 +418,12 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
throw new IllegalStateException("Scan query can be used only from scan tx");
}

return options.getScanOptions().isUseNewSpliterator()
? streamScanQueryNew(statement, params)
: streamScanQueryLegacy(statement, params);
}

private <PARAMS, RESULT> Stream<RESULT> streamScanQueryNew(Statement<PARAMS, RESULT> statement, PARAMS params) {
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(options.getScanOptions().getTimeout())
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
Expand All @@ -437,6 +443,36 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
return spliterator.createStream();
}

/**
 * Builds a stream of scan query results backed by a {@code YdbLegacySpliterator}.
 * <p>The spliterator is handed a callback which, when invoked with an {@code action}, executes the scan query
 * on the current session and feeds every row (converted via {@code statement::readResult}) to that action.
 *
 * @param statement statement to execute; supplies the YQL text, the SDK parameters and the row reader
 * @param params    statement parameters, also used for the debug string passed to {@code doCall}
 * @return the stream produced by {@code YdbLegacySpliterator.makeStream()}
 * @throws RepositoryException rethrown unchanged if raised inside the {@code try} block
 * @throws UnexpectedException wrapping any other exception raised inside the {@code try} block
 */
private <PARAMS, RESULT> Stream<RESULT> streamScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
// Scan query settings: request timeout comes from the scan options, mode is EXEC.
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(options.getScanOptions().getTimeout())
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
.build();

String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

try {
// NOTE(review): the callback below is only passed to the spliterator here; it presumably runs later,
// while the stream is being consumed, i.e. possibly outside of this try block - confirm.
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(false, action ->
doCall(statement.toDebugString(params), () -> {
Status status = YdbOperations.safeJoin(
session.executeScanQuery(
yql, sdkParams, settings,
// Each received result set is converted to RESULT rows which are pushed to the action.
rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(action)
),
// The join is given the scan timeout plus an extra 5 minutes.
options.getScanOptions().getTimeout().plusMinutes(5)
);
// Check the final status of the scan query.
validate("SCAN_QUERY: " + yql, status.getCode(), status.toString());
})
);
return spliterator.makeStream();
} catch (RepositoryException e) {
// Repository exceptions are propagated as-is, without wrapping.
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not perform scan query", e);
}
}

private QueryStatsCollectionMode getSdkStatsMode() {
var queryStats = options.getQueryStats();
return queryStats == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.Delegate;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
Expand Down Expand Up @@ -74,6 +73,7 @@
import tech.ydb.yoj.repository.ydb.sample.model.HintInt64Range;
import tech.ydb.yoj.repository.ydb.sample.model.HintTablePreset;
import tech.ydb.yoj.repository.ydb.sample.model.HintUniform;
import tech.ydb.yoj.repository.ydb.statement.FindAllYqlStatement;
import tech.ydb.yoj.repository.ydb.statement.FindStatement;
import tech.ydb.yoj.repository.ydb.statement.YqlStatement;
import tech.ydb.yoj.repository.ydb.table.YdbTable;
Expand Down Expand Up @@ -195,6 +195,18 @@ public void throwConversionExceptionOnSerializationProblem() {

@Test
public void readYqlListAndMap() {
record GroupByResult(
String id,
List<String> items,
Map<String, String> map,

@Column(flatten = false)
GroupByResult.Struct struct
) {
record Struct(String name) {
}
}

WithUnflattenableField entity = new WithUnflattenableField(
new WithUnflattenableField.Id("id_yql_list"),
new WithUnflattenableField.Unflattenable("Hello, world!", 100_500)
Expand All @@ -203,7 +215,7 @@ public void readYqlListAndMap() {
db.tx(() -> {
EntitySchema<WithUnflattenableField> schema = EntitySchema.of(WithUnflattenableField.class);
var tableDescriptor = TableDescriptor.from(schema);
List<GroupByResult> result = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction())
List<GroupByResult> result = ydbRepositoryTransaction()
.execute(new YqlStatement<>(tableDescriptor, schema, ObjectSchema.of(GroupByResult.class)) {
@Override
public String getQuery(String tablespace) {
Expand All @@ -230,20 +242,6 @@ public QueryType getQueryType() {
});
}

/**
 * Row type that the custom YQL statement in {@code readYqlListAndMap} reads its results into
 * (via {@code ObjectSchema.of(GroupByResult.class)}).
 * Lombok's {@code @Value} generates the constructor, getters and {@code equals}/{@code hashCode}.
 */
@Value
static class GroupByResult {
String id;
List<String> items;
Map<String, String> map;
// NOTE(review): flatten = false presumably keeps the struct as a single column instead of one column
// per nested field - confirm against the @Column documentation.
@Column(flatten = false)
Struct struct;

/** Nested value held in the non-flattened {@code struct} field. */
@Value
static class Struct {
String name;
}
}

@Test
public void readViewFromCache() {
TypeFreak tf1 = newTypeFreak(0, "AAA1", "bbb");
Expand Down Expand Up @@ -893,15 +891,15 @@ public void creatingRepositoryDoesNotConnect() {
public void ydbTransactionCompatibility() {
db.tx(() -> {
// No db tx or session yet!
var sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
var sdkTx = ydbRepositoryTransaction().toSdkTransaction();
assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId);
assertThat(sdkTx.getId()).isNull();
assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW);
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(sdkTx::getStatusFuture);

// Perform any read - session and tx ID appear
db.projects().countAll();
sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
sdkTx = ydbRepositoryTransaction().toSdkTransaction();
assertThat(sdkTx.getSessionId()).isNotNull();
assertThat(sdkTx.getId()).isNotNull();
assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW);
Expand All @@ -919,15 +917,15 @@ public void ydbTransactionCompatibility() {

db.readOnly().withStatementIsolationLevel(isolationLevel).run(() -> {
// No db tx or session yet!
var sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
var sdkTx = ydbRepositoryTransaction().toSdkTransaction();
assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId);
assertThat(sdkTx.getId()).isNull();
assertThat(sdkTx.getTxMode()).isEqualTo(txMode);
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(sdkTx::getStatusFuture);

// Perform any read - session and tx ID appear
db.projects().countAll();
sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
sdkTx = ydbRepositoryTransaction().toSdkTransaction();
assertThat(sdkTx.getSessionId()).isNotNull();
// Read transactions might have no ID or might have an ID, depending on your YDB version (that's what YDB returns, folks!)
assertThat(sdkTx.getTxMode()).isEqualTo(txMode);
Expand Down Expand Up @@ -1019,6 +1017,54 @@ public void queryStatsCollectionMode() {
assertThat(found).hasSize(4);
}

/**
 * Saves 11,000 projects, then streams them back with {@code executeScanQuery} inside a scan transaction
 * configured with {@code useNewSpliterator(false)}. Every saved project ID is expected to be seen.
 */
@Test
public void streamingScanNotTruncatedOldSpliterator() {
    final int projectCount = 11_000;

    // Populate the table in a single transaction.
    db.tx(() -> {
        for (int n = 0; n < projectCount; n++) {
            db.projects().save(new Project(new Project.Id("id_" + n), "name"));
        }
    });

    List<Project.Id> scannedIds = db.scan()
            .useNewSpliterator(false)
            .withMaxSize(projectCount)
            .run(() -> {
                var projectSchema = EntitySchema.of(Project.class);
                var tableDescriptor = TableDescriptor.from(projectSchema);
                var findAll = new FindAllYqlStatement<>(tableDescriptor, projectSchema, projectSchema);

                var ids = new ArrayList<Project.Id>();
                // The stream is closed as soon as all rows have been consumed.
                try (var projects = ydbRepositoryTransaction().executeScanQuery(findAll, null)) {
                    projects.forEach(project -> ids.add(project.getId()));
                }
                return ids;
            });

    assertEquals(projectCount, scannedIds.size());
}

/**
 * Saves 11,000 projects, then streams them back with {@code executeScanQuery} inside a scan transaction
 * configured with {@code useNewSpliterator(true)}. Every saved project ID is expected to be seen.
 */
@Test
public void streamingScanNotTruncatedNewSpliterator() {
    final int projectCount = 11_000;

    // Populate the table in a single transaction.
    db.tx(() -> {
        for (int n = 0; n < projectCount; n++) {
            db.projects().save(new Project(new Project.Id("id_" + n), "name"));
        }
    });

    List<Project.Id> scannedIds = db.scan()
            .useNewSpliterator(true)
            .withMaxSize(projectCount)
            .run(() -> {
                var projectSchema = EntitySchema.of(Project.class);
                var tableDescriptor = TableDescriptor.from(projectSchema);
                var findAll = new FindAllYqlStatement<>(tableDescriptor, projectSchema, projectSchema);

                var ids = new ArrayList<Project.Id>();
                // The stream is closed as soon as all rows have been consumed.
                try (var projects = ydbRepositoryTransaction().executeScanQuery(findAll, null)) {
                    projects.forEach(project -> ids.add(project.getId()));
                }
                return ids;
            });

    assertEquals(projectCount, scannedIds.size());
}

/**
 * Returns the repository transaction of the current {@code Tx}, cast to {@code YdbRepositoryTransaction}.
 */
private static YdbRepositoryTransaction<?> ydbRepositoryTransaction() {
    var currentTx = Tx.Current.get();
    return (YdbRepositoryTransaction<?>) currentTx.getRepositoryTransaction();
}

@AllArgsConstructor
private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase {
@Delegate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public ScanBuilder withTimeout(Duration timeout) {
return new ScanBuilderImpl(delegate.withTimeout(timeout));
}

/**
 * Forwards the spliterator choice to the wrapped builder and re-wraps the result in a new {@code ScanBuilderImpl}.
 */
@Override
public ScanBuilder useNewSpliterator(boolean useNewSpliterator) {
    var reconfigured = delegate.useNewSpliterator(useNewSpliterator);
    return new ScanBuilderImpl(reconfigured);
}

@Override
public <T> T run(Supplier<T> supplier) throws RetryableException {
return doRunTx(() -> this.delegate.run(wrapTxBody(supplier)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ public ScanBuilder withTimeout(Duration timeout) {
return withOptions(options.withTimeout(timeout));
}

/**
 * Returns a builder whose scan options carry the given {@code useNewSpliterator} flag.
 */
@Override
public ScanBuilder useNewSpliterator(boolean useNewSpliterator) {
    var updatedOptions = options.withUseNewSpliterator(useNewSpliterator);
    return withOptions(updatedOptions);
}

@Override
public <T> T run(Supplier<T> supplier) throws RetryableException {
TxOptions txOptions = StdTxManager.this.options
Expand Down
10 changes: 10 additions & 0 deletions repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tech.ydb.yoj.repository.db;

import lombok.NonNull;
import tech.ydb.yoj.ExperimentalApi;
import tech.ydb.yoj.repository.db.cache.TransactionLog;
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
Expand Down Expand Up @@ -199,6 +200,15 @@ interface ScanBuilder {

ScanBuilder withTimeout(Duration timeout);

/**
 * Specifies whether the new {@code Spliterator} implementation is used for streaming scan query results.
 * The new implementation better conforms to the {@code Spliterator} contract and consumes less memory.
 * <p>Note that using the new implementation currently has a negative performance impact; for more information, refer to
 * <a href="https://github.com/ydb-platform/yoj-project/issues/42">GitHub Issue #42</a>.
 *
 * @param useNewSpliterator {@code true} to use the new {@code Spliterator} implementation;
 *                          {@code false} to use the legacy one
 * @return a scan builder with the chosen setting applied
 */
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
ScanBuilder useNewSpliterator(boolean useNewSpliterator);

<T> T run(Supplier<T> supplier);

default void run(Runnable runnable) {
Expand Down