Skip to content

Commit 2ec8c2c

Browse files
authored
Fix memory leak in case of launch periodic keep-alive task for query service session (#11489)
1 parent 3f45718 commit 2ec8c2c

File tree

3 files changed

+85
-7
lines changed

3 files changed

+85
-7
lines changed

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,85 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
9090
WaitForZeroSessions(counters);
9191
}
9292

93+
Y_UNIT_TEST(PeriodicTaskInSessionPool) {
94+
auto kikimr = DefaultKikimrRunner();
95+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
96+
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
97+
98+
{
99+
auto db = kikimr.GetQueryClient();
100+
101+
TString id;
102+
{
103+
auto result = db.GetSession().GetValueSync();
104+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
105+
UNIT_ASSERT(result.GetSession().GetId());
106+
auto session = result.GetSession();
107+
id = session.GetId();
108+
109+
auto execResult = session.ExecuteQuery("SELECT 1;",
110+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
111+
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
112+
}
113+
// This time is more then internal sdk periodic timeout but less than close session
114+
// expect nothing happens with session in the pool
115+
Sleep(TDuration::Seconds(10));
116+
117+
{
118+
auto result = db.GetSession().GetValueSync();
119+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
120+
UNIT_ASSERT(result.GetSession().GetId() == id);
121+
122+
auto execResult = result.GetSession().ExecuteQuery("SELECT 1;",
123+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
124+
125+
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
126+
}
127+
}
128+
WaitForZeroSessions(counters);
129+
}
130+
131+
Y_UNIT_TEST(PeriodicTaskInSessionPoolSessionCloseByIdle) {
132+
auto kikimr = DefaultKikimrRunner();
133+
auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
134+
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
135+
136+
{
137+
auto settings = NYdb::NQuery::TClientSettings().SessionPoolSettings(
138+
NYdb::NQuery::TSessionPoolSettings()
139+
.MinPoolSize(0)
140+
.CloseIdleThreshold(TDuration::Seconds(1)));
141+
auto db = kikimr.GetQueryClient(settings);
142+
143+
TString id;
144+
{
145+
auto result = db.GetSession().GetValueSync();
146+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
147+
UNIT_ASSERT(result.GetSession().GetId());
148+
auto session = result.GetSession();
149+
id = session.GetId();
150+
151+
auto execResult = session.ExecuteQuery("SELECT 1;",
152+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
153+
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
154+
}
155+
156+
Sleep(TDuration::Seconds(11));
157+
UNIT_ASSERT_VALUES_EQUAL(db.GetCurrentPoolSize(), 0);
158+
159+
{
160+
auto result = db.GetSession().GetValueSync();
161+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
162+
UNIT_ASSERT(result.GetSession().GetId() != id);
163+
164+
auto execResult = result.GetSession().ExecuteQuery("SELECT 1;",
165+
NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
166+
UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
167+
}
168+
}
169+
WaitForZeroSessions(counters);
170+
}
171+
93172
Y_UNIT_TEST(StreamExecuteQueryPure) {
94173
auto kikimr = DefaultKikimrRunner();
95174
auto db = kikimr.GetQueryClient();

ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,13 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakC
284284

285285
if (deletePredicate(it->second.get(), sessions.size())) {
286286
sessionsToDelete.emplace_back(std::move(it->second));
287-
} else {
287+
sessions.erase(it++);
288+
} else if (cmd) {
288289
sessionsToTouch.emplace_back(std::move(it->second));
290+
sessions.erase(it++);
291+
} else {
292+
it++;
289293
}
290-
sessions.erase(it++);
291294
}
292295
}
293296

ydb/public/sdk/cpp/client/ydb_query/client.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,15 +484,11 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
484484
return false;
485485
};
486486

487-
// No need to keep-alive
488-
auto keepAliveCmd = [](TKqpSessionCommon*) {
489-
};
490-
491487
std::weak_ptr<TQueryClient::TImpl> weak = shared_from_this();
492488
Connections_->AddPeriodicTask(
493489
SessionPool_.CreatePeriodicTask(
494490
weak,
495-
std::move(keepAliveCmd),
491+
NSessionPool::TSessionPool::TKeepAliveCmd(), // no keep-alive cmd for query service
496492
std::move(deletePredicate)
497493
), NSessionPool::PERIODIC_ACTION_INTERVAL);
498494
}

0 commit comments

Comments
 (0)