Skip to content

Commit 2ff8722

Browse files
authored
Fix reads from many shards (#11569)
1 parent bb43627 commit 2ff8722

File tree

2 files changed

+98
-8
lines changed

2 files changed

+98
-8
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,14 +1062,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10621062
}
10631063

10641064
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
1065-
// Special case for infinity
1066-
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
1067-
return !lhs.Ranges->GetRightBorder().first->GetCells().empty();
1068-
}
1069-
return CompareTypedCellVectors(
1070-
lhs.Ranges->GetRightBorder().first->GetCells().data(),
1071-
rhs.Ranges->GetRightBorder().first->GetCells().data(),
1072-
keyTypes.data(), keyTypes.size()) < 0;
1065+
return CompareBorders<false, false>(
1066+
lhs.Ranges->GetRightBorder().first->GetCells(),
1067+
rhs.Ranges->GetRightBorder().first->GetCells(),
1068+
lhs.Ranges->GetRightBorder().second,
1069+
rhs.Ranges->GetRightBorder().second,
1070+
keyTypes) < 0;
10731071
});
10741072

10751073
// One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5176,6 +5176,98 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
51765176
R"([[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]]])");
51775177
}
51785178
}
5179+
5180+
Y_UNIT_TEST(ReadManyRanges) {
5181+
NKikimrConfig::TAppConfig appConfig;
5182+
auto settings = TKikimrSettings()
5183+
.SetAppConfig(appConfig)
5184+
.SetWithSampleTables(false);
5185+
5186+
TKikimrRunner kikimr(settings);
5187+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
5188+
5189+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
5190+
5191+
const TString query = R"(
5192+
CREATE TABLE `/Root/DataShard` (
5193+
Col1 String,
5194+
Col2 String,
5195+
Col3 String,
5196+
PRIMARY KEY (Col1, Col2)
5197+
)
5198+
WITH (
5199+
STORE = ROW,
5200+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
5201+
PARTITION_AT_KEYS = (("a"), ("b"), ("c"), ("d"), ("e"), ("f"), ("g"), ("h"), ("k"), ("p"), ("q"), ("x"))
5202+
);
5203+
)";
5204+
5205+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
5206+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
5207+
5208+
auto client = kikimr.GetQueryClient();
5209+
5210+
{
5211+
auto prepareResult = client.ExecuteQuery(R"(
5212+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("y", "1") , ("y", "2"), ("d", "1"), ("b", "1"), ("k", "1"), ("q", "1"), ("p", "1");
5213+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
5214+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
5215+
}
5216+
5217+
{
5218+
auto result = client.ExecuteQuery(R"(
5219+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col1 IN ("d", "b", "k", "q", "p") OR (Col1 = "y" AND Col2 = "2");
5220+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
5221+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
5222+
CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(0)));
5223+
}
5224+
}
5225+
5226+
Y_UNIT_TEST(ReadManyShardsRange) {
5227+
NKikimrConfig::TAppConfig appConfig;
5228+
auto settings = TKikimrSettings()
5229+
.SetAppConfig(appConfig)
5230+
.SetWithSampleTables(false);
5231+
5232+
TKikimrRunner kikimr(settings);
5233+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
5234+
5235+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
5236+
5237+
const TString query = R"(
5238+
CREATE TABLE `/Root/DataShard` (
5239+
Col1 String,
5240+
Col2 String,
5241+
Col3 String,
5242+
PRIMARY KEY (Col1, Col2)
5243+
)
5244+
WITH (
5245+
STORE = ROW,
5246+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
5247+
PARTITION_AT_KEYS = (("a", "0"), ("b", "b"), ("c", "d"))
5248+
);
5249+
)";
5250+
5251+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
5252+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
5253+
5254+
auto client = kikimr.GetQueryClient();
5255+
5256+
{
5257+
auto prepareResult = client.ExecuteQuery(R"(
5258+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("a", "a") , ("c", "c"), ("d", "d");
5259+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
5260+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
5261+
}
5262+
5263+
{
5264+
auto result = client.ExecuteQuery(R"(
5265+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE "a" <= Col1 AND Col1 <= "c";
5266+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
5267+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
5268+
CompareYson(R"([[2u]])", FormatResultSetYson(result.GetResultSet(0)));
5269+
}
5270+
}
51795271
}
51805272

51815273
} // namespace NKqp

0 commit comments

Comments
 (0)