Skip to content

Commit c85b979

Browse files
authored
24-3: Fix excessive read latency during and after shard splits (#11061)
1 parent 8ae4726 commit c85b979

File tree

3 files changed

+85
-5
lines changed

3 files changed

+85
-5
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2645,6 +2645,16 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
26452645
return;
26462646
}
26472647

2648+
if (State == TShardState::PreOffline ||
2649+
State == TShardState::Offline)
2650+
{
2651+
replyWithError(
2652+
Ydb::StatusIds::NOT_FOUND,
2653+
TStringBuilder() << "Shard " << TabletID() << " finished splitting/merging"
2654+
<< " (node# " << SelfId().NodeId() << " state# " << DatashardStateName(State) << ")");
2655+
return;
2656+
}
2657+
26482658
if (!IsStateNewReadAllowed()) {
26492659
replyWithError(
26502660
Ydb::StatusIds::OVERLOADED,

ydb/core/tx/datashard/datashard_ut_common_kqp.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,10 +186,14 @@ namespace NKqpHelpers {
186186
return KqpSimpleExec(runtime, query, true, database);
187187
}
188188

189-
inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
189+
// Starts `query` in a freshly created session without waiting for the reply.
//
// Overwrites `sessionId` with the id returned by CreateSessionRPC, then sends
// the query with an empty txId and commitTx = false. The value returned by
// SendRequest is handed back to the caller unchanged; callers in this change
// pass it to AwaitResponse(...) or runtime.WaitFuture(...) to obtain the
// response.
inline auto KqpSimpleBeginSend(TTestActorRuntime& runtime, TString& sessionId, const TString& query) {
190190
sessionId = CreateSessionRPC(runtime);
191+
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, /* txId */ {}, false /* commitTx */));
192+
}
193+
194+
inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
191195
txId.clear();
192-
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)));
196+
auto response = AwaitResponse(runtime, KqpSimpleBeginSend(runtime, sessionId, query));
193197
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
194198
return TStringBuilder() << "ERROR: " << response.operation().status();
195199
}

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3970,7 +3970,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
39703970
Y_UNIT_TEST(HandleMvccGoneInContinue) {
39713971
// TODO
39723972
}
3973-
};
3973+
}
39743974

39753975
Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
39763976
Y_UNIT_TEST(ShouldRead) {
@@ -4054,7 +4054,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
40544054

40554055
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
40564056
}
4057-
};
4057+
}
40584058

40594059
Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
40604060
Y_UNIT_TEST(ShouldCalculateQuota) {
@@ -4105,7 +4105,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
41054105
UNIT_ASSERT_VALUES_EQUAL(state.Quota.Bytes, 131729);
41064106
UNIT_ASSERT(state.State == NDataShard::TReadIteratorState::EState::Executing);
41074107
}
4108-
};
4108+
}
41094109

41104110
Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
41114111
Y_UNIT_TEST(CancelPageFaultedReadThenDropTable) {
@@ -4755,4 +4755,70 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
47554755

47564756
}
47574757

4758+
// Latency-oriented tests for datashard read iterators.
Y_UNIT_TEST_SUITE(DataShardReadIteratorLatency) {
4759+
4760+
// Scenario: a SELECT is started while its TEvRead events are held back, the
// table's shard is split in the meantime, and only then are the reads
// released. The query must still succeed, and the time between releasing the
// reads and receiving the response (per runtime.GetCurrentTime()) must stay
// below 100ms.
Y_UNIT_TEST(ReadSplitLatency) {
4761+
TPortManager pm;
4762+
TServerSettings serverSettings(pm.GetPort(2134));
4763+
serverSettings.SetDomainName("Root")
4764+
.SetUseRealThreads(false);
4765+
TServer::TPtr server = new TServer(serverSettings);
4766+
4767+
auto& runtime = *server->GetRuntime();
4768+
auto sender = runtime.AllocateEdgeActor();
4769+
4770+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4771+
4772+
InitRoot(server, sender);
4773+
4774+
TDisableDataShardLogBatching disableDataShardLogBatching;
4775+
4776+
// Start with a single-shard table so there is exactly one shard to split.
// NOTE(review): the trailing `1` is presumably the shard count - confirm
// against CreateShardedTable's signature.
CreateShardedTable(server, sender, "/Root", "table-1", 1);
4777+
4778+
// Insert initial data
4779+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
4780+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
4781+
4782+
// Copy table (this will ensure original shards stay alive after split)
4783+
{
4784+
auto senderCopy = runtime.AllocateEdgeActor();
4785+
ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-2", "/Root/table-1");
4786+
WaitTxNotification(server, senderCopy, txId);
4787+
}
4788+
4789+
// Hold back TEvRead events from here on; they stay queued in blockedReads
// until Unblock() is called further down.
TBlockEvents<TEvDataShard::TEvRead> blockedReads(runtime);
4790+
4791+
Cerr << "... starting read from table-1" << Endl;
4792+
TString readSessionId;
4793+
auto readFuture = KqpSimpleBeginSend(runtime, readSessionId, R"(
4794+
SELECT * FROM `/Root/table-1` ORDER BY key;
4795+
)");
4796+
4797+
// Do not proceed until the query has actually issued at least one read,
// otherwise the split below would not race with an in-flight read.
runtime.WaitFor("blocked TEvRead", [&]{ return blockedReads.size() >= 1; });
4798+
4799+
// Split the first (and only) shard of table-1 while the read is blocked,
// and wait for the split transaction to be reported as complete.
// NOTE(review): the `5` passed to AsyncSplitTable is presumably the split
// key - confirm against the helper's signature.
{
4800+
Cerr << "... splitting table-1" << Endl;
4801+
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
4802+
auto shards1before = GetTableShards(server, sender, "/Root/table-1");
4803+
ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1before.at(0), 5);
4804+
Cerr << "... split txId# " << txId << " started" << Endl;
4805+
WaitTxNotification(server, sender, txId);
4806+
Cerr << "... split txId# " << txId << " finished" << Endl;
4807+
}
4808+
4809+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
4810+
4811+
// Latency is measured from the moment the held-back reads are released,
// so time spent performing the split itself is excluded.
auto readStartTs = runtime.GetCurrentTime();
4812+
blockedReads.Unblock();
4813+
blockedReads.Stop();
4814+
auto readResponse = runtime.WaitFuture(std::move(readFuture));
4815+
UNIT_ASSERT_VALUES_EQUAL(readResponse.operation().status(), Ydb::StatusIds::SUCCESS);
4816+
auto readLatency = runtime.GetCurrentTime() - readStartTs;
4817+
Cerr << "... read latency was " << readLatency << Endl;
4818+
// The read was addressed to the pre-split shard; it must complete
// quickly rather than stall on that shard.
UNIT_ASSERT_C(readLatency < TDuration::MilliSeconds(100),
4819+
"unexpected read latency " << readLatency);
4820+
}
4821+
4822+
}
4823+
47584824
} // namespace NKikimr

0 commit comments

Comments
 (0)