Skip to content

Commit 6dac5e0

Browse files
authored
YQ WM fixed race with actor context (#11576)
1 parent 071fc9a commit 6dac5e0

File tree

2 files changed

+20
-23
lines changed

2 files changed

+20
-23
lines changed

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
7171
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
7272
}), IEventHandle::FlagTrackDelivery);
7373

74-
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
74+
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
7575

7676
EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
7777
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
@@ -556,7 +556,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
556556
return &databaseIt->second;
557557
}
558558
LOG_I("Creating new database state for id " << databaseId);
559-
return &DatabaseToState.insert({databaseId, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second;
559+
return &DatabaseToState.insert({databaseId, TDatabaseState{.SelfId = SelfId(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second;
560560
}
561561

562562
TPoolState* GetOrCreatePoolState(const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) {
@@ -568,7 +568,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
568568
LOG_I("Creating new handler for pool " << poolKey);
569569

570570
const auto poolHandler = Register(CreatePoolHandlerActor(databaseId, poolId, poolConfig, EnableResourcePoolsCounters ? Counters.Counters : MakeIntrusive<NMonitoring::TDynamicCounters>()));
571-
const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second;
571+
const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler}}).first->second;
572572

573573
Counters.ActivePools->Inc();
574574
ScheduleIdleCheck();

ydb/core/kqp/workload_service/kqp_workload_service_impl.h

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ constexpr TDuration IDLE_DURATION = TDuration::Seconds(60);
1616

1717

1818
struct TDatabaseState {
19-
NActors::TActorContext ActorContext;
19+
TActorId SelfId;
2020
bool& EnabledResourcePoolsOnServerless;
2121

2222
std::vector<TEvPlaceRequestIntoPool::TPtr> PendingRequersts = {};
@@ -33,7 +33,7 @@ struct TDatabaseState {
3333
const TString& poolId = ev->Get()->PoolId;
3434
auto& subscribers = PendingSubscriptions[poolId];
3535
if (subscribers.empty()) {
36-
ActorContext.Register(CreatePoolFetcherActor(ActorContext.SelfID, ev->Get()->DatabaseId, poolId, nullptr));
36+
TActivationContext::Register(CreatePoolFetcherActor(SelfId, ev->Get()->DatabaseId, poolId, nullptr));
3737
}
3838

3939
subscribers.emplace(ev->Sender);
@@ -45,7 +45,7 @@ struct TDatabaseState {
4545
PendingRequersts.emplace_back(std::move(ev));
4646

4747
if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) {
48-
ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, DatabaseIdToDatabase(databaseId)));
48+
TActivationContext::Register(CreateDatabaseFetcherActor(SelfId, DatabaseIdToDatabase(databaseId)));
4949
} else if (!DatabaseUnsupported) {
5050
StartPendingRequests();
5151
} else {
@@ -61,11 +61,11 @@ struct TDatabaseState {
6161
}
6262

6363
if (ev->Get()->Status == Ydb::StatusIds::SUCCESS && poolHandler) {
64-
ActorContext.Send(poolHandler, new TEvPrivate::TEvUpdatePoolSubscription(ev->Get()->PathId, subscribers));
64+
TActivationContext::Send(poolHandler, std::make_unique<TEvPrivate::TEvUpdatePoolSubscription>(ev->Get()->PathId, subscribers));
6565
} else {
6666
const TString& databaseId = ev->Get()->DatabaseId;
6767
for (const auto& subscriber : subscribers) {
68-
ActorContext.Send(subscriber, new TEvUpdatePoolInfo(databaseId, poolId, std::nullopt, std::nullopt));
68+
TActivationContext::Send(subscriber, std::make_unique<TEvUpdatePoolInfo>(databaseId, poolId, std::nullopt, std::nullopt));
6969
}
7070
}
7171
subscribers.clear();
@@ -79,7 +79,7 @@ struct TDatabaseState {
7979
}
8080

8181
if (Serverless != ev->Get()->Serverless) {
82-
ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvKqp::TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless));
82+
TActivationContext::Send(MakeKqpProxyID(SelfId.NodeId()), std::make_unique<TEvKqp::TEvUpdateDatabaseInfo>(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless));
8383
}
8484

8585
LastUpdateTime = TInstant::Now();
@@ -103,25 +103,24 @@ struct TDatabaseState {
103103
}
104104

105105
for (auto& ev : PendingRequersts) {
106-
ActorContext.Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool));
106+
TActivationContext::Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool));
107107
}
108108
PendingRequersts.clear();
109109
}
110110

111111
void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
112112
for (const auto& ev : PendingRequersts) {
113-
RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
114-
ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
113+
RemovePendingSession(ev->Get()->SessionId, [actorSystem = TActivationContext::ActorSystem()](TEvCleanupRequest::TPtr event) {
114+
actorSystem->Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, NYql::TIssues{NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
115115
});
116-
ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues));
116+
TActivationContext::Send(ev->Sender, std::make_unique<TEvContinueRequest>(status, TString{}, NResourcePool::TPoolSettings{}, issues));
117117
}
118118
PendingRequersts.clear();
119119
}
120120
};
121121

122122
struct TPoolState {
123123
NActors::TActorId PoolHandler;
124-
NActors::TActorContext ActorContext;
125124

126125
std::queue<TEvPrivate::TEvResolvePoolResponse::TPtr> PendingRequests = {};
127126
bool WaitingInitialization = false;
@@ -137,7 +136,7 @@ struct TPoolState {
137136
return;
138137
}
139138

140-
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
139+
TActivationContext::Send(PoolHandler, std::make_unique<TEvPrivate::TEvStopPoolHandler>(false));
141140
PreviousPoolHandlers.insert(PoolHandler);
142141
PoolHandler = *NewPoolHandler;
143142
NewPoolHandler = std::nullopt;
@@ -151,7 +150,7 @@ struct TPoolState {
151150

152151
PlaceRequestRunning = true;
153152
InFlightRequests++;
154-
ActorContext.Send(PendingRequests.front()->Forward(PoolHandler));
153+
TActivationContext::Send(PendingRequests.front()->Forward(PoolHandler));
155154
PendingRequests.pop();
156155
}
157156

@@ -163,31 +162,29 @@ struct TPoolState {
163162

164163
void DoCleanupRequest(TEvCleanupRequest::TPtr event) {
165164
for (const auto& poolHandler : PreviousPoolHandlers) {
166-
ActorContext.Send(poolHandler, new TEvCleanupRequest(
165+
TActivationContext::Send(poolHandler, std::make_unique<TEvCleanupRequest>(
167166
event->Get()->DatabaseId, event->Get()->SessionId,
168167
event->Get()->PoolId, event->Get()->Duration, event->Get()->CpuConsumed
169168
));
170169
}
171-
ActorContext.Send(event->Forward(PoolHandler));
170+
TActivationContext::Send(event->Forward(PoolHandler));
172171
}
173172
};
174173

175174
struct TCpuQuotaManagerState {
176175
TCpuQuotaManager CpuQuotaManager;
177-
NActors::TActorContext ActorContext;
178176
bool CpuLoadRequestRunning = false;
179177
TInstant CpuLoadRequestTime = TInstant::Zero();
180178

181-
TCpuQuotaManagerState(NActors::TActorContext actorContext, NMonitoring::TDynamicCounterPtr subComponent)
179+
TCpuQuotaManagerState(NMonitoring::TDynamicCounterPtr subComponent)
182180
: CpuQuotaManager(TDuration::Seconds(1), TDuration::Seconds(10), IDLE_DURATION, 0.1, true, 0, subComponent)
183-
, ActorContext(actorContext)
184181
{}
185182

186183
void RequestCpuQuota(TActorId poolHandler, double maxClusterLoad, ui64 coockie) {
187184
auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad);
188185

189186
bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS;
190-
ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);
187+
TActivationContext::Send(poolHandler, std::make_unique<TEvPrivate::TEvCpuQuotaResponse>(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);
191188

192189
// Schedule notification
193190
if (!quotaAccepted) {
@@ -238,7 +235,7 @@ struct TCpuQuotaManagerState {
238235
}
239236

240237
for (const TActorId& poolHandler : poolHandlers) {
241-
ActorContext.Send(poolHandler, new TEvPrivate::TEvRefreshPoolState());
238+
TActivationContext::Send(poolHandler, std::make_unique<TEvPrivate::TEvRefreshPoolState>());
242239
HandlersLimits.erase(poolHandler);
243240
}
244241
PendingHandlers.erase(PendingHandlers.begin());

0 commit comments

Comments
 (0)