Skip to content

Commit 250822e

Browse files
authored
fixes applied to sessions sysview (#8667)
1 parent 396d0c1 commit 250822e

File tree

5 files changed

+27
-9
lines changed

5 files changed

+27
-9
lines changed

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,7 +1732,11 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
17321732
KQP_PROXY_LOG_D("incoming list sessions request " << ev->Get()->Record.ShortUtf8DebugString());
17331733

17341734
auto result = std::make_unique<TEvKqp::TEvListSessionsResponse>();
1735-
auto startIt = LocalSessions->GetOrderedLowerBound(ev->Get()->Record.GetSessionIdStart());
1735+
1736+
const auto& tenant = ev->Get()->Record.GetTenantName();
1737+
bool checkTenant = (AppData()->TenantName != tenant);
1738+
1739+
auto startIt = LocalSessions->GetOrderedLowerBound(tenant, ev->Get()->Record.GetSessionIdStart());
17361740
auto endIt = LocalSessions->GetOrderedEnd();
17371741
i32 freeSpace = ev->Get()->Record.GetFreeSpace();
17381742

@@ -1743,6 +1747,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
17431747

17441748
while(startIt != endIt && freeSpace > 0) {
17451749
auto* sessionInfo = startIt->second;
1750+
if (checkTenant && sessionInfo->Database != ev->Get()->Record.GetTenantName()) {
1751+
finished = true;
1752+
break;
1753+
}
17461754

17471755
if (!until.empty()) {
17481756
if (sessionInfo->SessionId > until) {
@@ -1770,7 +1778,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
17701778
if (finished) {
17711779
result->Record.SetFinished(true);
17721780
} else {
1773-
result->Record.SetContinuationToken(startIt->first);
1781+
Y_ABORT_UNLESS(startIt != endIt);
1782+
result->Record.SetContinuationToken(startIt->first.second);
17741783
result->Record.SetFinished(false);
17751784
}
17761785

ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ struct TKqpSessionInfo {
148148

149149
class TLocalSessionsRegistry {
150150
THashMap<TString, TKqpSessionInfo> LocalSessions;
151-
std::map<TString, TKqpSessionInfo*> OrderedSessions;
151+
std::map<std::pair<TString, TString>, TKqpSessionInfo*> OrderedSessions;
152152
THashMap<TActorId, TString> TargetIdIndex;
153153
THashSet<TString> ShutdownInFlightSessions;
154154
THashMap<TString, ui32> SessionsCountPerDatabase;
@@ -206,7 +206,7 @@ class TLocalSessionsRegistry {
206206
auto result = LocalSessions.emplace(sessionId,
207207
TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos),
208208
sessionStartedAt + idleDuration, IdleSessions.end(), pgWire, startedAt));
209-
OrderedSessions.emplace(sessionId, &result.first->second);
209+
OrderedSessions.emplace(std::make_pair(database, sessionId), &result.first->second);
210210
SessionsCountPerDatabase[database]++;
211211
Y_ABORT_UNLESS(result.second, "Duplicate session id!");
212212
TargetIdIndex.emplace(workerId, sessionId);
@@ -300,11 +300,11 @@ class TLocalSessionsRegistry {
300300
return ShutdownInFlightSessions.size();
301301
}
302302

303-
std::map<TString, TKqpSessionInfo*>::const_iterator GetOrderedLowerBound(const TString& continuation) const {
304-
return OrderedSessions.lower_bound(continuation);
303+
std::map<std::pair<TString, TString>, TKqpSessionInfo*>::const_iterator GetOrderedLowerBound(const TString& tenant, const TString& continuation) const {
304+
return OrderedSessions.lower_bound(std::make_pair(tenant, continuation));
305305
}
306306

307-
std::map<TString, TKqpSessionInfo*>::const_iterator GetOrderedEnd() const {
307+
std::map<std::pair<TString, TString>, TKqpSessionInfo*>::const_iterator GetOrderedEnd() const {
308308
return OrderedSessions.end();
309309
}
310310

@@ -337,7 +337,7 @@ class TLocalSessionsRegistry {
337337
}
338338
}
339339

340-
OrderedSessions.erase(sessionId);
340+
OrderedSessions.erase(std::make_pair(it->second.Database, sessionId));
341341
LocalSessions.erase(it);
342342
}
343343

ydb/core/kqp/proxy_service/kqp_session_info.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ namespace NKikimr::NKqp {
66

77
using VSessions = NKikimr::NSysView::Schema::QuerySessions;
88

9+
constexpr size_t QUERY_TEXT_LIMIT = 10_KB;
10+
911
void TKqpSessionInfo::SerializeTo(::NKikimrKqp::TSessionInfo* proto, const TFieldsMap& fieldsMap) const {
1012
if (fieldsMap.NeedField(VSessions::SessionId::ColumnId)) { // 1
1113
proto->SetSessionId(SessionId);
@@ -26,7 +28,12 @@ void TKqpSessionInfo::SerializeTo(::NKikimrKqp::TSessionInfo* proto, const TFiel
2628

2729
// last executed query or currently running query.
2830
if (fieldsMap.NeedField(VSessions::Query::ColumnId)) { // 4
29-
proto->SetQuery(QueryText);
31+
if (QueryText.size() > QUERY_TEXT_LIMIT) {
32+
TString truncatedText = QueryText.substr(0, QUERY_TEXT_LIMIT);
33+
proto->SetQuery(QueryText);
34+
} else {
35+
proto->SetQuery(QueryText);
36+
}
3037
}
3138

3239
if (fieldsMap.NeedField(VSessions::QueryCount::ColumnId)) { // 5

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ message TEvListSessionsRequest {
359359
repeated uint32 Columns = 5;
360360
optional int64 FreeSpace = 6;
361361
optional int64 Limit = 7;
362+
optional string TenantName = 8;
362363
}
363364

364365
message TEvListSessionsResponse {

ydb/core/sys_view/sessions/sessions.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ class TSessionsScan : public NKikimr::NSysView::TScanActorBase<TSessionsScan> {
178178
const auto& nodeId = PendingNodes.front();
179179
auto kqpProxyId = NKqp::MakeKqpProxyID(nodeId);
180180
auto req = std::make_unique<NKikimr::NKqp::TEvKqp::TEvListSessionsRequest>();
181+
req->Record.SetTenantName(TenantName);
181182
if (!ContinuationToken.empty()) {
182183
req->Record.SetSessionIdStart(ContinuationToken);
183184
req->Record.SetSessionIdStartInclusive(true);

0 commit comments

Comments
 (0)