Skip to content

Commit 7435c72

Browse files
raydzastazevaykin
authored andcommitted
Followers support for secondary indexes (#17965)
1 parent a5a96d9 commit 7435c72

File tree

16 files changed

+439
-138
lines changed

16 files changed

+439
-138
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,28 @@ namespace {
677677
return true;
678678
}
679679

680+
[[nodiscard]] bool ParseReadReplicasSettings(
681+
Ydb::Table::ReadReplicasSettings& readReplicasSettings,
682+
const TCoNameValueTuple& setting,
683+
TExprContext& ctx
684+
) {
685+
const auto replicasSettings = TString(
686+
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
687+
);
688+
689+
Ydb::StatusIds::StatusCode code;
690+
TString errText;
691+
if (!ConvertReadReplicasSettingsToProto(replicasSettings, readReplicasSettings, code, errText)) {
692+
693+
ctx.AddError(YqlIssue(ctx.GetPosition(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Pos()),
694+
NYql::YqlStatusFromYdbStatus(code),
695+
errText));
696+
return false;
697+
}
698+
699+
return true;
700+
}
701+
680702
bool ParseAsyncReplicationSettingsBase(
681703
TReplicationSettingsBase& dstSettings, const TCoNameValueTupleList& srcSettings, TExprContext& ctx, TPositionHandle pos,
682704
const TString& objectName = "replication"
@@ -1746,17 +1768,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
17461768
}
17471769

17481770
} else if (name == "readReplicasSettings") {
1749-
const auto replicasSettings = TString(
1750-
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
1751-
);
1752-
Ydb::StatusIds::StatusCode code;
1753-
TString errText;
1754-
if (!ConvertReadReplicasSettingsToProto(replicasSettings,
1755-
*alterTableRequest.mutable_set_read_replicas_settings(), code, errText)) {
1756-
1757-
ctx.AddError(YqlIssue(ctx.GetPosition(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Pos()),
1758-
NYql::YqlStatusFromYdbStatus(code),
1759-
errText));
1771+
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), setting, ctx)) {
17601772
return SyncError();
17611773
}
17621774
} else if (name == "setTtlSettings") {
@@ -1907,12 +1919,17 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
19071919
} else if (settingName == "tableSettings") {
19081920
auto tableSettings = indexSetting.Value().Cast<TCoNameValueTupleList>();
19091921
for (const auto& tableSetting : tableSettings) {
1910-
if (IsPartitioningSetting(tableSetting.Name().Value())) {
1922+
const auto name = tableSetting.Name().Value();
1923+
if (IsPartitioningSetting(name)) {
19111924
if (!ParsePartitioningSettings(
19121925
*alterTableRequest.mutable_alter_partitioning_settings(), tableSetting, ctx
19131926
)) {
19141927
return SyncError();
19151928
}
1929+
} else if (name == "readReplicasSettings") {
1930+
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), tableSetting, ctx)) {
1931+
return SyncError();
1932+
}
19161933
} else {
19171934
ctx.AddError(
19181935
TIssue(ctx.GetPosition(tableSetting.Name().Pos()),

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,6 +1508,36 @@ void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
15081508
UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
15091509
}
15101510

1511+
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName) {
1512+
int result = 0;
1513+
1514+
TTestActorRuntime* runtime = server.GetRuntime();
1515+
auto sender = runtime->AllocateEdgeActor();
1516+
auto shards = GetTableShards(&server, sender, path);
1517+
UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards");
1518+
1519+
for (auto x : shards) {
1520+
runtime->SendToPipe(
1521+
x,
1522+
sender,
1523+
new TEvTablet::TEvGetCounters,
1524+
0,
1525+
GetPipeConfigWithRetries());
1526+
1527+
auto ev = runtime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender);
1528+
UNIT_ASSERT(ev);
1529+
1530+
const NKikimrTabletBase::TEvGetCountersResponse& resp = ev->Get()->Record;
1531+
for (const auto& counter : resp.GetTabletCounters().GetAppCounters().GetCumulativeCounters()) {
1532+
if (counter.GetName() == counterName) {
1533+
result += counter.GetValue();
1534+
}
1535+
}
1536+
}
1537+
1538+
return result;
1539+
}
1540+
15111541
NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {
15121542
if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) {
15131543
auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe();

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, cons
384384

385385
void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
386386
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
387+
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName);
387388

388389
bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);
389390

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2372,6 +2372,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
23722372
SELECT * FROM FollowersKv WHERE Key = 21;
23732373
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
23742374
AssertSuccessResult(result);
2375+
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
2376+
kikimr.GetTestServer(),
2377+
"/Root/FollowersKv",
2378+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2379+
));
23752380

23762381
CompareYson(R"(
23772382
[
@@ -2385,6 +2390,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
23852390
SELECT * FROM FollowersKv WHERE Value != "One" ORDER BY Key;
23862391
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
23872392
AssertSuccessResult(result);
2393+
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
2394+
kikimr.GetTestServer(),
2395+
"/Root/FollowersKv",
2396+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2397+
));
23882398

23892399
CompareYson(R"(
23902400
[
@@ -2400,6 +2410,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24002410
SELECT * FROM TwoShard WHERE Key = 2;
24012411
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
24022412
AssertSuccessResult(result);
2413+
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
2414+
kikimr.GetTestServer(),
2415+
"/Root/TwoShard",
2416+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2417+
));
24032418

24042419
CompareYson(R"(
24052420
[
@@ -2420,6 +2435,11 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24202435
[[4000000001u];["BigOne"];[-1]]
24212436
]
24222437
)", FormatResultSetYson(result.GetResultSet(0)));
2438+
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
2439+
kikimr.GetTestServer(),
2440+
"/Root/TwoShard",
2441+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2442+
));
24232443
}
24242444

24252445
Y_UNIT_TEST(StaleRO_Immediate) {
@@ -2444,6 +2464,66 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24442464
)", FormatResultSetYson(result.GetResultSet(0)));
24452465
}
24462466

2467+
Y_UNIT_TEST_TWIN(StaleRO_IndexFollowers, EnableFollowers) {
2468+
auto kikimr = DefaultKikimrRunner();
2469+
auto db = kikimr.GetTableClient();
2470+
auto session = db.CreateSession().GetValueSync().GetSession();
2471+
2472+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2473+
--!syntax_v1
2474+
CREATE TABLE `KeySubkey` (
2475+
Key Uint64,
2476+
Subkey Uint64,
2477+
Value String,
2478+
Order Uint32,
2479+
PRIMARY KEY (Key, Subkey)
2480+
);
2481+
2482+
ALTER TABLE `KeySubkey` ADD INDEX `idx` GLOBAL SYNC ON (`Key`, `Order`) COVER (`Value`);
2483+
)").GetValueSync());
2484+
2485+
if constexpr (EnableFollowers) {
2486+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2487+
--!syntax_v1
2488+
ALTER TABLE `KeySubkey` ALTER INDEX `idx` SET READ_REPLICAS_SETTINGS "PER_AZ:1";
2489+
)").GetValueSync());
2490+
}
2491+
2492+
AssertSuccessResult(session.ExecuteDataQuery(R"(
2493+
--!syntax_v1
2494+
2495+
REPLACE INTO `KeySubkey` (`Key`, `Subkey`, `Value`, `Order`) VALUES
2496+
(1u, 2u, "One", 7u),
2497+
(1u, 3u, "Two", 4u),
2498+
(21u, 8u, "Three", 1u),
2499+
(31u, 0u, "Four", 8u);
2500+
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
2501+
2502+
auto result = session.ExecuteDataQuery(R"(
2503+
--!syntax_v1
2504+
SELECT * FROM `KeySubkey` VIEW `idx` WHERE Key = 1 ORDER BY `Order`;
2505+
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2506+
AssertSuccessResult(result);
2507+
2508+
const auto FollowerCpuTime = GetCumulativeCounterValue(
2509+
kikimr.GetTestServer(),
2510+
"/Root/KeySubkey/idx/indexImplTable",
2511+
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2512+
);
2513+
if constexpr (EnableFollowers) {
2514+
UNIT_ASSERT_UNEQUAL(0, FollowerCpuTime);
2515+
} else {
2516+
UNIT_ASSERT_EQUAL(0, FollowerCpuTime);
2517+
}
2518+
2519+
CompareYson(R"(
2520+
[
2521+
[[1u];[4u];[3u];["Two"]];
2522+
[[1u];[7u];[2u];["One"]];
2523+
]
2524+
)", FormatResultSetYson(result.GetResultSet(0)));
2525+
}
2526+
24472527
Y_UNIT_TEST(ReadRangeWithParams) {
24482528
auto kikimr = DefaultKikimrRunner();
24492529
auto db = kikimr.GetTableClient();

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2802,19 +2802,30 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
28022802
AlterTableAddIndex(EIndexTypeSql::GlobalVectorKMeansTree);
28032803
}
28042804

2805-
Y_UNIT_TEST(AlterTableAlterIndex) {
2805+
Y_UNIT_TEST_TWIN(AlterTableAlterIndex, UseQueryService) {
28062806
TKikimrRunner kikimr;
2807+
auto queryClient = kikimr.GetQueryClient();
28072808
auto db = kikimr.GetTableClient();
28082809
auto session = db.CreateSession().GetValueSync().GetSession();
28092810
CreateSampleTablesWithIndex(session);
28102811

2812+
const auto executeGeneric = [&queryClient, &session](const TString& query) -> TStatus {
2813+
if constexpr (UseQueryService) {
2814+
Y_UNUSED(session);
2815+
return queryClient.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2816+
} else {
2817+
Y_UNUSED(queryClient);
2818+
return session.ExecuteSchemeQuery(query).ExtractValueSync();
2819+
}
2820+
};
2821+
28112822
constexpr int minPartitionsCount = 10;
28122823
{
2813-
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
2824+
const auto result = executeGeneric(Sprintf(R"(
28142825
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_MIN_PARTITIONS_COUNT %d;
28152826
)", minPartitionsCount
28162827
)
2817-
).ExtractValueSync();
2828+
);
28182829
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
28192830
}
28202831
{
@@ -2826,18 +2837,39 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
28262837

28272838
constexpr int partitionSizeMb = 555;
28282839
{
2829-
auto result = session.ExecuteSchemeQuery(Sprintf(R"(
2840+
const auto result = executeGeneric(Sprintf(R"(
28302841
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET AUTO_PARTITIONING_PARTITION_SIZE_MB %d;
2831-
)", partitionSizeMb)
2832-
).ExtractValueSync();
2833-
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2842+
)", partitionSizeMb
2843+
)
2844+
);
2845+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
28342846
}
28352847
{
28362848
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
28372849
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
28382850
auto indexDesc = describe.GetTableDescription();
28392851
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetPartitioningSettings().GetPartitionSizeMb(), partitionSizeMb);
28402852
}
2853+
2854+
constexpr TStringBuf readReplicasModeAsString = "PER_AZ";
2855+
constexpr auto readReplicasMode = NYdb::NTable::TReadReplicasSettings::EMode::PerAz;
2856+
constexpr ui64 readReplicasCount = 1;
2857+
{
2858+
const auto result = executeGeneric(Sprintf(R"(
2859+
ALTER TABLE `/Root/SecondaryKeys` ALTER INDEX Index SET READ_REPLICAS_SETTINGS "%s:%)" PRIu64 R"(";
2860+
)", readReplicasModeAsString.data(), readReplicasCount
2861+
)
2862+
);
2863+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
2864+
}
2865+
{
2866+
auto describe = session.DescribeTable("/Root/SecondaryKeys/Index/indexImplTable").GetValueSync();
2867+
UNIT_ASSERT_C(describe.IsSuccess(), describe.GetIssues().ToString());
2868+
auto indexDesc = describe.GetTableDescription();
2869+
UNIT_ASSERT(indexDesc.GetReadReplicasSettings());
2870+
UNIT_ASSERT(indexDesc.GetReadReplicasSettings()->GetMode() == readReplicasMode);
2871+
UNIT_ASSERT_VALUES_EQUAL(indexDesc.GetReadReplicasSettings()->GetReadReplicasCount(), readReplicasCount);
2872+
}
28412873
}
28422874

28432875
Y_UNIT_TEST(AlterTableAlterVectorIndex) {

ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ TVector<ISubOperation::TPtr> CreateConsistentAlterTable(TOperationId id, const T
751751
if (!(IsAdministrator(AppData(), context.UserToken.Get()) && !AppData()->AdministrationAllowedSIDs.empty())
752752
&& (!CheckAllowedFields(alter, {"Name", "PathId", "PartitionConfig", "ReplicationConfig", "IncrementalBackupConfig"})
753753
|| (alter.HasPartitionConfig()
754-
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy"})
754+
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy", "FollowerCount", "FollowerGroups"})
755755
)
756756
)
757757
) {

ydb/core/tx/schemeshard/schemeshard_utils.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ NKikimrSchemeOp::TPartitionConfig PartitionConfigForIndexes(
136136
if (baseTablePartitionConfig.HasKeepSnapshotTimeout()) {
137137
result.SetKeepSnapshotTimeout(baseTablePartitionConfig.GetKeepSnapshotTimeout());
138138
}
139+
if (indexTableDesc.GetPartitionConfig().FollowerGroupsSize()) {
140+
result.MutableFollowerGroups()->CopyFrom(indexTableDesc.GetPartitionConfig().GetFollowerGroups());
141+
}
139142
// skip repeated NKikimrStorageSettings.TStorageRoom StorageRooms = 17;
140-
// skip optional NKikimrHive.TFollowerGroup FollowerGroup = 23;
141143

142144
return result;
143145
}

ydb/core/ydb_convert/table_description.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,7 @@ void FillGlobalIndexSettings(Ydb::Table::GlobalIndexSettings& settings,
990990
}
991991

992992
FillPartitioningSettingsImpl(settings, indexImplTableDescription);
993+
FillReadReplicasSettings(settings, indexImplTableDescription);
993994
}
994995

995996
template <typename TYdbProto>
@@ -1577,6 +1578,11 @@ void FillReadReplicasSettings(Ydb::Table::CreateTableRequest& out,
15771578
FillReadReplicasSettingsImpl(out, in);
15781579
}
15791580

1581+
void FillReadReplicasSettings(Ydb::Table::GlobalIndexSettings& out,
1582+
const NKikimrSchemeOp::TTableDescription& in) {
1583+
FillReadReplicasSettingsImpl(out, in);
1584+
}
1585+
15801586
bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,
15811587
const Ydb::Table::CreateTableRequest& in, const TTableProfiles& profiles,
15821588
Ydb::StatusIds::StatusCode& status, TString& error, bool indexedTable)

ydb/core/ydb_convert/table_description.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ void FillReadReplicasSettings(Ydb::Table::DescribeTableResult& out,
136136
const NKikimrSchemeOp::TTableDescription& in);
137137
void FillReadReplicasSettings(Ydb::Table::CreateTableRequest& out,
138138
const NKikimrSchemeOp::TTableDescription& in);
139+
void FillReadReplicasSettings(Ydb::Table::GlobalIndexSettings& out,
140+
const NKikimrSchemeOp::TTableDescription& in);
139141

140142
// in
141143
bool FillTableDescription(NKikimrSchemeOp::TModifyScheme& out,

ydb/core/ydb_convert/table_settings.cpp

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,10 @@ bool FillIndexTablePartitioning(
403403
) {
404404
auto fillIndexPartitioning = [&](const Ydb::Table::GlobalIndexSettings& settings, std::vector<NKikimrSchemeOp::TTableDescription>& indexImplTableDescriptions) {
405405
auto& indexImplTableDescription = indexImplTableDescriptions.emplace_back();
406+
auto& partitionConfig = *indexImplTableDescription.MutablePartitionConfig();
406407

407408
if (settings.has_partitioning_settings()) {
408-
if (!FillPartitioningPolicy(*indexImplTableDescription.MutablePartitionConfig(), settings, code, error)) {
409+
if (!FillPartitioningPolicy(partitionConfig, settings, code, error)) {
409410
return false;
410411
}
411412
}
@@ -414,6 +415,30 @@ bool FillIndexTablePartitioning(
414415
return false;
415416
}
416417
}
418+
if (settings.has_read_replicas_settings()) {
419+
const auto& readReplicasSettings = settings.read_replicas_settings();
420+
switch (readReplicasSettings.settings_case()) {
421+
case Ydb::Table::ReadReplicasSettings::kPerAzReadReplicasCount:
422+
{
423+
auto& followerGroup = *partitionConfig.AddFollowerGroups();
424+
followerGroup.SetFollowerCount(readReplicasSettings.per_az_read_replicas_count());
425+
followerGroup.SetRequireAllDataCenters(true);
426+
followerGroup.SetFollowerCountPerDataCenter(true);
427+
break;
428+
}
429+
case Ydb::Table::ReadReplicasSettings::kAnyAzReadReplicasCount:
430+
{
431+
auto& followerGroup = *partitionConfig.AddFollowerGroups();
432+
followerGroup.SetFollowerCount(readReplicasSettings.any_az_read_replicas_count());
433+
followerGroup.SetRequireAllDataCenters(false);
434+
break;
435+
}
436+
default:
437+
code = Ydb::StatusIds::BAD_REQUEST;
438+
error = TStringBuilder() << "Unknown read_replicas_settings type";
439+
return false;
440+
}
441+
}
417442
return true;
418443
};
419444

0 commit comments

Comments
 (0)