Skip to content

Commit 78c9736

Browse files
committed
Fixed pool limit exceeding after server restart in QueryClient (#13901)
1 parent 672a33c commit 78c9736

File tree

11 files changed

+233
-15
lines changed

11 files changed

+233
-15
lines changed

include/ydb-cpp-sdk/client/query/client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class TSession {
151151
class TImpl;
152152
private:
153153
TSession();
154+
TSession(std::shared_ptr<TQueryClient::TImpl> client); // Create broken session
154155
TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* sessionImpl);
155156

156157
std::shared_ptr<TQueryClient::TImpl> Client_;

src/client/query/client.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
361361
TSession::TImpl::MakeImplAsync(processor, args);
362362
} else {
363363
TStatus st(std::move(status));
364-
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
364+
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client)));
365365
}
366366
},
367367
&Ydb::Query::V1::QueryService::Stub::AsyncAttachSession,
@@ -384,13 +384,13 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
384384
NYdb::NIssue::TIssues opIssues;
385385
NYdb::NIssue::IssuesFromMessage(resp->issues(), opIssues);
386386
TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues));
387-
promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
387+
promise.SetValue(TCreateSessionResult(std::move(st), TSession(self)));
388388
} else {
389389
self->DoAttachSession(resp, promise, status.Endpoint, self);
390390
}
391391
} else {
392392
TStatus st(std::move(status));
393-
promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
393+
promise.SetValue(TCreateSessionResult(std::move(st), TSession(self)));
394394
}
395395
};
396396

@@ -631,6 +631,14 @@ TSession TCreateSessionResult::GetSession() const {
631631
TSession::TSession()
632632
{}
633633

634+
TSession::TSession(std::shared_ptr<TQueryClient::TImpl> client)
635+
: Client_(client)
636+
, SessionImpl_(
637+
new TSession::TImpl(nullptr, "", "", client),
638+
TKqpSessionCommon::GetSmartDeleter(client)
639+
)
640+
{}
641+
634642
TSession::TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* session)
635643
: Client_(client)
636644
, SessionImpl_(session, TKqpSessionCommon::GetSmartDeleter(client))

src/client/query/impl/client_session.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,12 @@ void TSession::TImpl::StartAsyncRead(TStreamProcessorPtr ptr, std::weak_ptr<ISes
7979
case grpc::StatusCode::OK:
8080
StartAsyncRead(ptr, client, holder);
8181
break;
82-
case grpc::StatusCode::OUT_OF_RANGE: {
82+
default: {
8383
auto impl = holder->TrySharedOwning();
8484
if (impl) {
8585
impl->CloseFromServer(client);
8686
holder->Release();
8787
}
88-
break;
8988
}
9089
}
9190
});
@@ -96,14 +95,21 @@ TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, co
9695
, StreamProcessor_(ptr)
9796
, SessionHolder(std::make_shared<TSafeTSessionImplHolder>(this))
9897
{
99-
MarkActive();
100-
SetNeedUpdateActiveCounter(true);
101-
StartAsyncRead(StreamProcessor_, client, SessionHolder);
98+
if (ptr) {
99+
MarkActive();
100+
SetNeedUpdateActiveCounter(true);
101+
StartAsyncRead(StreamProcessor_, client, SessionHolder);
102+
} else {
103+
MarkBroken();
104+
SetNeedUpdateActiveCounter(true);
105+
}
102106
}
103107

104108
TSession::TImpl::~TImpl()
105109
{
106-
StreamProcessor_->Cancel();
110+
if (StreamProcessor_) {
111+
StreamProcessor_->Cancel();
112+
}
107113
SessionHolder->WaitAndLock();
108114
}
109115

@@ -114,7 +120,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
114120
ptr->Read(resp.get(), [args, resp, ptr](NYdbGrpc::TGrpcStatus grpcStatus) mutable {
115121
if (grpcStatus.GRpcStatusCode != grpc::StatusCode::OK) {
116122
TStatus st(TPlainStatus(grpcStatus, args->Endpoint));
117-
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
123+
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client)));
118124

119125
} else {
120126
if (resp->status() == Ydb::StatusIds::SUCCESS) {
@@ -125,7 +131,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
125131
NYdb::NIssue::TIssues opIssues;
126132
NYdb::NIssue::IssuesFromMessage(resp->issues(), opIssues);
127133
TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues));
128-
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
134+
args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession(args->Client)));
129135
}
130136
}
131137
});

tests/integration/basic_example_it/main.cpp renamed to tests/integration/basic_example/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ static NYdb::TType MakeOptionalType(NYdb::EPrimitiveType type) {
3838
}
3939

4040

41-
TEST(Integration, BasicExample) {
41+
TEST(BasicExample, BasicExample) {
4242
auto [driver, path] = GetRunArgs();
4343

4444
NYdb::NTable::TTableClient client(driver);

tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp renamed to tests/integration/bulk_upsert/bulk_upsert.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ static std::string JoinPath(const std::string& basePath, const std::string& path
1717
}
1818

1919
TRunArgs GetRunArgs() {
20-
2120
std::string database = std::getenv("YDB_DATABASE");
2221
std::string endpoint = std::getenv("YDB_ENDPOINT");
2322

tests/integration/bulk_upsert_simple_it/main.cpp renamed to tests/integration/bulk_upsert/main.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
#include <util/string/cast.h>
66

77

8-
TEST(Integration, BulkUpsert) {
9-
8+
TEST(BulkUpsert, BulkUpsert) {
109
uint32_t correctSumApp = 0;
1110
uint32_t correctSumHost = 0;
1211
uint32_t correctRowCount = 0;

0 commit comments

Comments
 (0)