Skip to content

Commit f353a9c

Browse files
authored
Merge pull request #479 from alex268/add_concurrent_result_set_flag
Added support of concurrent_result_sets option
2 parents c16d078 + 9c48e30 commit f353a9c

File tree

3 files changed

+68
-17
lines changed

3 files changed

+68
-17
lines changed

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -182,31 +182,26 @@ private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) {
182182
}
183183
}
184184

185-
private String mapPoolId(ExecuteQuerySettings settings) {
186-
String actualPoolId = settings.getResourcePool();
187-
188-
if (actualPoolId == null) {
189-
return YdbQuery.ExecuteQueryRequest.getDefaultInstance().getPoolId();
190-
}
191-
192-
return actualPoolId;
193-
}
194-
195185
GrpcReadStream<YdbQuery.ExecuteQueryResponsePart> createGrpcStream(
196186
String query, YdbQuery.TransactionControl tx, Params prms, ExecuteQuerySettings settings
197187
) {
198188
YdbQuery.ExecuteQueryRequest.Builder requestBuilder = YdbQuery.ExecuteQueryRequest.newBuilder()
199189
.setSessionId(sessionId)
200190
.setExecMode(mapExecMode(settings.getExecMode()))
201191
.setStatsMode(mapStatsMode(settings.getStatsMode()))
192+
.setConcurrentResultSets(settings.isConcurrentResultSets())
202193
.setQueryContent(YdbQuery.QueryContent.newBuilder()
203194
.setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1)
204195
.setText(query)
205196
.build()
206197
)
207-
.setPoolId(mapPoolId(settings))
208198
.putAllParameters(prms.toPb());
209199

200+
String resourcePool = settings.getResourcePool();
201+
if (resourcePool != null && !resourcePool.isEmpty()) {
202+
requestBuilder.setPoolId(resourcePool);
203+
}
204+
210205
if (tx != null) {
211206
requestBuilder.setTxControl(tx);
212207
}

query/src/main/java/tech/ydb/query/settings/ExecuteQuerySettings.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,14 @@
99
public class ExecuteQuerySettings extends BaseRequestSettings {
1010
private final QueryExecMode execMode;
1111
private final QueryStatsMode statsMode;
12-
13-
/**
14-
* Resource pool
15-
*/
12+
private final boolean concurrentResultSets;
1613
private final String resourcePool;
1714

1815
private ExecuteQuerySettings(Builder builder) {
1916
super(builder);
2017
this.execMode = builder.execMode;
2118
this.statsMode = builder.statsMode;
19+
this.concurrentResultSets = builder.concurrentResultSets;
2220
this.resourcePool = builder.resourcePool;
2321
}
2422

@@ -30,6 +28,14 @@ public QueryStatsMode getStatsMode() {
3028
return this.statsMode;
3129
}
3230

31+
public boolean isConcurrentResultSets() {
32+
return this.concurrentResultSets;
33+
}
34+
35+
/**
36+
* Get resource pool for query execution
37+
* @return resource pool name
38+
*/
3339
public String getResourcePool() {
3440
return this.resourcePool;
3541
}
@@ -41,6 +47,7 @@ public static Builder newBuilder() {
4147
public static class Builder extends BaseBuilder<Builder> {
4248
private QueryExecMode execMode = QueryExecMode.EXECUTE;
4349
private QueryStatsMode statsMode = QueryStatsMode.NONE;
50+
private boolean concurrentResultSets = false;
4451
private String resourcePool = null;
4552

4653
public Builder withExecMode(QueryExecMode mode) {
@@ -53,12 +60,17 @@ public Builder withStatsMode(QueryStatsMode mode) {
5360
return this;
5461
}
5562

63+
public Builder withConcurrentResultSets(boolean value) {
64+
this.concurrentResultSets = value;
65+
return this;
66+
}
67+
5668
/**
5769
* Set resource pool which query try to use.
5870
* If no pool specify or poolId is empty or poolId equals "default"
59-
* the undeleted resource pool "default" wll be used
71+
* the unremovable resource pool "default" will be used
6072
*
61-
* @param poolId poolId in ydb
73+
* @param poolId resource pool identifier
6274
*
6375
* @return builder
6476
*/

query/src/test/java/tech/ydb/query/impl/QueryIntegrationTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package tech.ydb.query.impl;
22

33
import java.time.Duration;
4+
import java.util.ArrayDeque;
45
import java.util.ArrayList;
6+
import java.util.Deque;
7+
import java.util.HashSet;
58
import java.util.Iterator;
69
import java.util.List;
10+
import java.util.Set;
711
import java.util.concurrent.CompletableFuture;
812

913
import org.junit.AfterClass;
@@ -315,6 +319,46 @@ public void updateMultipleTablesInOneTransaction() {
315319
}
316320
}
317321

322+
@Test
323+
public void concurrentResultSetsTest() {
324+
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {
325+
try (QuerySession session = client.createSession(Duration.ofSeconds(5)).join().getValue()) {
326+
String query = ""
327+
+ "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id;\n"
328+
+ "SELECT 3 + 5;\n"
329+
+ "SELECT id, name, payload, is_valid FROM " + TEST_TABLE + " ORDER BY id DESC;\n"
330+
+ "SELECT 3 + 1;\n";
331+
332+
// consistent read - all result sets are ordered
333+
QueryStream qs1 = session.createQuery(query, TxMode.SNAPSHOT_RO, Params.empty(),
334+
ExecuteQuerySettings.newBuilder().withConcurrentResultSets(false).build());
335+
336+
Deque<Long> ordered = new ArrayDeque<>();
337+
Result<QueryInfo> res1 = qs1.execute(part -> {
338+
Long id = part.getResultSetIndex();
339+
if (!ordered.isEmpty()) {
340+
Assert.assertTrue(id >= ordered.getLast());
341+
}
342+
ordered.addLast(id);
343+
}).join();
344+
Assert.assertTrue(res1.isSuccess());
345+
346+
// concurrent read - all result sets are unordered
347+
QueryStream qs2 = session.createQuery(query, TxMode.SNAPSHOT_RO, Params.empty(),
348+
ExecuteQuerySettings.newBuilder().withConcurrentResultSets(true).build());
349+
350+
Set<Long> unordered = new HashSet<>();
351+
Result<QueryInfo> res2 = qs2.execute(part -> {
352+
unordered.add(part.getResultSetIndex());
353+
}).join();
354+
Assert.assertTrue(res2.isSuccess());
355+
356+
Assert.assertTrue(ordered.containsAll(unordered));
357+
Assert.assertTrue(unordered.containsAll(ordered));
358+
}
359+
}
360+
}
361+
318362
@Test
319363
public void interactiveTransaction() {
320364
try (QueryClient client = QueryClient.newClient(ydbTransport).build()) {

0 commit comments

Comments
 (0)