Skip to content

Commit 0a5089e

Browse files
authored
[24-3] Fix reads from many shards #11569 (#11658)
1 parent ee21311 commit 0a5089e

File tree

2 files changed

+98
-8
lines changed

2 files changed

+98
-8
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -991,14 +991,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
991991
}
992992

993993
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
994-
// Special case for infinity
995-
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
996-
return !lhs.Ranges->GetRightBorder().first->GetCells().empty();
997-
}
998-
return CompareTypedCellVectors(
999-
lhs.Ranges->GetRightBorder().first->GetCells().data(),
1000-
rhs.Ranges->GetRightBorder().first->GetCells().data(),
1001-
keyTypes.data(), keyTypes.size()) < 0;
994+
return CompareBorders<false, false>(
995+
lhs.Ranges->GetRightBorder().first->GetCells(),
996+
rhs.Ranges->GetRightBorder().first->GetCells(),
997+
lhs.Ranges->GetRightBorder().second,
998+
rhs.Ranges->GetRightBorder().second,
999+
keyTypes) < 0;
10021000
});
10031001

10041002
// One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4817,6 +4817,98 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
48174817
R"([[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]]])");
48184818
}
48194819
}
4820+
4821+
Y_UNIT_TEST(ReadManyRanges) {
4822+
NKikimrConfig::TAppConfig appConfig;
4823+
auto settings = TKikimrSettings()
4824+
.SetAppConfig(appConfig)
4825+
.SetWithSampleTables(false);
4826+
4827+
TKikimrRunner kikimr(settings);
4828+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4829+
4830+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
4831+
4832+
const TString query = R"(
4833+
CREATE TABLE `/Root/DataShard` (
4834+
Col1 String,
4835+
Col2 String,
4836+
Col3 String,
4837+
PRIMARY KEY (Col1, Col2)
4838+
)
4839+
WITH (
4840+
STORE = ROW,
4841+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
4842+
PARTITION_AT_KEYS = (("a"), ("b"), ("c"), ("d"), ("e"), ("f"), ("g"), ("h"), ("k"), ("p"), ("q"), ("x"))
4843+
);
4844+
)";
4845+
4846+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4847+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
4848+
4849+
auto client = kikimr.GetQueryClient();
4850+
4851+
{
4852+
auto prepareResult = client.ExecuteQuery(R"(
4853+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("y", "1") , ("y", "2"), ("d", "1"), ("b", "1"), ("k", "1"), ("q", "1"), ("p", "1");
4854+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4855+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4856+
}
4857+
4858+
{
4859+
auto result = client.ExecuteQuery(R"(
4860+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col1 IN ("d", "b", "k", "q", "p") OR (Col1 = "y" AND Col2 = "2");
4861+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4862+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4863+
CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(0)));
4864+
}
4865+
}
4866+
4867+
Y_UNIT_TEST(ReadManyShardsRange) {
4868+
NKikimrConfig::TAppConfig appConfig;
4869+
auto settings = TKikimrSettings()
4870+
.SetAppConfig(appConfig)
4871+
.SetWithSampleTables(false);
4872+
4873+
TKikimrRunner kikimr(settings);
4874+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4875+
4876+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
4877+
4878+
const TString query = R"(
4879+
CREATE TABLE `/Root/DataShard` (
4880+
Col1 String,
4881+
Col2 String,
4882+
Col3 String,
4883+
PRIMARY KEY (Col1, Col2)
4884+
)
4885+
WITH (
4886+
STORE = ROW,
4887+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
4888+
PARTITION_AT_KEYS = (("a", "0"), ("b", "b"), ("c", "d"))
4889+
);
4890+
)";
4891+
4892+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4893+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
4894+
4895+
auto client = kikimr.GetQueryClient();
4896+
4897+
{
4898+
auto prepareResult = client.ExecuteQuery(R"(
4899+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("a", "a") , ("c", "c"), ("d", "d");
4900+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4901+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4902+
}
4903+
4904+
{
4905+
auto result = client.ExecuteQuery(R"(
4906+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE "a" <= Col1 AND Col1 <= "c";
4907+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4908+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4909+
CompareYson(R"([[2u]])", FormatResultSetYson(result.GetResultSet(0)));
4910+
}
4911+
}
48204912
}
48214913

48224914
} // namespace NKqp

0 commit comments

Comments
 (0)