Skip to content

Commit 128dc52

Browse files
committed
#150: Support choosing spliterator implementation for streaming scan queries
...that is, for `YdbRepositoryTransaction.executeScanQuery()` returning `Stream<RESULT>`
1 parent 85be392 commit 128dc52

File tree

6 files changed

+142
-28
lines changed

repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2509,14 +2509,26 @@ public void scanUpdateFails() {
25092509
}
25102510

25112511
@Test
2512-
public void scanNotTruncated() {
2512+
public void scanNotTruncatedOldSpliterator() {
25132513
int maxPageSizeBiggerThatReal = 11_000;
25142514

25152515
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
25162516
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
25172517
));
25182518

2519-
List<Project> result = db.scan().withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
2519+
List<Project> result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
2520+
assertEquals(maxPageSizeBiggerThatReal, result.size());
2521+
}
2522+
2523+
@Test
2524+
public void scanNotTruncatedNewSpliterator() {
2525+
int maxPageSizeBiggerThatReal = 11_000;
2526+
2527+
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
2528+
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
2529+
));
2530+
2531+
List<Project> result = db.scan().useNewSpliterator(true).withMaxSize(maxPageSizeBiggerThatReal).run(() -> db.projects().findAll());
25202532
assertEquals(maxPageSizeBiggerThatReal, result.size());
25212533
}
25222534

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
298298
result = doCall(statement.toDebugString(params), () -> {
299299
if (options.isScan()) {
300300
return options.getScanOptions().isUseNewSpliterator()
301-
? doExecuteScanQueryList(statement, params)
302-
: doExecuteScanQueryLegacy(statement, params);
301+
? listScanQueryNew(statement, params)
302+
: listScanQueryLegacy(statement, params);
303303
} else {
304304
return doExecuteDataQuery(statement, params);
305305
}
@@ -371,7 +371,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
371371
return new ResultSetConverter(resultSet).stream(statement::readResult).collect(toList());
372372
}
373373

374-
private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
374+
private <PARAMS, RESULT> List<RESULT> listScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
375375
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
376376
.withRequestTimeout(options.getScanOptions().getTimeout())
377377
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
@@ -396,9 +396,9 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS,
396396
return result;
397397
}
398398

399-
private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, RESULT> statement, PARAMS params) {
399+
private <PARAMS, RESULT> List<RESULT> listScanQueryNew(Statement<PARAMS, RESULT> statement, PARAMS params) {
400400
List<RESULT> result = new ArrayList<>();
401-
try (Stream<RESULT> stream = executeScanQuery(statement, params)) {
401+
try (Stream<RESULT> stream = streamScanQueryNew(statement, params)) {
402402
stream.forEach(r -> {
403403
if (result.size() >= options.getScanOptions().getMaxSize()) {
404404
throw new ResultTruncatedException(
@@ -415,9 +415,15 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, R
415415
@Override
416416
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
417417
if (!options.isScan()) {
418-
throw new IllegalStateException("Scan query can be used only from scan tx");
418+
throw new IllegalStateException("Streaming scan query can be used only from scan tx");
419419
}
420420

421+
return options.getScanOptions().isUseNewSpliterator()
422+
? streamScanQueryNew(statement, params)
423+
: streamScanQueryLegacy(statement, params);
424+
}
425+
426+
private <PARAMS, RESULT> Stream<RESULT> streamScanQueryNew(Statement<PARAMS, RESULT> statement, PARAMS params) {
421427
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
422428
.withRequestTimeout(options.getScanOptions().getTimeout())
423429
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
@@ -437,6 +443,36 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
437443
return spliterator.createStream();
438444
}
439445

446+
private <PARAMS, RESULT> Stream<RESULT> streamScanQueryLegacy(Statement<PARAMS, RESULT> statement, PARAMS params) {
447+
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
448+
.withRequestTimeout(options.getScanOptions().getTimeout())
449+
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
450+
.build();
451+
452+
String yql = getYql(statement);
453+
Params sdkParams = getSdkParams(statement, params);
454+
455+
try {
456+
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(false, action ->
457+
doCall(statement.toDebugString(params), () -> {
458+
Status status = YdbOperations.safeJoin(
459+
session.executeScanQuery(
460+
yql, sdkParams, settings,
461+
rs -> new ResultSetConverter(rs).stream(statement::readResult).forEach(action)
462+
),
463+
options.getScanOptions().getTimeout().plusMinutes(5)
464+
);
465+
validate("SCAN_QUERY: " + yql, status.getCode(), status.toString());
466+
})
467+
);
468+
return spliterator.makeStream();
469+
} catch (RepositoryException e) {
470+
throw e;
471+
} catch (Exception e) {
472+
throw new UnexpectedException("Could not perform scan query", e);
473+
}
474+
}
475+
440476
private QueryStatsCollectionMode getSdkStatsMode() {
441477
var queryStats = options.getQueryStats();
442478
return queryStats == null

repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import lombok.AllArgsConstructor;
1414
import lombok.Setter;
1515
import lombok.SneakyThrows;
16-
import lombok.Value;
1716
import lombok.experimental.Delegate;
1817
import org.assertj.core.api.Assertions;
1918
import org.junit.Assert;
@@ -74,6 +73,7 @@
7473
import tech.ydb.yoj.repository.ydb.sample.model.HintInt64Range;
7574
import tech.ydb.yoj.repository.ydb.sample.model.HintTablePreset;
7675
import tech.ydb.yoj.repository.ydb.sample.model.HintUniform;
76+
import tech.ydb.yoj.repository.ydb.statement.FindAllYqlStatement;
7777
import tech.ydb.yoj.repository.ydb.statement.FindStatement;
7878
import tech.ydb.yoj.repository.ydb.statement.YqlStatement;
7979
import tech.ydb.yoj.repository.ydb.table.YdbTable;
@@ -195,6 +195,18 @@ public void throwConversionExceptionOnSerializationProblem() {
195195

196196
@Test
197197
public void readYqlListAndMap() {
198+
record GroupByResult(
199+
String id,
200+
List<String> items,
201+
Map<String, String> map,
202+
203+
@Column(flatten = false)
204+
GroupByResult.Struct struct
205+
) {
206+
record Struct(String name) {
207+
}
208+
}
209+
198210
WithUnflattenableField entity = new WithUnflattenableField(
199211
new WithUnflattenableField.Id("id_yql_list"),
200212
new WithUnflattenableField.Unflattenable("Hello, world!", 100_500)
@@ -203,7 +215,7 @@ public void readYqlListAndMap() {
203215
db.tx(() -> {
204216
EntitySchema<WithUnflattenableField> schema = EntitySchema.of(WithUnflattenableField.class);
205217
var tableDescriptor = TableDescriptor.from(schema);
206-
List<GroupByResult> result = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction())
218+
List<GroupByResult> result = ydbRepositoryTransaction()
207219
.execute(new YqlStatement<>(tableDescriptor, schema, ObjectSchema.of(GroupByResult.class)) {
208220
@Override
209221
public String getQuery(String tablespace) {
@@ -230,20 +242,6 @@ public QueryType getQueryType() {
230242
});
231243
}
232244

233-
@Value
234-
static class GroupByResult {
235-
String id;
236-
List<String> items;
237-
Map<String, String> map;
238-
@Column(flatten = false)
239-
Struct struct;
240-
241-
@Value
242-
static class Struct {
243-
String name;
244-
}
245-
}
246-
247245
@Test
248246
public void readViewFromCache() {
249247
TypeFreak tf1 = newTypeFreak(0, "AAA1", "bbb");
@@ -893,15 +891,15 @@ public void creatingRepositoryDoesNotConnect() {
893891
public void ydbTransactionCompatibility() {
894892
db.tx(() -> {
895893
// No db tx or session yet!
896-
var sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
894+
var sdkTx = ydbRepositoryTransaction().toSdkTransaction();
897895
assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId);
898896
assertThat(sdkTx.getId()).isNull();
899897
assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW);
900898
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(sdkTx::getStatusFuture);
901899

902900
// Perform any read - session and tx ID appear
903901
db.projects().countAll();
904-
sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
902+
sdkTx = ydbRepositoryTransaction().toSdkTransaction();
905903
assertThat(sdkTx.getSessionId()).isNotNull();
906904
assertThat(sdkTx.getId()).isNotNull();
907905
assertThat(sdkTx.getTxMode()).isEqualTo(TxMode.SERIALIZABLE_RW);
@@ -919,15 +917,15 @@ public void ydbTransactionCompatibility() {
919917

920918
db.readOnly().withStatementIsolationLevel(isolationLevel).run(() -> {
921919
// No db tx or session yet!
922-
var sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
920+
var sdkTx = ydbRepositoryTransaction().toSdkTransaction();
923921
assertThatIllegalStateException().isThrownBy(sdkTx::getSessionId);
924922
assertThat(sdkTx.getId()).isNull();
925923
assertThat(sdkTx.getTxMode()).isEqualTo(txMode);
926924
assertThatExceptionOfType(UnsupportedOperationException.class).isThrownBy(sdkTx::getStatusFuture);
927925

928926
// Perform any read - session and tx ID appear
929927
db.projects().countAll();
930-
sdkTx = ((YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction()).toSdkTransaction();
928+
sdkTx = ydbRepositoryTransaction().toSdkTransaction();
931929
assertThat(sdkTx.getSessionId()).isNotNull();
932930
// Read transactions might have no ID or might have an ID, depending on your YDB version (that's what YDB returns, folks!)
933931
assertThat(sdkTx.getTxMode()).isEqualTo(txMode);
@@ -1019,6 +1017,54 @@ public void queryStatsCollectionMode() {
10191017
assertThat(found).hasSize(4);
10201018
}
10211019

1020+
@Test
1021+
public void streamingScanNotTruncatedOldSpliterator() {
1022+
int maxPageSizeBiggerThatReal = 11_000;
1023+
1024+
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
1025+
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
1026+
));
1027+
1028+
List<Project.Id> result = db.scan().useNewSpliterator(false).withMaxSize(maxPageSizeBiggerThatReal).run(() -> {
1029+
var schema = EntitySchema.of(Project.class);
1030+
var desc = TableDescriptor.from(schema);
1031+
var statement = new FindAllYqlStatement<>(desc, schema, schema);
1032+
1033+
var projectIds = new ArrayList<Project.Id>();
1034+
try (var stream = ydbRepositoryTransaction().executeScanQuery(statement, null)) {
1035+
stream.forEach(p -> projectIds.add(p.getId()));
1036+
}
1037+
return projectIds;
1038+
});
1039+
assertEquals(maxPageSizeBiggerThatReal, result.size());
1040+
}
1041+
1042+
@Test
1043+
public void streamingScanNotTruncatedNewSpliterator() {
1044+
int maxPageSizeBiggerThatReal = 11_000;
1045+
1046+
db.tx(() -> IntStream.range(0, maxPageSizeBiggerThatReal).forEach(
1047+
i -> db.projects().save(new Project(new Project.Id("id_" + i), "name"))
1048+
));
1049+
1050+
List<Project.Id> result = db.scan().useNewSpliterator(true).withMaxSize(maxPageSizeBiggerThatReal).run(() -> {
1051+
var schema = EntitySchema.of(Project.class);
1052+
var desc = TableDescriptor.from(schema);
1053+
var statement = new FindAllYqlStatement<>(desc, schema, schema);
1054+
1055+
var projectIds = new ArrayList<Project.Id>();
1056+
try (var stream = ydbRepositoryTransaction().executeScanQuery(statement, null)) {
1057+
stream.forEach(p -> projectIds.add(p.getId()));
1058+
}
1059+
return projectIds;
1060+
});
1061+
assertEquals(maxPageSizeBiggerThatReal, result.size());
1062+
}
1063+
1064+
private static YdbRepositoryTransaction<?> ydbRepositoryTransaction() {
1065+
return (YdbRepositoryTransaction<?>) Tx.Current.get().getRepositoryTransaction();
1066+
}
1067+
10221068
@AllArgsConstructor
10231069
private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase {
10241070
@Delegate

repository/src/main/java/tech/ydb/yoj/repository/db/DelegatingTxManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ public ScanBuilder withTimeout(Duration timeout) {
210210
return new ScanBuilderImpl(delegate.withTimeout(timeout));
211211
}
212212

213+
@Override
214+
public ScanBuilder useNewSpliterator(boolean useNewSpliterator) {
215+
return new ScanBuilderImpl(delegate.useNewSpliterator(useNewSpliterator));
216+
}
217+
213218
@Override
214219
public <T> T run(Supplier<T> supplier) throws RetryableException {
215220
return doRunTx(() -> this.delegate.run(wrapTxBody(supplier)));

repository/src/main/java/tech/ydb/yoj/repository/db/StdTxManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,11 @@ public ScanBuilder withTimeout(Duration timeout) {
372372
return withOptions(options.withTimeout(timeout));
373373
}
374374

375+
@Override
376+
public ScanBuilder useNewSpliterator(boolean useNewSpliterator) {
377+
return withOptions(options.withUseNewSpliterator(useNewSpliterator));
378+
}
379+
375380
@Override
376381
public <T> T run(Supplier<T> supplier) throws RetryableException {
377382
TxOptions txOptions = StdTxManager.this.options

repository/src/main/java/tech/ydb/yoj/repository/db/TxManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tech.ydb.yoj.repository.db;
22

33
import lombok.NonNull;
4+
import tech.ydb.yoj.ExperimentalApi;
45
import tech.ydb.yoj.repository.db.cache.TransactionLog;
56
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
67
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
@@ -199,6 +200,15 @@ interface ScanBuilder {
199200

200201
ScanBuilder withTimeout(Duration timeout);
201202

203+
/**
204+
* Specifies whether the new {@code Spliterator} implementation is used for streaming scan query results.
205+
* The new implementation better conforms to the {@code Spliterator} contract and consumes less memory.
206+
* <p>Note that using the new implementation currently has a negative performance impact; for more information, refer to
207+
* <a href="https://github.com/ydb-platform/yoj-project/issues/42">GitHub Issue #42</a>.
208+
*/
209+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
210+
ScanBuilder useNewSpliterator(boolean useNewSpliterator);
211+
202212
<T> T run(Supplier<T> supplier);
203213

204214
default void run(Runnable runnable) {

0 commit comments

Comments
 (0)