Skip to content

Commit cd24dc8

Browse files
committed
YQ-3684 fixed error duplicate session id (ydb-platform#9583)
1 parent b230eba commit cd24dc8

File tree

5 files changed

+86
-11
lines changed

5 files changed

+86
-11
lines changed

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
255255
QueryState->UserToken
256256
), IEventHandle::FlagTrackDelivery);
257257

258+
QueryState->PoolHandlerActor = MakeKqpWorkloadServiceId(SelfId().NodeId());
258259
Become(&TKqpSessionActor::ExecuteState);
259260
}
260261

@@ -2387,6 +2388,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
23872388
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
23882389
hFunc(TEvents::TEvUndelivered, HandleNoop);
23892390
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleNoop);
2391+
hFunc(TEvKqpExecuter::TEvStreamData, HandleNoop);
23902392
hFunc(NWorkload::TEvContinueRequest, HandleNoop);
23912393

23922394
// always come from WorkerActor

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
161161
}
162162

163163
SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);
164+
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvStopPoolHandlerResponse(Database, PoolId));
164165

165166
Counters.OnCleanup(ResetCountersOnStrop);
166167

@@ -184,16 +185,16 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
184185
}
185186

186187
void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
187-
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId));
188-
189188
auto event = std::move(ev->Get()->Event);
189+
const TString& sessionId = event->Get()->SessionId;
190+
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId, sessionId));
191+
190192
const TActorId& workerActorId = event->Sender;
191193
if (!InFlightLimit) {
192194
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::PRECONDITION_FAILED, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Resource pool " << PoolId << " was disabled due to zero concurrent query limit")}));
193195
return;
194196
}
195197

196-
const TString& sessionId = event->Get()->SessionId;
197198
if (LocalSessions.contains(sessionId)) {
198199
this->Send(workerActorId, new TEvContinueRequest(Ydb::StatusIds::INTERNAL_ERROR, PoolId, PoolConfig, {NYql::TIssue(TStringBuilder() << "Got duplicate session id " << sessionId << " for pool " << PoolId)}));
199200
return;

ydb/core/kqp/workload_service/common/events.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct TEvPrivate {
2828
EvFinishRequestInPool,
2929
EvResignPoolHandler,
3030
EvStopPoolHandler,
31+
EvStopPoolHandlerResponse,
3132
EvCancelRequest,
3233
EvUpdatePoolSubscription,
3334

@@ -111,13 +112,15 @@ struct TEvPrivate {
111112
};
112113

113114
// Local event (derives from NActors::TEventLocal) that identifies a request
// by database, pool id and session id.
// NOTE: this is a rendered diff, so both constructor signatures are visible:
// the two-argument form (marker 114-) is the removed line and the
// three-argument form (marker 115+) is its replacement.
struct TEvPlaceRequestIntoPoolResponse : public NActors::TEventLocal<TEvPlaceRequestIntoPoolResponse, EvPlaceRequestIntoPoolResponse> {
114-
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId)
115+
TEvPlaceRequestIntoPoolResponse(const TString& database, const TString& poolId, const TString& sessionId)
115116
: Database(database)
116117
, PoolId(poolId)
118+
, SessionId(sessionId)
117119
{}
118120

119121
const TString Database;
120122
const TString PoolId;
123+
// Session whose place request this response refers to.
const TString SessionId;
121124
};
122125

123126
struct TEvFinishRequestInPool : public NActors::TEventLocal<TEvFinishRequestInPool, EvFinishRequestInPool> {
@@ -156,6 +159,16 @@ struct TEvPrivate {
156159
const bool ResetCounters;
157160
};
158161

162+
// Local event (derives from NActors::TEventLocal) identifying a pool by
// database and pool id. Both fields are copied from the constructor
// arguments and are immutable afterwards.
struct TEvStopPoolHandlerResponse : public NActors::TEventLocal<TEvStopPoolHandlerResponse, EvStopPoolHandlerResponse> {
163+
TEvStopPoolHandlerResponse(const TString& database, const TString& poolId)
164+
: Database(database)
165+
, PoolId(poolId)
166+
{}
167+
168+
const TString Database;
169+
const TString PoolId;
170+
};
171+
159172
struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
160173
explicit TEvCancelRequest(const TString& sessionId)
161174
: SessionId(sessionId)

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,21 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
169169
// Handles a cleanup request for a finished query.
// If the session id is still listed in the database state's
// PendingSessionIds, the event is parked in PendingCancelRequests and the
// handler returns. Otherwise the pool state is looked up: a missing pool is
// answered with a NOT_FOUND cleanup error, an existing one receives the
// request.
void Handle(TEvCleanupRequest::TPtr& ev) {
170170
const TString& database = ev->Get()->Database;
171171
const TString& poolId = ev->Get()->PoolId;
172+
const TString& sessionId = ev->Get()->SessionId;
173+
if (GetOrCreateDatabaseState(database)->PendingSessionIds.contains(sessionId)) {
174+
LOG_D("Finished request with worker actor " << ev->Sender << ", wait for place request, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
175+
// Park the event under its session id instead of dispatching it now.
GetOrCreateDatabaseState(database)->PendingCancelRequests[sessionId].emplace_back(std::move(ev));
176+
return;
177+
}
178+
172179
auto poolState = GetPoolState(database, poolId);
173180
if (!poolState) {
174181
ReplyCleanupError(ev->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << poolId << " not found");
175182
return;
176183
}
177184

178185
LOG_D("Finished request with worker actor " << ev->Sender << ", Database: " << database << ", PoolId: " << poolId << ", SessionId: " << ev->Get()->SessionId);
179-
// Rendered diff: the next line (marker 179-) is the removed statement and
// the one after it (marker 186+) is its replacement.
Send(ev->Forward(poolState->PoolHandler));
186+
poolState->DoCleanupRequest(std::move(ev));
180187
}
181188

182189
void Handle(TEvents::TEvWakeup::TPtr& ev) {
@@ -220,6 +227,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
220227
hFunc(TEvPrivate::TEvTablesCreationFinished, Handle);
221228
hFunc(TEvPrivate::TEvCpuLoadResponse, Handle);
222229
hFunc(TEvPrivate::TEvResignPoolHandler, Handle);
230+
hFunc(TEvPrivate::TEvStopPoolHandlerResponse, Handle);
223231
)
224232

225233
private:
@@ -245,12 +253,16 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
245253
void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) {
246254
const auto& event = ev->Get()->Event;
247255
const TString& database = event->Get()->Database;
256+
auto databaseState = GetOrCreateDatabaseState(database);
248257
if (ev->Get()->DefaultPoolCreated) {
249-
GetOrCreateDatabaseState(database)->HasDefaultPool = true;
258+
databaseState->HasDefaultPool = true;
250259
}
251260

252261
const TString& poolId = event->Get()->PoolId;
253262
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
263+
databaseState->RemovePendingSession(event->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
264+
ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found");
265+
});
254266
ReplyContinueError(event->Sender, ev->Get()->Status, ev->Get()->Issues);
255267
return;
256268
}
@@ -265,9 +277,19 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
265277
void Handle(TEvPrivate::TEvPlaceRequestIntoPoolResponse::TPtr& ev) {
266278
const TString& database = ev->Get()->Database;
267279
const TString& poolId = ev->Get()->PoolId;
268-
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId);
280+
const TString& sessionId = ev->Get()->SessionId;
281+
LOG_T("Request placed into pool, Database: " << database << ", PoolId: " << poolId << ", SessionId: " << sessionId);
269282

270-
if (auto poolState = GetPoolState(database, poolId)) {
283+
auto poolState = GetPoolState(database, poolId);
284+
GetOrCreateDatabaseState(database)->RemovePendingSession(sessionId, [this, poolState](TEvCleanupRequest::TPtr event) {
285+
if (poolState) {
286+
poolState->DoCleanupRequest(std::move(event));
287+
} else {
288+
ReplyCleanupError(event->Sender, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Pool " << event->Get()->PoolId << " not found");
289+
}
290+
});
291+
292+
if (poolState) {
271293
poolState->PlaceRequestRunning = false;
272294
poolState->UpdateHandler();
273295
poolState->StartPlaceRequest();
@@ -388,6 +410,17 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
388410
}
389411
}
390412

413+
// Handles TEvStopPoolHandlerResponse: decrements the ActivePools counter
// and, when a pool state for (database, poolId) still exists, removes the
// sender from that state's PreviousPoolHandlers set.
void Handle(TEvPrivate::TEvStopPoolHandlerResponse::TPtr& ev) {
414+
const TString& database = ev->Get()->Database;
415+
const TString& poolId = ev->Get()->PoolId;
416+
LOG_T("Got stop pool handler response, Database: " << database << ", PoolId: " << poolId);
417+
418+
// NOTE(review): the decrement happens for every response, whether or not a
// pool state is found below — confirm each stopped handler was counted
// exactly once on creation.
Counters.ActivePools->Dec();
419+
if (auto poolState = GetPoolState(database, poolId)) {
420+
poolState->PreviousPoolHandlers.erase(ev->Sender);
421+
}
422+
}
423+
391424
private:
392425
void InitializeWorkloadService() {
393426
if (ServiceInitialized) {
@@ -441,15 +474,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
441474
std::vector<TString> poolsToDelete;
442475
poolsToDelete.reserve(PoolIdToState.size());
443476
for (const auto& [poolKey, poolState] : PoolIdToState) {
444-
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
477+
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION && poolState.PendingRequests.empty()) {
445478
CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
446479
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true));
447480
poolsToDelete.emplace_back(poolKey);
448481
}
449482
}
450483
for (const auto& poolKey : poolsToDelete) {
451484
PoolIdToState.erase(poolKey);
452-
Counters.ActivePools->Dec();
453485
}
454486

455487
if (!PoolIdToState.empty()) {
@@ -512,7 +544,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
512544
Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)}));
513545
}
514546

515-
TDatabaseState* GetOrCreateDatabaseState(const TString& database) {
547+
TDatabaseState* GetOrCreateDatabaseState(TString database) {
548+
database = CanonizePath(database);
516549
auto databaseIt = DatabaseToState.find(database);
517550
if (databaseIt != DatabaseToState.end()) {
518551
return &databaseIt->second;

ydb/core/kqp/workload_service/kqp_workload_service_impl.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ struct TDatabaseState {
1919
bool& EnabledResourcePoolsOnServerless;
2020

2121
std::vector<TEvPlaceRequestIntoPool::TPtr> PendingRequersts = {};
22+
std::unordered_set<TString> PendingSessionIds = {};
23+
std::unordered_map<TString, std::vector<TEvCleanupRequest::TPtr>> PendingCancelRequests = {};
2224
std::unordered_map<TString, std::unordered_set<TActorId>> PendingSubscriptions = {};
2325
bool HasDefaultPool = false;
2426
bool Serverless = false;
@@ -38,6 +40,7 @@ struct TDatabaseState {
3840

3941
void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) {
4042
TString database = ev->Get()->Database;
43+
PendingSessionIds.emplace(ev->Get()->SessionId);
4144
PendingRequersts.emplace_back(std::move(ev));
4245

4346
if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) {
@@ -79,6 +82,14 @@ struct TDatabaseState {
7982
StartPendingRequests();
8083
}
8184

85+
// Stops tracking sessionId as pending.
// Every cleanup request parked for this session in PendingCancelRequests is
// handed to `callback` (moved out of the container), after which the
// session's entries are erased from both PendingCancelRequests and
// PendingSessionIds.
//   sessionId - session to stop tracking.
//   callback  - invoked once per parked TEvCleanupRequest, in stored order.
void RemovePendingSession(const TString& sessionId, std::function<void(TEvCleanupRequest::TPtr)> callback) {
86+
// operator[] creates an empty vector when the session has no parked
// requests; that temporary entry is removed by the erase below.
for (auto& event : PendingCancelRequests[sessionId]) {
87+
callback(std::move(event));
88+
}
89+
PendingCancelRequests.erase(sessionId);
90+
PendingSessionIds.erase(sessionId);
91+
}
92+
8293
private:
8394
void StartPendingRequests() {
8495
if (!EnabledResourcePoolsOnServerless && Serverless) {
@@ -94,6 +105,9 @@ struct TDatabaseState {
94105

95106
void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
96107
for (const auto& ev : PendingRequersts) {
108+
RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
109+
ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
110+
});
97111
ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues));
98112
}
99113
PendingRequersts.clear();
@@ -108,6 +122,7 @@ struct TPoolState {
108122
bool WaitingInitialization = false;
109123
bool PlaceRequestRunning = false;
110124
std::optional<TActorId> NewPoolHandler = std::nullopt;
125+
std::unordered_set<TActorId> PreviousPoolHandlers = {};
111126

112127
ui64 InFlightRequests = 0;
113128
TInstant LastUpdateTime = TInstant::Now();
@@ -118,6 +133,7 @@ struct TPoolState {
118133
}
119134

120135
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
136+
PreviousPoolHandlers.insert(PoolHandler);
121137
PoolHandler = *NewPoolHandler;
122138
NewPoolHandler = std::nullopt;
123139
InFlightRequests = 0;
@@ -139,6 +155,16 @@ struct TPoolState {
139155
InFlightRequests--;
140156
LastUpdateTime = TInstant::Now();
141157
}
158+
159+
// Delivers a cleanup request to every pool handler tracked by this state.
// Each actor in PreviousPoolHandlers receives a newly constructed
// TEvCleanupRequest built from the original's fields; the original event
// itself is then forwarded to the current PoolHandler.
// NOTE(review): presumably the fan-out exists because a session may still be
// registered with a handler that is in the process of stopping — confirm.
void DoCleanupRequest(TEvCleanupRequest::TPtr event) {
160+
for (const auto& poolHandler : PreviousPoolHandlers) {
161+
ActorContext.Send(poolHandler, new TEvCleanupRequest(
162+
event->Get()->Database, event->Get()->SessionId, event->Get()->PoolId,
163+
event->Get()->Duration, event->Get()->CpuConsumed
164+
));
165+
}
166+
// Forward last: the fields of `event` are read in the loop above.
ActorContext.Send(event->Forward(PoolHandler));
167+
}
142168
};
143169

144170
struct TCpuQuotaManagerState {

0 commit comments

Comments
 (0)