Skip to content

Commit f322990

Browse files
[C++ SDK] Remove session from session pool if stream session was closed by server side. (#13199)
Co-authored-by: Bulat <bylatgr@gmail.com>
1 parent ef3837c commit f322990

File tree

8 files changed

+394
-15
lines changed

8 files changed

+394
-15
lines changed

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 217 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
1212

1313
#include <ydb/core/kqp/counters/kqp_counters.h>
14+
#include <ydb/core/base/counters.h>
15+
#include <library/cpp/threading/local_executor/local_executor.h>
1416

1517
#include <fmt/format.h>
1618

@@ -54,7 +56,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
5456
WaitForZeroSessions(counters);
5557
}
5658

57-
Y_UNIT_TEST(QueryOnClosedSession) {
59+
void DoClosedSessionRemovedWhileActiveTest(bool withQuery) {
5860
auto kikimr = DefaultKikimrRunner();
5961
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
6062
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
@@ -75,21 +77,134 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
7577

7678
UNIT_ASSERT(allDoneOk);
7779

78-
auto execResult = session.ExecuteQuery("SELECT 1;",
79-
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
80+
if (withQuery) {
81+
auto execResult = session.ExecuteQuery("SELECT 1;",
82+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
8083

81-
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
84+
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::BAD_SESSION);
85+
}
8286
}
8387
// closed session must be removed from session pool
8488
{
8589
auto result = db.GetSession().GetValueSync();
8690
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
8791
UNIT_ASSERT(result.GetSession().GetId() != id);
8892
}
93+
UNIT_ASSERT_VALUES_EQUAL(db.GetActiveSessionCount(), 0);
8994
}
9095
WaitForZeroSessions(counters);
9196
}
9297

98+
Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithQuery) {
99+
// - Session is active (user gfot it)
100+
// - server close it
101+
// - user executes query (got BAD SESSION)
102+
// - session should be removed from pool
103+
DoClosedSessionRemovedWhileActiveTest(true);
104+
}
105+
106+
/* Not implemented in the sdk
107+
Y_UNIT_TEST(ClosedSessionRemovedWhileActiveWithoutQuery) {
108+
// - Session is active (user gfot it)
109+
// - server close it
110+
// - user do not executes any query
111+
// - session should be removed from pool
112+
DoClosedSessionRemovedWhileActiveTest(false);
113+
}
114+
*/
115+
// Copy paste from table service but with some modifications for query service
116+
// Checks read iterators/session/sdk counters have expected values
117+
Y_UNIT_TEST(CloseSessionsWithLoad) {
118+
auto kikimr = std::make_shared<TKikimrRunner>();
119+
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);
120+
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_DEBUG);
121+
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_ACTOR, NLog::PRI_DEBUG);
122+
kikimr->GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPILE_SERVICE, NLog::PRI_DEBUG);
123+
124+
NYdb::NQuery::TQueryClient db = kikimr->GetQueryClient();
125+
126+
const ui32 SessionsCount = 50;
127+
const TDuration WaitDuration = TDuration::Seconds(1);
128+
129+
TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
130+
for (ui32 i = 0; i < SessionsCount; ++i) {
131+
auto sessionResult = db.GetSession().GetValueSync();
132+
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
133+
134+
sessions.push_back(sessionResult.GetSession());
135+
}
136+
137+
NPar::LocalExecutor().RunAdditionalThreads(SessionsCount + 1);
138+
NPar::LocalExecutor().ExecRange([&kikimr, sessions, WaitDuration](int id) mutable {
139+
if (id == (i32)sessions.size()) {
140+
Sleep(WaitDuration);
141+
Cerr << "start sessions close....." << Endl;
142+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr->GetEndpoint());
143+
for (ui32 i = 0; i < sessions.size(); ++i) {
144+
bool allDoneOk = true;
145+
NTestHelpers::CheckDelete(clientConfig, sessions[i].GetId(), Ydb::StatusIds::SUCCESS, allDoneOk);
146+
UNIT_ASSERT(allDoneOk);
147+
}
148+
149+
Cerr << "finished sessions close....." << Endl;
150+
auto counters = GetServiceCounters(kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters, "ydb");
151+
152+
ui64 pendingCompilations = 0;
153+
do {
154+
Sleep(WaitDuration);
155+
pendingCompilations = counters->GetNamedCounter("name", "table.query.compilation.active_count", false)->Val();
156+
Cerr << "still compiling... " << pendingCompilations << Endl;
157+
} while (pendingCompilations != 0);
158+
159+
ui64 pendingSessions = 0;
160+
do {
161+
Sleep(WaitDuration);
162+
pendingSessions = counters->GetNamedCounter("name", "table.session.active_count", false)->Val();
163+
Cerr << "still active sessions ... " << pendingSessions << Endl;
164+
} while (pendingSessions != 0);
165+
166+
return;
167+
}
168+
169+
auto session = sessions[id];
170+
TMaybe<TTransaction> tx;
171+
172+
while (true) {
173+
if (tx) {
174+
auto result = tx->Commit().GetValueSync();
175+
if (!result.IsSuccess()) {
176+
return;
177+
}
178+
179+
tx = {};
180+
continue;
181+
}
182+
183+
auto query = Sprintf(R"(
184+
SELECT Key, Text, Data FROM `/Root/EightShard` WHERE Key=%1$d + 0;
185+
SELECT Key, Data, Text FROM `/Root/EightShard` WHERE Key=%1$d + 1;
186+
SELECT Text, Key, Data FROM `/Root/EightShard` WHERE Key=%1$d + 2;
187+
SELECT Text, Data, Key FROM `/Root/EightShard` WHERE Key=%1$d + 3;
188+
SELECT Data, Key, Text FROM `/Root/EightShard` WHERE Key=%1$d + 4;
189+
SELECT Data, Text, Key FROM `/Root/EightShard` WHERE Key=%1$d + 5;
190+
191+
UPSERT INTO `/Root/EightShard` (Key, Text) VALUES
192+
(%2$dul, "New");
193+
)", RandomNumber<ui32>(), RandomNumber<ui32>());
194+
195+
auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
196+
if (!result.IsSuccess()) {
197+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_SESSION);
198+
Cerr << "received non-success status for session " << id << Endl;
199+
return;
200+
}
201+
202+
tx = result.GetTransaction();
203+
}
204+
}, 0, SessionsCount + 1, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
205+
WaitForZeroReadIterators(kikimr->GetTestServer(), "/Root/EightShard");
206+
}
207+
93208
Y_UNIT_TEST(PeriodicTaskInSessionPool) {
94209
auto kikimr = DefaultKikimrRunner();
95210
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
@@ -169,6 +284,104 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
169284
WaitForZeroSessions(counters);
170285
}
171286

287+
// Check closed session removed while its in the session pool
288+
Y_UNIT_TEST(ClosedSessionRemovedFromPool) {
289+
auto kikimr = DefaultKikimrRunner();
290+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
291+
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
292+
293+
{
294+
auto db = kikimr.GetQueryClient();
295+
296+
TString id;
297+
{
298+
auto result = db.GetSession().GetValueSync();
299+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
300+
UNIT_ASSERT(result.GetSession().GetId());
301+
auto session = result.GetSession();
302+
id = session.GetId();
303+
}
304+
305+
bool allDoneOk = true;
306+
NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk);
307+
308+
Sleep(TDuration::Seconds(5));
309+
310+
UNIT_ASSERT(allDoneOk);
311+
{
312+
auto result = db.GetSession().GetValueSync();
313+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
314+
auto newSession = result.GetSession();
315+
UNIT_ASSERT_C(newSession.GetId() != id, "closed id: " << id << " new id: " << newSession.GetId());
316+
317+
auto execResult = newSession.ExecuteQuery("SELECT 1;",
318+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
319+
320+
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
321+
}
322+
}
323+
WaitForZeroSessions(counters);
324+
}
325+
326+
// Attempt to trigger simultanous server side close and return session
327+
// From sdk perspective check no dataraces
328+
Y_UNIT_TEST(ReturnAndCloseSameTime) {
329+
auto kikimr = DefaultKikimrRunner();
330+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
331+
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
332+
333+
size_t iterations = 999;
334+
auto db = kikimr.GetQueryClient();
335+
336+
NPar::LocalExecutor().RunAdditionalThreads(2);
337+
while (iterations--) {
338+
auto lim = iterations % 33;
339+
TVector<NYdb::NQuery::TQueryClient::TSession> sessions;
340+
TVector<TString> sids;
341+
sessions.reserve(lim);
342+
sids.reserve(lim);
343+
for (size_t i = 0; i < lim; ++i) {
344+
auto sessionResult = db.GetSession().GetValueSync();
345+
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
346+
347+
sessions.push_back(sessionResult.GetSession());
348+
sids.push_back(sessions.back().GetId());
349+
}
350+
351+
if (iterations & 1) {
352+
auto rng = std::default_random_engine {};
353+
std::ranges::shuffle(sids, rng);
354+
}
355+
356+
NPar::LocalExecutor().ExecRange([sessions{std::move(sessions)}, sids{std::move(sids)}, clientConfig](int id) mutable{
357+
if (id == 0) {
358+
for (size_t i = 0; i < sessions.size(); i++) {
359+
auto s = std::move(sessions[i]);
360+
auto execResult = s.ExecuteQuery("SELECT 1;",
361+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
362+
switch (execResult.GetStatus()) {
363+
case EStatus::SUCCESS:
364+
case EStatus::BAD_SESSION:
365+
break;
366+
default:
367+
UNIT_ASSERT_C(false, "unexpected status: " << execResult.GetStatus());
368+
}
369+
}
370+
} else if (id == 1) {
371+
for (size_t i = 0; i < sids.size(); i++) {
372+
bool allDoneOk = true;
373+
NTestHelpers::CheckDelete(clientConfig, sids[i], Ydb::StatusIds::SUCCESS, allDoneOk);
374+
UNIT_ASSERT(allDoneOk);
375+
}
376+
} else {
377+
Y_ABORT_UNLESS(false, "unexpected thread cxount");
378+
}
379+
}, 0, 2, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
380+
}
381+
382+
WaitForZeroSessions(counters);
383+
}
384+
172385
Y_UNIT_TEST(StreamExecuteQueryPure) {
173386
auto kikimr = DefaultKikimrRunner();
174387
auto db = kikimr.GetQueryClient();

ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ TKqpSessionCommon::TKqpSessionCommon(
3636
, State_(S_STANDALONE)
3737
, TimeToTouch_(TInstant::Now())
3838
, TimeInPast_(TInstant::Now())
39+
, CloseHandler_(nullptr)
3940
, NeedUpdateActiveCounter_(false)
4041
{}
4142

@@ -115,7 +116,7 @@ void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval,
115116
if (updateTimeInPast) {
116117
TimeInPast_ = now;
117118
}
118-
TimeToTouch_ = now + interval;
119+
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
119120
}
120121
}
121122

@@ -126,11 +127,11 @@ void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
126127
if (updateTimeInPast) {
127128
TimeInPast_ = now;
128129
}
129-
TimeToTouch_ = now + interval;
130+
TimeToTouch_.store(now + interval, std::memory_order_relaxed);
130131
}
131132

132133
TInstant TKqpSessionCommon::GetTimeToTouchFast() const {
133-
return TimeToTouch_;
134+
return TimeToTouch_.load(std::memory_order_relaxed);
134135
}
135136

136137
TInstant TKqpSessionCommon::GetTimeInPastFast() const {
@@ -146,6 +147,24 @@ TDuration TKqpSessionCommon::GetTimeInterval() const {
146147
return TimeInterval_;
147148
}
148149

150+
void TKqpSessionCommon::UpdateServerCloseHandler(IServerCloseHandler* handler) {
151+
CloseHandler_.store(handler);
152+
}
153+
154+
void TKqpSessionCommon::CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept {
155+
auto strong = client.lock();
156+
if (!strong) {
157+
// Session closed on the server after stopping client - do nothing
158+
// moreover pool maybe destoyed now
159+
return;
160+
}
161+
162+
IServerCloseHandler* h = CloseHandler_.load();
163+
if (h) {
164+
h->OnCloseSession(this, strong);
165+
}
166+
}
167+
149168
////////////////////////////////////////////////////////////////////////////////
150169

151170
std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter(

ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@ namespace NYdb {
1414
////////////////////////////////////////////////////////////////////////////////
1515
ui64 GetNodeIdFromSession(const TStringType& sessionId);
1616

17+
class TKqpSessionCommon;
18+
19+
class IServerCloseHandler {
20+
public:
21+
virtual ~IServerCloseHandler() = default;
22+
// called when session should be closed by server signal
23+
virtual void OnCloseSession(const TKqpSessionCommon*, std::shared_ptr<ISessionClient>) = 0;
24+
};
25+
1726
class TKqpSessionCommon : public TEndpointObj {
1827
public:
1928
TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint,
@@ -55,6 +64,12 @@ class TKqpSessionCommon : public TEndpointObj {
5564
static std::function<void(TKqpSessionCommon*)>
5665
GetSmartDeleter(std::shared_ptr<ISessionClient> client);
5766

67+
// Shoult be called under session pool lock
68+
void UpdateServerCloseHandler(IServerCloseHandler*);
69+
70+
// Called asynchronously from grpc thread.
71+
void CloseFromServer(std::weak_ptr<ISessionClient> client) noexcept;
72+
5873
protected:
5974
TAdaptiveLock Lock_;
6075

@@ -64,15 +79,20 @@ class TKqpSessionCommon : public TEndpointObj {
6479
const bool IsOwnedBySessionPool_;
6580

6681
EState State_;
67-
TInstant TimeToTouch_;
82+
// This time is used during async close session handling which does not lock the session
83+
// so we need to be able to read this value atomicaly
84+
std::atomic<TInstant> TimeToTouch_;
6885
TInstant TimeInPast_;
6986
// Is used to implement progressive timeout for settler keep alive call
7087
TDuration TimeInterval_;
88+
89+
std::atomic<IServerCloseHandler*> CloseHandler_;
7190
// Indicate session was in active state, but state was changed
7291
// (need to decrement active session counter)
7392
// TODO: suboptimal because need lock for atomic change from interceptor
7493
// Rewrite with bit field
7594
bool NeedUpdateActiveCounter_;
95+
7696
};
7797

7898
} // namespace NYdb

0 commit comments

Comments
 (0)