Skip to content

Commit d9cefca

Browse files
committed
Fix ydb read rows timeout after ds tablets restart (#17925)
1 parent 161992c commit d9cefca

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed

ydb/core/grpc_services/rpc_read_rows.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,14 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
732732
SendResult(status, errorMsg, issues);
733733
}
734734

735+
void Handle(TEvents::TEvUndelivered::TPtr&) {
736+
return ReplyWithError(Ydb::StatusIds::INTERNAL_ERROR, "Internal error: pipe cache is not available, the cluster might not be configured properly");
737+
}
738+
739+
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev) {
740+
return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Failed to connect to shard " << ev->Get()->TabletId);
741+
}
742+
735743
void PassAway() override {
736744
Send(PipeCache, new TEvPipeCache::TEvUnlink(0));
737745
if (TimeoutTimerActorId) {
@@ -748,6 +756,9 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
748756
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
749757
hFunc(TEvDataShard::TEvReadResult, Handle);
750758

759+
hFunc(TEvents::TEvUndelivered, Handle);
760+
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
761+
751762
hFunc(TEvents::TEvWakeup, HandleTimeout);
752763
}
753764
}

ydb/services/ydb/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ SRCS(
2525
ydb_olapstore_ut.cpp
2626
ydb_monitoring_ut.cpp
2727
ydb_query_ut.cpp
28+
ydb_read_rows_ut.cpp
2829
ydb_ldap_login_ut.cpp
2930
ydb_login_ut.cpp
3031
ydb_object_storage_ut.cpp

ydb/services/ydb/ydb_read_rows_ut.cpp

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#include <ydb/core/grpc_services/base/base.h>
2+
3+
#include <ydb/core/tx/datashard/defs.h>
4+
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
5+
#include <ydb/core/tx/datashard/datashard_ut_common_kqp.h>
6+
7+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
8+
9+
#include <library/cpp/testing/unittest/registar.h>
10+
11+
namespace NKikimr {
12+
13+
using namespace NKikimr::NDataShard::NKqpHelpers;
14+
using namespace NSchemeShard;
15+
using namespace Tests;
16+
17+
namespace {
18+
19+
using TEvReadRowsRequest = NGRpcService::TGrpcRequestNoOperationCall<Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>;
20+
21+
using TRows = TVector<std::pair<TSerializedCellVec, TString>>;
22+
using TRowTypes = TVector<std::pair<TString, Ydb::Type>>;
23+
24+
25+
Ydb::Table::ReadRowsRequest MakeReadRowsRequest(const TString& tablePath, const TVector<ui32>& keys) {
26+
Ydb::Table::ReadRowsRequest request;
27+
request.set_path(tablePath);
28+
29+
NYdb::TValueBuilder keysBuilder;
30+
keysBuilder.BeginList();
31+
for (ui32 key : keys) {
32+
keysBuilder.AddListItem().BeginStruct().AddMember("key").Uint32(key).EndStruct();
33+
}
34+
keysBuilder.EndList();
35+
36+
auto keysValuesCpp = keysBuilder.Build();
37+
auto keysTypeCpp = keysValuesCpp.GetType();
38+
request.mutable_keys()->mutable_type()->CopyFrom(NYdb::TProtoAccessor::GetProto(keysTypeCpp));
39+
request.mutable_keys()->mutable_value()->CopyFrom(NYdb::TProtoAccessor::GetProto(keysValuesCpp));
40+
41+
return request;
42+
}
43+
44+
} // namespace
45+
46+
Y_UNIT_TEST_SUITE(ReadRows) {
47+
48+
Y_UNIT_TEST(KillTabletDuringRead) {
49+
// Init cluster
50+
TPortManager pm;
51+
TServerSettings serverSettings(pm.GetPort(2134));
52+
serverSettings.SetDomainName("Root")
53+
.SetUseRealThreads(false);
54+
55+
Tests::TServer::TPtr server = new TServer(serverSettings);
56+
auto &runtime = *server->GetRuntime();
57+
auto sender = runtime.AllocateEdgeActor();
58+
59+
InitRoot(server, sender);
60+
61+
// Create table
62+
CreateShardedTable(server, sender, "/Root", "table-1", 1, false);
63+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500);");
64+
65+
// Check normal behavior
66+
{
67+
Ydb::Table::ReadRowsRequest request = MakeReadRowsRequest("/Root/table-1", {1, 5});
68+
auto readRowsFuture = NRpcService::DoLocalRpc<TEvReadRowsRequest>(
69+
std::move(request), "/Root", "", runtime.GetActorSystem(0));
70+
auto res = runtime.WaitFuture(readRowsFuture, TDuration::Seconds(10));
71+
UNIT_ASSERT_VALUES_EQUAL(res.status(), ::Ydb::StatusIds::SUCCESS);
72+
}
73+
74+
// Get tablet id of the only table shard
75+
auto tablets = GetTableShards(server, sender, "/Root/table-1");
76+
UNIT_ASSERT(tablets.size() == 1);
77+
ui64 tabletId = tablets.at(0);
78+
79+
// Reboot tablet during read
80+
auto dsReadResultOberver = runtime.AddObserver<TEvDataShard::TEvReadResult>([&](TEvDataShard::TEvReadResult::TPtr& ev) {
81+
Cerr << "Stoping tablet id: " << tabletId;
82+
RebootTablet(runtime, tabletId, sender);
83+
ev.Reset();
84+
});
85+
86+
// Check read with tablet reboot
87+
{
88+
Ydb::Table::ReadRowsRequest request = MakeReadRowsRequest("/Root/table-1", {1, 5});
89+
auto readRowsFuture = NRpcService::DoLocalRpc<TEvReadRowsRequest>(
90+
std::move(request), "/Root", "", runtime.GetActorSystem(0));
91+
auto res = runtime.WaitFuture(readRowsFuture, TDuration::Seconds(10));
92+
UNIT_ASSERT_VALUES_EQUAL(res.status(), ::Ydb::StatusIds::UNAVAILABLE);
93+
}
94+
95+
}
96+
97+
}
98+
99+
} // namespace NKikimr

0 commit comments

Comments
 (0)