Skip to content

Commit ab10810

Browse files
authored
25-1: add followers support for secondary indexes (#17965) (#19310)
2 parents a5a96d9 + 9091efd commit ab10810

File tree

17 files changed

+509
-200
lines changed

17 files changed

+509
-200
lines changed

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,28 @@ namespace {
677677
return true;
678678
}
679679

680+
[[nodiscard]] bool ParseReadReplicasSettings(
681+
Ydb::Table::ReadReplicasSettings& readReplicasSettings,
682+
const TCoNameValueTuple& setting,
683+
TExprContext& ctx
684+
) {
685+
const auto replicasSettings = TString(
686+
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
687+
);
688+
689+
Ydb::StatusIds::StatusCode code;
690+
TString errText;
691+
if (!ConvertReadReplicasSettingsToProto(replicasSettings, readReplicasSettings, code, errText)) {
692+
693+
ctx.AddError(YqlIssue(ctx.GetPosition(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Pos()),
694+
NYql::YqlStatusFromYdbStatus(code),
695+
errText));
696+
return false;
697+
}
698+
699+
return true;
700+
}
701+
680702
bool ParseAsyncReplicationSettingsBase(
681703
TReplicationSettingsBase& dstSettings, const TCoNameValueTupleList& srcSettings, TExprContext& ctx, TPositionHandle pos,
682704
const TString& objectName = "replication"
@@ -1746,17 +1768,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
17461768
}
17471769

17481770
} else if (name == "readReplicasSettings") {
1749-
const auto replicasSettings = TString(
1750-
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
1751-
);
1752-
Ydb::StatusIds::StatusCode code;
1753-
TString errText;
1754-
if (!ConvertReadReplicasSettingsToProto(replicasSettings,
1755-
*alterTableRequest.mutable_set_read_replicas_settings(), code, errText)) {
1756-
1757-
ctx.AddError(YqlIssue(ctx.GetPosition(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Pos()),
1758-
NYql::YqlStatusFromYdbStatus(code),
1759-
errText));
1771+
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), setting, ctx)) {
17601772
return SyncError();
17611773
}
17621774
} else if (name == "setTtlSettings") {
@@ -1907,12 +1919,17 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
19071919
} else if (settingName == "tableSettings") {
19081920
auto tableSettings = indexSetting.Value().Cast<TCoNameValueTupleList>();
19091921
for (const auto& tableSetting : tableSettings) {
1910-
if (IsPartitioningSetting(tableSetting.Name().Value())) {
1922+
const auto name = tableSetting.Name().Value();
1923+
if (IsPartitioningSetting(name)) {
19111924
if (!ParsePartitioningSettings(
19121925
*alterTableRequest.mutable_alter_partitioning_settings(), tableSetting, ctx
19131926
)) {
19141927
return SyncError();
19151928
}
1929+
} else if (name == "readReplicasSettings") {
1930+
if (!ParseReadReplicasSettings(*alterTableRequest.mutable_set_read_replicas_settings(), tableSetting, ctx)) {
1931+
return SyncError();
1932+
}
19161933
} else {
19171934
ctx.AddError(
19181935
TIssue(ctx.GetPosition(tableSetting.Name().Pos()),

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1508,6 +1508,65 @@ void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
15081508
UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
15091509
}
15101510

1511+
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName) {
1512+
int result = 0;
1513+
1514+
TTestActorRuntime* runtime = server.GetRuntime();
1515+
auto sender = runtime->AllocateEdgeActor();
1516+
auto shards = GetTableShards(&server, sender, path);
1517+
UNIT_ASSERT_C(shards.size() > 0, "Table: " << path << " has no shards");
1518+
1519+
for (auto x : shards) {
1520+
runtime->SendToPipe(
1521+
x,
1522+
sender,
1523+
new TEvTablet::TEvGetCounters,
1524+
0,
1525+
GetPipeConfigWithRetries());
1526+
1527+
auto ev = runtime->GrabEdgeEvent<TEvTablet::TEvGetCountersResponse>(sender);
1528+
UNIT_ASSERT(ev);
1529+
1530+
const NKikimrTabletBase::TEvGetCountersResponse& resp = ev->Get()->Record;
1531+
for (const auto& counter : resp.GetTabletCounters().GetAppCounters().GetCumulativeCounters()) {
1532+
if (counter.GetName() == counterName) {
1533+
result += counter.GetValue();
1534+
}
1535+
}
1536+
}
1537+
1538+
return result;
1539+
}
1540+
1541+
void CheckTableReads(NYdb::NTable::TSession& session, const TString& tableName, bool checkFollower, bool readsExpected) {
1542+
for (size_t attempt = 0; attempt < 30; ++attempt)
1543+
{
1544+
Cerr << "... SELECT from partition_stats for " << tableName << " , attempt " << attempt << Endl;
1545+
1546+
const TString selectPartitionStats(Q_(Sprintf(R"(
1547+
SELECT *
1548+
FROM `/Root/.sys/partition_stats`
1549+
WHERE FollowerId %s 0 AND (RowReads != 0 OR RangeReadRows != 0) AND Path = '%s'
1550+
)", (checkFollower ? "!=" : "="), tableName.c_str())));
1551+
1552+
auto result = session.ExecuteDataQuery(selectPartitionStats, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
1553+
AssertSuccessResult(result);
1554+
Cerr << selectPartitionStats << Endl;
1555+
1556+
auto rs = result.GetResultSet(0);
1557+
if (readsExpected) {
1558+
if (rs.RowsCount() != 0)
1559+
return;
1560+
Sleep(TDuration::Seconds(5));
1561+
} else {
1562+
if (rs.RowsCount() == 0)
1563+
return;
1564+
Y_FAIL("!readsExpected, but there are read stats for %s", tableName.c_str());
1565+
}
1566+
}
1567+
Y_FAIL("readsExpected, but there is timeout waiting for read stats from %s", tableName.c_str());
1568+
}
1569+
15111570
NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {
15121571
if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) {
15131572
auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe();

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, cons
384384

385385
void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
386386
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
387+
int GetCumulativeCounterValue(Tests::TServer& server, const TString& path, const TString& counterName);
388+
389+
void CheckTableReads(NYdb::NTable::TSession& session, const TString& tableName, bool checkFollower, bool readsExpected);
387390

388391
bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);
389392

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 109 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,92 +2334,75 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
23342334
}
23352335
}
23362336

2337-
Y_UNIT_TEST(StaleRO) {
2338-
auto kikimr = DefaultKikimrRunner();
2339-
auto db = kikimr.GetTableClient();
2340-
auto session = db.CreateSession().GetValueSync().GetSession();
2337+
Y_UNIT_TEST_TWIN(StaleRO, EnableFollowers) {
2338+
auto settings = TKikimrSettings()
2339+
.SetEnableForceFollowers(EnableFollowers)
2340+
.SetWithSampleTables(false);
23412341

2342-
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2343-
--!syntax_v1
2344-
CREATE TABLE `FollowersKv` (
2342+
TKikimrRunner kikimr(settings);
2343+
auto tableClient = kikimr.GetTableClient();
2344+
auto queryClient = kikimr.GetQueryClient();
2345+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2346+
2347+
AssertSuccessResult(session.ExecuteSchemeQuery(Sprintf(Q_(R"(
2348+
CREATE TABLE `Table` (
23452349
Key Uint64,
23462350
Value String,
23472351
PRIMARY KEY (Key)
23482352
)
23492353
WITH (
2350-
PARTITION_AT_KEYS = (10, 20, 30),
2351-
READ_REPLICAS_SETTINGS = "ANY_AZ:1"
2354+
PARTITION_AT_KEYS = (10, 20, 30)
2355+
%s
23522356
);
2353-
)").GetValueSync());
2357+
)").c_str(), EnableFollowers ? ", READ_REPLICAS_SETTINGS = \"ANY_AZ:3\"" : "")
2358+
).GetValueSync());
23542359

2355-
AssertSuccessResult(session.ExecuteDataQuery(R"(
2356-
--!syntax_v1
2357-
2358-
REPLACE INTO `FollowersKv` (Key, Value) VALUES
2360+
AssertSuccessResult(session.ExecuteDataQuery(Q_(R"(
2361+
UPSERT INTO `Table` (Key, Value) VALUES
23592362
(1u, "One"),
23602363
(11u, "Two"),
23612364
(21u, "Three"),
23622365
(31u, "Four");
2363-
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
2366+
)"), TTxControl::BeginTx().CommitTx()).GetValueSync());
23642367

2365-
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_INFO);
2366-
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::PIPE_CLIENT, NActors::NLog::PRI_DEBUG);
2367-
//kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::PIPE_SERVER, NActors::NLog::PRI_DEBUG);
23682368

2369-
// Followers immediate
2370-
auto result = session.ExecuteDataQuery(R"(
2371-
--!syntax_v1
2372-
SELECT * FROM FollowersKv WHERE Key = 21;
2373-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2374-
AssertSuccessResult(result);
2369+
auto checkRead = [&](const TString& query, const TString& expectedResult, const TString& tablePath) {
2370+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2371+
AssertSuccessResult(result);
2372+
CompareYson(expectedResult, FormatResultSetYson(result.GetResultSet(0)));
23752373

2376-
CompareYson(R"(
2374+
if (EnableFollowers) {
2375+
// from master - should NOT read
2376+
CheckTableReads(session, tablePath, false, false);
2377+
// from followers - should read
2378+
CheckTableReads(session, tablePath, true, true);
2379+
} else {
2380+
// from master - should read
2381+
CheckTableReads(session, tablePath, false, true);
2382+
// from followers - should NOT read
2383+
CheckTableReads(session, tablePath, true, false);
2384+
}
2385+
};
2386+
2387+
// immediate
2388+
checkRead(Q_(R"(
2389+
SELECT * FROM Table WHERE Key = 21;
2390+
)"), R"(
23772391
[
23782392
[[21u];["Three"]];
23792393
]
2380-
)", FormatResultSetYson(result.GetResultSet(0)));
2381-
2382-
// Followers distributed
2383-
result = session.ExecuteDataQuery(R"(
2384-
--!syntax_v1
2385-
SELECT * FROM FollowersKv WHERE Value != "One" ORDER BY Key;
2386-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2387-
AssertSuccessResult(result);
2394+
)", "/Root/Table");
23882395

2389-
CompareYson(R"(
2396+
// distributed
2397+
checkRead(Q_(R"(
2398+
SELECT * FROM Table WHERE Value != "One" ORDER BY Key;
2399+
)"), R"(
23902400
[
23912401
[[11u];["Two"]];
23922402
[[21u];["Three"]];
23932403
[[31u];["Four"]];
23942404
]
2395-
)", FormatResultSetYson(result.GetResultSet(0)));
2396-
2397-
// No followers immediate
2398-
result = session.ExecuteDataQuery(R"(
2399-
--!syntax_v1
2400-
SELECT * FROM TwoShard WHERE Key = 2;
2401-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2402-
AssertSuccessResult(result);
2403-
2404-
CompareYson(R"(
2405-
[
2406-
[[2u];["Two"];[0]];
2407-
]
2408-
)", FormatResultSetYson(result.GetResultSet(0)));
2409-
2410-
// No followers distributed
2411-
result = session.ExecuteDataQuery(R"(
2412-
--!syntax_v1
2413-
SELECT * FROM TwoShard WHERE Value2 < 0 ORDER BY Key;
2414-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2415-
AssertSuccessResult(result);
2416-
2417-
CompareYson(R"(
2418-
[
2419-
[[1u];["One"];[-1]];
2420-
[[4000000001u];["BigOne"];[-1]]
2421-
]
2422-
)", FormatResultSetYson(result.GetResultSet(0)));
2405+
)", "/Root/Table");
24232406
}
24242407

24252408
Y_UNIT_TEST(StaleRO_Immediate) {
@@ -2444,6 +2427,70 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24442427
)", FormatResultSetYson(result.GetResultSet(0)));
24452428
}
24462429

2430+
Y_UNIT_TEST_TWIN(StaleRO_IndexFollowers, EnableFollowers) {
2431+
auto settings = TKikimrSettings()
2432+
.SetEnableForceFollowers(EnableFollowers)
2433+
.SetWithSampleTables(false);
2434+
TKikimrRunner kikimr(settings);
2435+
auto db = kikimr.GetTableClient();
2436+
auto session = db.CreateSession().GetValueSync().GetSession();
2437+
2438+
AssertSuccessResult(session.ExecuteSchemeQuery(Q_(R"(
2439+
CREATE TABLE Table (
2440+
Key Uint64,
2441+
Subkey Uint64,
2442+
Value String,
2443+
Order Uint32,
2444+
PRIMARY KEY (Key, Subkey)
2445+
);
2446+
2447+
ALTER TABLE Table ADD INDEX idx GLOBAL SYNC ON (Key, Order) COVER (Value);
2448+
)")).GetValueSync());
2449+
2450+
if constexpr (EnableFollowers) {
2451+
AssertSuccessResult(session.ExecuteSchemeQuery(Q_(R"(
2452+
ALTER TABLE Table ALTER INDEX idx SET READ_REPLICAS_SETTINGS "ANY_AZ:1";
2453+
)")).GetValueSync());
2454+
}
2455+
2456+
AssertSuccessResult(session.ExecuteDataQuery(Q_(R"(
2457+
UPSERT INTO Table (Key, Subkey, Value, Order) VALUES
2458+
(1u, 2u, "One", 7u),
2459+
(1u, 3u, "Two", 4u),
2460+
(21u, 8u, "Three", 1u),
2461+
(31u, 0u, "Four", 8u);
2462+
)"), TTxControl::BeginTx().CommitTx()).GetValueSync());
2463+
2464+
auto result = session.ExecuteDataQuery(Q_(R"(
2465+
SELECT Key, Value FROM Table VIEW idx WHERE Key = 1 ORDER BY Order;
2466+
)"), TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2467+
AssertSuccessResult(result);
2468+
2469+
CompareYson(R"(
2470+
[
2471+
[[1u];["Two"]];
2472+
[[1u];["One"]];
2473+
]
2474+
)", FormatResultSetYson(result.GetResultSet(0)));
2475+
2476+
// from main master - should NOT read
2477+
CheckTableReads(session, "/Root/Table", false, false);
2478+
// from main followers - should NOT read
2479+
CheckTableReads(session, "/Root/Table", true, false);
2480+
2481+
if constexpr (EnableFollowers) {
2482+
// from index master - should NOT read
2483+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", false, false);
2484+
// from index followers - should read
2485+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", true, true);
2486+
} else {
2487+
// from index master - should read
2488+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", false, true);
2489+
// from index followers - should NOT read
2490+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", true, false);
2491+
}
2492+
}
2493+
24472494
Y_UNIT_TEST(ReadRangeWithParams) {
24482495
auto kikimr = DefaultKikimrRunner();
24492496
auto db = kikimr.GetTableClient();

0 commit comments

Comments
 (0)