Skip to content

Commit d3a36a5

Browse files
authored
Fix follower reads sometimes crashing during split (#12151)
1 parent ad82e86 commit d3a36a5

File tree

3 files changed

+91
-2
lines changed

3 files changed

+91
-2
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2739,11 +2739,30 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
27392739
return true;
27402740
}
27412741

2742+
Result = MakeEvReadResult(ctx.SelfID.NodeId());
2743+
2744+
if (Self->IsFollower()) {
2745+
NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
2746+
TString errMessage;
2747+
2748+
if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) {
2749+
return false;
2750+
}
2751+
2752+
if (status != NKikimrTxDataShard::TError::OK) {
2753+
SetStatusError(
2754+
Result->Record,
2755+
Ydb::StatusIds::INTERNAL_ERROR,
2756+
TStringBuilder() << "Failed to sync follower: " << errMessage
2757+
<< " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId() << " state# " << DatashardStateName(Self->State) << ")");
2758+
SendResult(ctx);
2759+
return true;
2760+
}
2761+
}
2762+
27422763
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << state.ReadId
27432764
<< ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);
27442765

2745-
Result = MakeEvReadResult(ctx.SelfID.NodeId());
2746-
27472766
const auto& tableId = state.PathId.LocalPathId;
27482767
if (state.PathId.OwnerId == Self->GetPathOwnerId()) {
27492768
auto it = Self->TableInfos.find(tableId);

ydb/core/tx/datashard/datashard_ut_common_kqp.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ namespace NKqpHelpers {
182182
return FormatResult(response);
183183
}
184184

185+
inline auto KqpSimpleStaleRoSend(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) {
186+
return KqpSimpleSend(runtime, query, true, database);
187+
}
188+
185189
inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) {
186190
return KqpSimpleExec(runtime, query, true, database);
187191
}

ydb/core/tx/datashard/datashard_ut_followers.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "datashard_ut_common_kqp.h"
33
#include "datashard_ut_read_table.h"
44

5+
#include <ydb/core/testlib/actors/block_events.h>
56
#include <ydb/library/actors/core/mon.h>
67

78
namespace NKikimr {
@@ -720,6 +721,71 @@ Y_UNIT_TEST_SUITE(DataShardFollowers) {
720721
UNIT_ASSERT_EQUAL(readDataPages, 3);
721722
}
722723

724+
Y_UNIT_TEST(FollowerReadDuringSplit) {
725+
TPortManager pm;
726+
TServerSettings serverSettings(pm.GetPort(2134));
727+
serverSettings.SetDomainName("Root")
728+
.SetUseRealThreads(false)
729+
.SetEnableForceFollowers(true);
730+
731+
Tests::TServer::TPtr server = new TServer(serverSettings);
732+
auto &runtime = *server->GetRuntime();
733+
auto sender = runtime.AllocateEdgeActor();
734+
735+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
736+
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
737+
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_TRACE);
738+
runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_DEBUG);
739+
740+
InitRoot(server, sender);
741+
742+
TDisableDataShardLogBatching disableDataShardLogBatching;
743+
UNIT_ASSERT_VALUES_EQUAL(
744+
KqpSchemeExec(runtime, R"(
745+
CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key))
746+
WITH (READ_REPLICAS_SETTINGS = "PER_AZ:1");
747+
)"),
748+
"SUCCESS");
749+
750+
auto shards = GetTableShards(server, sender, "/Root/table");
751+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1UL);
752+
753+
ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 11), (2, 22), (3, 33);");
754+
755+
// Wait for leader to promote the follower read edge (and stop writing to the Sys table)
756+
Cerr << "... sleeping after upsert" << Endl;
757+
runtime.SimulateSleep(TDuration::Seconds(1));
758+
759+
auto modifyReads = runtime.AddObserver<TEvDataShard::TEvRead>(
760+
[&](TEvDataShard::TEvRead::TPtr& ev) {
761+
ev->Get()->Record.SetMaxRowsInResult(1);
762+
});
763+
TBlockEvents<TEvDataShard::TEvReadContinue> blockedContinue(runtime);
764+
765+
auto readFuture = KqpSimpleStaleRoSend(runtime, "SELECT key, value FROM `/Root/table` ORDER BY key", "/Root");
766+
runtime.WaitFor("the first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
767+
768+
Cerr << "... splitting table at key 3" << Endl;
769+
SetSplitMergePartCountLimit(&runtime, -1);
770+
ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(0), 3);
771+
WaitTxNotification(server, sender, txId);
772+
773+
blockedContinue.Unblock().Stop();
774+
UNIT_ASSERT_VALUES_EQUAL(
775+
FormatResult(runtime.WaitFuture(std::move(readFuture))),
776+
"ERROR: UNAVAILABLE");
777+
778+
Cerr << "... reading from the left follower" << Endl;
779+
UNIT_ASSERT_VALUES_EQUAL(
780+
KqpSimpleStaleRoExec(runtime, "SELECT key, value FROM `/Root/table` WHERE key < 3 ORDER BY key"),
781+
"{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
782+
"{ items { uint32_value: 2 } items { uint32_value: 22 } }");
783+
Cerr << "... reading from the right follower" << Endl;
784+
UNIT_ASSERT_VALUES_EQUAL(
785+
KqpSimpleStaleRoExec(runtime, "SELECT key, value FROM `/Root/table` WHERE key >= 3 ORDER BY key"),
786+
"{ items { uint32_value: 3 } items { uint32_value: 33 } }");
787+
}
788+
723789
} // Y_UNIT_TEST_SUITE(DataShardFollowers)
724790

725791
} // namespace NKikimr

0 commit comments

Comments
 (0)