Skip to content

Commit 6c7f004

Browse files
committed
Fixed pool limit exceeding after server restart in QueryClient (#13901)
1 parent f54fa07 commit 6c7f004

File tree

11 files changed

+233
-15
lines changed

11 files changed

+233
-15
lines changed

include/ydb-cpp-sdk/client/query/client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class TSession {
151151
class TImpl;
152152
private:
153153
TSession();
154+
TSession(std::shared_ptr<TQueryClient::TImpl> client); // Create broken session
154155
TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* sessionImpl);
155156

156157
std::shared_ptr<TQueryClient::TImpl> Client_;

src/client/query/client.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
361361
TSession::TImpl::MakeImplAsync(processor, args);
362362
} else {
363363
TStatus st(std::move(status));
364-
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
364+
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client)));
365365
}
366366
},
367367
&Ydb::Query::V1::QueryService::Stub::AsyncAttachSession,
@@ -384,13 +384,13 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
384384
NYdb::NIssue::TIssues opIssues;
385385
NYdb::NIssue::IssuesFromMessage(resp->issues(), opIssues);
386386
TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues));
387-
promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
387+
promise.SetValue(TCreateSessionResult(std::move(st), TSession(self)));
388388
} else {
389389
self->DoAttachSession(resp, promise, status.Endpoint, self);
390390
}
391391
} else {
392392
TStatus st(std::move(status));
393-
promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
393+
promise.SetValue(TCreateSessionResult(std::move(st), TSession(self)));
394394
}
395395
};
396396

@@ -631,6 +631,14 @@ TSession TCreateSessionResult::GetSession() const {
631631
TSession::TSession()
632632
{}
633633

634+
TSession::TSession(std::shared_ptr<TQueryClient::TImpl> client)
635+
: Client_(client)
636+
, SessionImpl_(
637+
new TSession::TImpl(nullptr, "", "", client),
638+
TKqpSessionCommon::GetSmartDeleter(client)
639+
)
640+
{}
641+
634642
TSession::TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* session)
635643
: Client_(client)
636644
, SessionImpl_(session, TKqpSessionCommon::GetSmartDeleter(client))

src/client/query/impl/client_session.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,12 @@ void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr<ISes
7979
case grpc::StatusCode::OK:
8080
StartAsyncRead(ptr, client, holder);
8181
break;
82-
case grpc::StatusCode::OUT_OF_RANGE: {
82+
default: {
8383
auto impl = holder->TrySharedOwning();
8484
if (impl) {
8585
impl->CloseFromServer(client);
8686
holder->Release();
8787
}
88-
break;
8988
}
9089
}
9190
});
@@ -96,14 +95,21 @@ TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, co
9695
, StreamProcessor_(ptr)
9796
, SessionHolder(std::make_shared<TSafeTSessionImplHolder>(this))
9897
{
99-
MarkActive();
100-
SetNeedUpdateActiveCounter(true);
101-
StartAsyncRead(StreamProcessor_, client, SessionHolder);
98+
if (ptr) {
99+
MarkActive();
100+
SetNeedUpdateActiveCounter(true);
101+
StartAsyncRead(StreamProcessor_, client, SessionHolder);
102+
} else {
103+
MarkBroken();
104+
SetNeedUpdateActiveCounter(true);
105+
}
102106
}
103107

104108
TSession::TImpl::~TImpl()
105109
{
106-
StreamProcessor_->Cancel();
110+
if (StreamProcessor_) {
111+
StreamProcessor_->Cancel();
112+
}
107113
SessionHolder->WaitAndLock();
108114
}
109115

@@ -114,7 +120,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
114120
ptr->Read(resp.get(), [args, resp, ptr](NYdbGrpc::TGrpcStatus grpcStatus) mutable {
115121
if (grpcStatus.GRpcStatusCode != grpc::StatusCode::OK) {
116122
TStatus st(TPlainStatus(grpcStatus, args->Endpoint));
117-
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
123+
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client)));
118124

119125
} else {
120126
if (resp->status() == Ydb::StatusIds::SUCCESS) {
@@ -125,7 +131,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
125131
NYdb::NIssue::TIssues opIssues;
126132
NYdb::NIssue::IssuesFromMessage(resp->issues(), opIssues);
127133
TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues));
128-
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
134+
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client)));
129135
}
130136
}
131137
});

tests/integration/basic_example_it/main.cpp renamed to tests/integration/basic_example/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ static NYdb::TType MakeOptionalType(NYdb::EPrimitiveType type) {
3838
}
3939

4040

41-
TEST(Integration, BasicExample) {
41+
TEST(BasicExample, BasicExample) {
4242
auto [driver, path] = GetRunArgs();
4343

4444
NYdb::NTable::TTableClient client(driver);

tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp renamed to tests/integration/bulk_upsert/bulk_upsert.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ static std::string JoinPath(const std::string& basePath, const std::string& path
1717
}
1818

1919
TRunArgs GetRunArgs() {
20-
2120
std::string database = std::getenv("YDB_DATABASE");
2221
std::string endpoint = std::getenv("YDB_ENDPOINT");
2322

tests/integration/bulk_upsert_simple_it/main.cpp renamed to tests/integration/bulk_upsert/main.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
#include <util/string/cast.h>
66

77

8-
TEST(Integration, BulkUpsert) {
9-
8+
TEST(BulkUpsert, BulkUpsert) {
109
uint32_t correctSumApp = 0;
1110
uint32_t correctSumHost = 0;
1211
uint32_t correctRowCount = 0;
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
#include <ydb-cpp-sdk/client/query/client.h>
2+
3+
#include <library/cpp/testing/gtest/gtest.h>
4+
5+
#include <src/api/grpc/ydb_discovery_v1.grpc.pb.h>
6+
#include <src/api/grpc/ydb_query_v1.grpc.pb.h>
7+
8+
#include <grpcpp/grpcpp.h>
9+
10+
#include <thread>
11+
12+
using namespace NYdb;
13+
using namespace NYdb::NQuery;
14+
15+
using namespace std::chrono_literals;
16+
17+
class TDiscoveryProxy final : public Ydb::Discovery::V1::DiscoveryService::Service {
18+
public:
19+
TDiscoveryProxy(std::atomic_bool& paused)
20+
: Paused_(paused)
21+
{
22+
}
23+
24+
grpc::Status ListEndpoints([[maybe_unused]] grpc::ServerContext* context,
25+
[[maybe_unused]] const Ydb::Discovery::ListEndpointsRequest* request,
26+
Ydb::Discovery::ListEndpointsResponse* response) override {
27+
if (Paused_.load()) {
28+
return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Server is paused");
29+
}
30+
31+
Ydb::Discovery::ListEndpointsResult result;
32+
auto info = result.add_endpoints();
33+
info->set_address("localhost");
34+
info->set_port(Port_);
35+
info->set_node_id(1);
36+
37+
response->mutable_operation()->mutable_result()->PackFrom(result);
38+
response->mutable_operation()->set_id("ydb://operation/1");
39+
response->mutable_operation()->set_ready(true);
40+
response->mutable_operation()->set_status(Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS);
41+
return grpc::Status::OK;
42+
}
43+
44+
void SetPort(int port) {
45+
Port_ = port;
46+
}
47+
48+
private:
49+
int Port_;
50+
std::atomic_bool& Paused_;
51+
};
52+
53+
class TQueryProxy final : public Ydb::Query::V1::QueryService::Service {
54+
public:
55+
TQueryProxy(std::shared_ptr<grpc::Channel> channel, std::atomic_bool& paused)
56+
: Stub_(channel)
57+
, Paused_(paused)
58+
{
59+
}
60+
61+
template <typename TRequest, typename TResponse>
62+
using TGrpcCall =
63+
grpc::Status(Ydb::Query::V1::QueryService::Stub::*)(grpc::ClientContext*, const TRequest& request, TResponse* response);
64+
65+
template <typename TRequest, typename TResponse>
66+
using TGrpcStreamCall =
67+
std::unique_ptr<grpc::ClientReader<TResponse>>(Ydb::Query::V1::QueryService::Stub::*)(grpc::ClientContext*, const TRequest& request);
68+
69+
template <typename TRequest, typename TResponse>
70+
grpc::Status Run(TGrpcCall<TRequest, TResponse> call, grpc::ServerContext *context,
71+
const TRequest* request, TResponse* response) {
72+
if (Paused_.load()) {
73+
return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Server is paused");
74+
}
75+
76+
auto clientContext = grpc::ClientContext::FromServerContext(*context);
77+
return (Stub_.*call)(clientContext.get(), *request, response);
78+
}
79+
80+
template <typename TRequest, typename TResponse>
81+
grpc::Status RunStream(TGrpcStreamCall<TRequest, TResponse> call, grpc::ServerContext *context,
82+
const TRequest* request, grpc::ServerWriter<TResponse>* writer) {
83+
auto clientContext = grpc::ClientContext::FromServerContext(*context);
84+
auto reader = (Stub_.*call)(clientContext.get(), *request);
85+
86+
TResponse state;
87+
88+
while (reader->Read(&state)) {
89+
if (Paused_.load()) {
90+
return grpc::Status(grpc::StatusCode::UNAVAILABLE, "Server is paused");
91+
}
92+
writer->Write(state);
93+
}
94+
95+
return reader->Finish();
96+
}
97+
98+
grpc::Status CreateSession(grpc::ServerContext *context, const Ydb::Query::CreateSessionRequest* request,
99+
Ydb::Query::CreateSessionResponse* response) override {
100+
return Run(&Ydb::Query::V1::QueryService::Stub::CreateSession, context, request, response);
101+
}
102+
103+
grpc::Status DeleteSession(grpc::ServerContext *context, const Ydb::Query::DeleteSessionRequest *request,
104+
Ydb::Query::DeleteSessionResponse *response) override {
105+
return Run(&Ydb::Query::V1::QueryService::Stub::DeleteSession, context, request, response);
106+
}
107+
108+
grpc::Status AttachSession(grpc::ServerContext *context, const Ydb::Query::AttachSessionRequest *request,
109+
grpc::ServerWriter<Ydb::Query::SessionState> *writer) override {
110+
return RunStream(&Ydb::Query::V1::QueryService::Stub::AttachSession, context, request, writer);
111+
}
112+
113+
grpc::Status ExecuteQuery(grpc::ServerContext *context, const Ydb::Query::ExecuteQueryRequest *request,
114+
grpc::ServerWriter<Ydb::Query::ExecuteQueryResponsePart> *writer) override {
115+
return RunStream(&Ydb::Query::V1::QueryService::Stub::ExecuteQuery, context, request, writer);
116+
}
117+
118+
private:
119+
Ydb::Query::V1::QueryService::Stub Stub_;
120+
std::atomic_bool& Paused_;
121+
};
122+
123+
class ServerRestartTest : public testing::Test {
124+
protected:
125+
ServerRestartTest() {
126+
std::string endpoint = std::getenv("YDB_ENDPOINT");
127+
std::string database = std::getenv("YDB_DATABASE");
128+
Channel_ = grpc::CreateChannel(grpc::string{endpoint}, grpc::InsecureChannelCredentials());
129+
130+
DisoveryService_ = std::make_unique<TDiscoveryProxy>(Paused_);
131+
QueryService_ = std::make_unique<TQueryProxy>(Channel_, Paused_);
132+
133+
int port = 0;
134+
135+
Server_ = grpc::ServerBuilder()
136+
.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(), &port)
137+
.RegisterService(DisoveryService_.get())
138+
.RegisterService(QueryService_.get())
139+
.BuildAndStart();
140+
141+
DisoveryService_->SetPort(port);
142+
143+
Driver_ = std::make_unique<TDriver>(TDriverConfig()
144+
.SetEndpoint("localhost:" + std::to_string(port))
145+
.SetDatabase(database)
146+
.SetDiscoveryMode(EDiscoveryMode::Async)
147+
);
148+
}
149+
150+
TDriver GetDriver() {
151+
return *Driver_;
152+
}
153+
154+
void PauseServer() {
155+
Paused_.store(true);
156+
}
157+
158+
void UnpauseServer() {
159+
Paused_.store(false);
160+
}
161+
162+
private:
163+
std::atomic_bool Paused_{false};
164+
165+
std::shared_ptr<grpc::Channel> Channel_;
166+
167+
std::unique_ptr<TDiscoveryProxy> DisoveryService_;
168+
std::unique_ptr<TQueryProxy> QueryService_;
169+
std::unique_ptr<grpc::Server> Server_;
170+
171+
std::unique_ptr<TDriver> Driver_;
172+
};
173+
174+
175+
TEST_F(ServerRestartTest, RestartOnGetSession) {
176+
TQueryClient client(GetDriver());
177+
std::atomic_bool closed(false);
178+
179+
auto thread = std::thread([&client, &closed]() {
180+
std::optional<TStatus> status;
181+
while (!closed.load()) {
182+
status = client.RetryQuerySync([](NYdb::NQuery::TSession session) {
183+
return session.ExecuteQuery("SELECT 1", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
184+
});
185+
186+
ASSERT_LE(client.GetActiveSessionCount(), 1);
187+
188+
std::this_thread::sleep_for(100ms);
189+
}
190+
191+
ASSERT_TRUE(status.has_value());
192+
ASSERT_TRUE(status->IsSuccess()) << ToString(*status);
193+
});
194+
195+
std::this_thread::sleep_for(1s);
196+
PauseServer();
197+
std::this_thread::sleep_for(10s);
198+
UnpauseServer();
199+
std::this_thread::sleep_for(1s);
200+
201+
closed.store(true);
202+
thread.join();
203+
204+
ASSERT_EQ(client.GetActiveSessionCount(), 0);
205+
}

0 commit comments

Comments
 (0)