Skip to content

Commit 43cb92c

Browse files
committed
Test for read from table with followers (#18732)
1 parent 7435c72 commit 43cb92c

File tree

2 files changed

+91
-114
lines changed

2 files changed

+91
-114
lines changed

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 81 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -2334,112 +2334,75 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
23342334
}
23352335
}
23362336

2337-
Y_UNIT_TEST(StaleRO) {
2338-
auto kikimr = DefaultKikimrRunner();
2339-
auto db = kikimr.GetTableClient();
2340-
auto session = db.CreateSession().GetValueSync().GetSession();
2337+
Y_UNIT_TEST_TWIN(StaleRO, EnableFollowers) {
2338+
auto settings = TKikimrSettings()
2339+
.SetEnableForceFollowers(EnableFollowers)
2340+
.SetWithSampleTables(false);
23412341

2342-
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2343-
--!syntax_v1
2344-
CREATE TABLE `FollowersKv` (
2342+
TKikimrRunner kikimr(settings);
2343+
auto tableClient = kikimr.GetTableClient();
2344+
auto queryClient = kikimr.GetQueryClient();
2345+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2346+
2347+
AssertSuccessResult(session.ExecuteSchemeQuery(Sprintf(Q_(R"(
2348+
CREATE TABLE `Table` (
23452349
Key Uint64,
23462350
Value String,
23472351
PRIMARY KEY (Key)
23482352
)
23492353
WITH (
2350-
PARTITION_AT_KEYS = (10, 20, 30),
2351-
READ_REPLICAS_SETTINGS = "ANY_AZ:1"
2354+
PARTITION_AT_KEYS = (10, 20, 30)
2355+
%s
23522356
);
2353-
)").GetValueSync());
2357+
)").c_str(), EnableFollowers ? ", READ_REPLICAS_SETTINGS = \"ANY_AZ:3\"" : "")
2358+
).GetValueSync());
23542359

2355-
AssertSuccessResult(session.ExecuteDataQuery(R"(
2356-
--!syntax_v1
2357-
2358-
REPLACE INTO `FollowersKv` (Key, Value) VALUES
2360+
AssertSuccessResult(session.ExecuteDataQuery(Q_(R"(
2361+
UPSERT INTO `Table` (Key, Value) VALUES
23592362
(1u, "One"),
23602363
(11u, "Two"),
23612364
(21u, "Three"),
23622365
(31u, "Four");
2363-
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
2366+
)"), TTxControl::BeginTx().CommitTx()).GetValueSync());
23642367

2365-
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NActors::NLog::PRI_INFO);
2366-
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::PIPE_CLIENT, NActors::NLog::PRI_DEBUG);
2367-
//kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::PIPE_SERVER, NActors::NLog::PRI_DEBUG);
23682368

2369-
// Followers immediate
2370-
auto result = session.ExecuteDataQuery(R"(
2371-
--!syntax_v1
2372-
SELECT * FROM FollowersKv WHERE Key = 21;
2373-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2374-
AssertSuccessResult(result);
2375-
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
2376-
kikimr.GetTestServer(),
2377-
"/Root/FollowersKv",
2378-
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2379-
));
2369+
auto checkRead = [&](const TString& query, const TString& expectedResult, const TString& tablePath) {
2370+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2371+
AssertSuccessResult(result);
2372+
CompareYson(expectedResult, FormatResultSetYson(result.GetResultSet(0)));
23802373

2381-
CompareYson(R"(
2374+
if (EnableFollowers) {
2375+
// from master - should NOT read
2376+
CheckTableReads(session, tablePath, false, false);
2377+
// from followers - should read
2378+
CheckTableReads(session, tablePath, true, true);
2379+
} else {
2380+
// from master - should read
2381+
CheckTableReads(session, tablePath, false, true);
2382+
// from followers - should NOT read
2383+
CheckTableReads(session, tablePath, true, false);
2384+
}
2385+
};
2386+
2387+
// immediate
2388+
checkRead(Q_(R"(
2389+
SELECT * FROM Table WHERE Key = 21;
2390+
)"), R"(
23822391
[
23832392
[[21u];["Three"]];
23842393
]
2385-
)", FormatResultSetYson(result.GetResultSet(0)));
2386-
2387-
// Followers distributed
2388-
result = session.ExecuteDataQuery(R"(
2389-
--!syntax_v1
2390-
SELECT * FROM FollowersKv WHERE Value != "One" ORDER BY Key;
2391-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2392-
AssertSuccessResult(result);
2393-
UNIT_ASSERT_UNEQUAL(0, GetCumulativeCounterValue(
2394-
kikimr.GetTestServer(),
2395-
"/Root/FollowersKv",
2396-
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2397-
));
2394+
)", "/Root/Table");
23982395

2399-
CompareYson(R"(
2396+
// distributed
2397+
checkRead(Q_(R"(
2398+
SELECT * FROM Table WHERE Value != "One" ORDER BY Key;
2399+
)"), R"(
24002400
[
24012401
[[11u];["Two"]];
24022402
[[21u];["Three"]];
24032403
[[31u];["Four"]];
24042404
]
2405-
)", FormatResultSetYson(result.GetResultSet(0)));
2406-
2407-
// No followers immediate
2408-
result = session.ExecuteDataQuery(R"(
2409-
--!syntax_v1
2410-
SELECT * FROM TwoShard WHERE Key = 2;
2411-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2412-
AssertSuccessResult(result);
2413-
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
2414-
kikimr.GetTestServer(),
2415-
"/Root/TwoShard",
2416-
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2417-
));
2418-
2419-
CompareYson(R"(
2420-
[
2421-
[[2u];["Two"];[0]];
2422-
]
2423-
)", FormatResultSetYson(result.GetResultSet(0)));
2424-
2425-
// No followers distributed
2426-
result = session.ExecuteDataQuery(R"(
2427-
--!syntax_v1
2428-
SELECT * FROM TwoShard WHERE Value2 < 0 ORDER BY Key;
2429-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2430-
AssertSuccessResult(result);
2431-
2432-
CompareYson(R"(
2433-
[
2434-
[[1u];["One"];[-1]];
2435-
[[4000000001u];["BigOne"];[-1]]
2436-
]
2437-
)", FormatResultSetYson(result.GetResultSet(0)));
2438-
UNIT_ASSERT_EQUAL(0, GetCumulativeCounterValue(
2439-
kikimr.GetTestServer(),
2440-
"/Root/TwoShard",
2441-
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2442-
));
2405+
)", "/Root/Table");
24432406
}
24442407

24452408
Y_UNIT_TEST(StaleRO_Immediate) {
@@ -2465,63 +2428,67 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
24652428
}
24662429

24672430
Y_UNIT_TEST_TWIN(StaleRO_IndexFollowers, EnableFollowers) {
2468-
auto kikimr = DefaultKikimrRunner();
2431+
auto settings = TKikimrSettings()
2432+
.SetEnableForceFollowers(EnableFollowers)
2433+
.SetWithSampleTables(false);
2434+
TKikimrRunner kikimr(settings);
24692435
auto db = kikimr.GetTableClient();
24702436
auto session = db.CreateSession().GetValueSync().GetSession();
24712437

2472-
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2473-
--!syntax_v1
2474-
CREATE TABLE `KeySubkey` (
2438+
AssertSuccessResult(session.ExecuteSchemeQuery(Q_(R"(
2439+
CREATE TABLE Table (
24752440
Key Uint64,
24762441
Subkey Uint64,
24772442
Value String,
24782443
Order Uint32,
24792444
PRIMARY KEY (Key, Subkey)
24802445
);
24812446
2482-
ALTER TABLE `KeySubkey` ADD INDEX `idx` GLOBAL SYNC ON (`Key`, `Order`) COVER (`Value`);
2483-
)").GetValueSync());
2447+
ALTER TABLE Table ADD INDEX idx GLOBAL SYNC ON (Key, Order) COVER (Value);
2448+
)")).GetValueSync());
24842449

24852450
if constexpr (EnableFollowers) {
2486-
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
2487-
--!syntax_v1
2488-
ALTER TABLE `KeySubkey` ALTER INDEX `idx` SET READ_REPLICAS_SETTINGS "PER_AZ:1";
2489-
)").GetValueSync());
2451+
AssertSuccessResult(session.ExecuteSchemeQuery(Q_(R"(
2452+
ALTER TABLE Table ALTER INDEX idx SET READ_REPLICAS_SETTINGS "ANY_AZ:1";
2453+
)")).GetValueSync());
24902454
}
24912455

2492-
AssertSuccessResult(session.ExecuteDataQuery(R"(
2493-
--!syntax_v1
2494-
2495-
REPLACE INTO `KeySubkey` (`Key`, `Subkey`, `Value`, `Order`) VALUES
2456+
AssertSuccessResult(session.ExecuteDataQuery(Q_(R"(
2457+
UPSERT INTO Table (Key, Subkey, Value, Order) VALUES
24962458
(1u, 2u, "One", 7u),
24972459
(1u, 3u, "Two", 4u),
24982460
(21u, 8u, "Three", 1u),
24992461
(31u, 0u, "Four", 8u);
2500-
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
2462+
)"), TTxControl::BeginTx().CommitTx()).GetValueSync());
25012463

2502-
auto result = session.ExecuteDataQuery(R"(
2503-
--!syntax_v1
2504-
SELECT * FROM `KeySubkey` VIEW `idx` WHERE Key = 1 ORDER BY `Order`;
2505-
)", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
2464+
auto result = session.ExecuteDataQuery(Q_(R"(
2465+
SELECT Key, Value FROM Table VIEW idx WHERE Key = 1 ORDER BY Order;
2466+
)"), TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync();
25062467
AssertSuccessResult(result);
25072468

2508-
const auto FollowerCpuTime = GetCumulativeCounterValue(
2509-
kikimr.GetTestServer(),
2510-
"/Root/KeySubkey/idx/indexImplTable",
2511-
"DataShard/TxUpdateFollowerReadEdge/ExecuteCPUTime"
2512-
);
2513-
if constexpr (EnableFollowers) {
2514-
UNIT_ASSERT_UNEQUAL(0, FollowerCpuTime);
2515-
} else {
2516-
UNIT_ASSERT_EQUAL(0, FollowerCpuTime);
2517-
}
2518-
25192469
CompareYson(R"(
25202470
[
2521-
[[1u];[4u];[3u];["Two"]];
2522-
[[1u];[7u];[2u];["One"]];
2471+
[[1u];["Two"]];
2472+
[[1u];["One"]];
25232473
]
25242474
)", FormatResultSetYson(result.GetResultSet(0)));
2475+
2476+
// from main master - should NOT read
2477+
CheckTableReads(session, "/Root/Table", false, false);
2478+
// from main followers - should NOT read
2479+
CheckTableReads(session, "/Root/Table", true, false);
2480+
2481+
if constexpr (EnableFollowers) {
2482+
// from index master - should NOT read
2483+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", false, false);
2484+
// from index followers - should read
2485+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", true, true);
2486+
} else {
2487+
// from index master - should read
2488+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", false, true);
2489+
// from index followers - should NOT read
2490+
CheckTableReads(session, "/Root/Table/idx/indexImplTable", true, false);
2491+
}
25252492
}
25262493

25272494
Y_UNIT_TEST(ReadRangeWithParams) {

ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,11 @@ order by SessionId;)", "%Y-%m-%d %H:%M:%S %Z", sessionsSet.front().GetId().data(
719719
])", actual);
720720
}
721721

722+
// from master - should read
723+
CheckTableReads(session, "/Root/Followers", false, true);
724+
// from followers - should NOT read yet
725+
CheckTableReads(session, "/Root/Followers", true, false);
726+
722727
Cerr << "... SELECT from follower" << Endl;
723728
{
724729
auto result = session.ExecuteDataQuery(R"(
@@ -734,6 +739,11 @@ order by SessionId;)", "%Y-%m-%d %H:%M:%S %Z", sessionsSet.front().GetId().data(
734739
])", actual);
735740
}
736741

742+
// from master - should read
743+
CheckTableReads(session, "/Root/Followers", false, true);
744+
// from followers - should read
745+
CheckTableReads(session, "/Root/Followers", true, true);
746+
737747
for (size_t attempt = 0; attempt < 30; ++attempt)
738748
{
739749
Cerr << "... SELECT from partition_stats, attempt " << attempt << Endl;

0 commit comments

Comments
 (0)