Skip to content

Commit 0d17f55

Browse files
authored
Merge to stable: KQP CPU scheduler fix for use-after-free (#20155)
2 parents 82e47de + 67c94f4 commit 0d17f55

File tree

6 files changed

+44
-19
lines changed

6 files changed

+44
-19
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
210210
share = 1.0;
211211
}
212212
std::optional<double> resourceWeight;
213-
if (msg.GetResourceWeight() >= 0) {
213+
if (msg.HasResourceWeight() && msg.GetResourceWeight() >= 0) {
214214
resourceWeight = msg.GetResourceWeight();
215215
}
216216

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -188,14 +188,17 @@ class TObservableUpdater {
188188

189189
void CollectValues() {
190190
std::vector<TParameterKey> toerase;
191-
for (auto& [k, v] : Params) {
192-
if (!v.Holder->HasDependents()) {
193-
toerase.push_back(k);
191+
do {
192+
toerase.clear();
193+
for (auto& [k, v] : Params) {
194+
if (!v.Holder->HasDependents()) {
195+
toerase.push_back(k);
196+
}
194197
}
195-
}
196-
for (auto& key : toerase) {
197-
Params.erase(key);
198-
}
198+
for (auto& key : toerase) {
199+
Params.erase(key);
200+
}
201+
} while (!toerase.empty());
199202
}
200203

201204
private:
@@ -897,7 +900,8 @@ class TCompositeGroupShare : public IObservableValue<double> {
897900
protected:
898901
double DoUpdateValue() override {
899902
if (ResourceWeightEnabled->GetValue()) {
900-
if (ResourceWeightLimit->Enabled()->GetValue()) {
903+
auto limitEnabled = ResourceWeightLimit->Enabled();
904+
if (limitEnabled->GetValue()) {
901905
return Min(TotalLimit->GetValue(), ResourceWeightLimit->GetValue());
902906
} else {
903907
return 0;

ydb/core/tx/conveyor/service/service.cpp

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ TWorkersPool::TWorkersPool(const TString& conveyorName, const NActors::TActorId&
1010
Workers.reserve(WorkersCount);
1111
for (ui32 i = 0; i < WorkersCount; ++i) {
1212
const auto usage = config.GetWorkerCPUUsage(i);
13-
Workers.emplace_back(std::make_unique<TWorker>(conveyorName, usage, distributorId, i, Counters.SendFwdHistogram, Counters.SendFwdDuration));
13+
Workers.emplace_back(usage, std::make_unique<TWorker>(conveyorName, usage, distributorId, i, Counters.SendFwdHistogram, Counters.SendFwdDuration));
1414
MaxWorkerThreads += usage;
1515
}
1616
AFL_VERIFY(WorkersCount)("name", conveyorName)("action", "conveyor_registered")("config", config.DebugString())("actor_id", distributorId)("count", WorkersCount);
@@ -46,6 +46,9 @@ void TWorkersPool::ReleaseWorker(const ui32 workerIdx) {
4646

4747
void TWorkersPool::ChangeAmountCPULimit(const double delta) {
4848
AmountCPULimit += delta;
49+
if (std::abs(AmountCPULimit) < Eps) {
50+
AmountCPULimit = 0;
51+
}
4952
AFL_VERIFY(AmountCPULimit >= 0);
5053
Counters.AmountCPULimit->Set(AmountCPULimit);
5154
Counters.ChangeCPULimitRate->Inc();
@@ -58,15 +61,15 @@ void TWorkersPool::ChangeAmountCPULimit(const double delta) {
5861
ActiveWorkersCount = 0;
5962
ActiveWorkerThreads = numberThreads;
6063
ActiveWorkersIdx.clear();
61-
for (const auto& worker : Workers) {
64+
for (auto& worker : Workers) {
6265
if (numberThreads <= 0) {
6366
break;
6467
}
6568
if (!worker.GetRunningTask()) {
6669
ActiveWorkersIdx.emplace_back(ActiveWorkersCount);
6770
}
68-
worker.GetWorker()->UpdateCPUSoftLimit(std::min<double>(numberThreads, 1));
69-
numberThreads -= worker.GetWorker()->GetCPUSoftLimit();
71+
worker.ChangeCPUSoftLimit(std::min<double>(numberThreads, 1));
72+
numberThreads -= worker.GetCPUSoftLimit();
7073
++ActiveWorkersCount;
7174
}
7275
AFL_VERIFY(std::abs(numberThreads) < Eps);

ydb/core/tx/conveyor/service/service.h

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -188,14 +188,21 @@ class TProcess {
188188
class TWorkersPool {
189189
class TWorkerInfo {
190190
YDB_READONLY(bool, RunningTask, false);
191-
YDB_READONLY(TWorker*, Worker, nullptr);
191+
YDB_READONLY(double, CPUSoftLimit, 0.0);
192192
YDB_READONLY_DEF(TActorId, WorkerId);
193193
public:
194-
explicit TWorkerInfo(std::unique_ptr<TWorker> worker)
195-
: Worker(worker.get())
194+
TWorkerInfo(const double cpuSoftLimit, std::unique_ptr<TWorker> worker)
195+
: CPUSoftLimit(cpuSoftLimit)
196196
, WorkerId(TActivationContext::Register(worker.release())) {
197197
}
198198

199+
void ChangeCPUSoftLimit(const double newCPUSoftLimit) {
200+
if (std::abs(newCPUSoftLimit - CPUSoftLimit) > Eps) {
201+
TActivationContext::Send(WorkerId, std::make_unique<TEvInternal::TEvChangeCPUSoftLimit>(newCPUSoftLimit));
202+
CPUSoftLimit = newCPUSoftLimit;
203+
}
204+
}
205+
199206
void OnStartTask() {
200207
AFL_VERIFY(!RunningTask);
201208
RunningTask = true;

ydb/core/tx/conveyor/service/worker.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,8 @@ void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
6262
ExecuteTask(ev->Get()->ExtractTasks());
6363
}
6464

65-
void TWorker::UpdateCPUSoftLimit(const double cpuSoftLimit) {
65+
void TWorker::HandleMain(TEvInternal::TEvChangeCPUSoftLimit::TPtr& ev) {
66+
const auto cpuSoftLimit = ev->Get()->GetCPUSoftLimit();
6667
AFL_VERIFY(0 < cpuSoftLimit);
6768
AFL_VERIFY(cpuSoftLimit <= CPUHardLimit);
6869
CPUSoftLimit = cpuSoftLimit;

ydb/core/tx/conveyor/service/worker.h

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ struct TEvInternal {
4444
enum EEv {
4545
EvNewTask = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
4646
EvTaskProcessedResult,
47+
EvChangeCPUSoftLimit,
4748
EvEnd
4849
};
4950

@@ -84,6 +85,15 @@ struct TEvInternal {
8485
AFL_VERIFY(Instants.size() == ProcessIds.size() + 1);
8586
}
8687
};
88+
89+
class TEvChangeCPUSoftLimit: public NActors::TEventLocal<TEvChangeCPUSoftLimit, EvChangeCPUSoftLimit> {
90+
private:
91+
YDB_READONLY(double, CPUSoftLimit, 0.0);
92+
public:
93+
explicit TEvChangeCPUSoftLimit(const double cpuSoftLimit)
94+
: CPUSoftLimit(cpuSoftLimit) {
95+
}
96+
};
8797
};
8898

8999
class TWorker: public NActors::TActorBootstrapped<TWorker> {
@@ -104,13 +114,15 @@ class TWorker: public NActors::TActorBootstrapped<TWorker> {
104114
void ExecuteTask(std::vector<TWorkerTask>&& workerTasks);
105115
void HandleMain(TEvInternal::TEvNewTask::TPtr& ev);
106116
void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
117+
void HandleMain(TEvInternal::TEvChangeCPUSoftLimit::TPtr& ev);
107118
void OnWakeup();
108119
public:
109120

110121
STATEFN(StateMain) {
111122
switch (ev->GetTypeRewrite()) {
112123
hFunc(TEvInternal::TEvNewTask, HandleMain);
113124
hFunc(NActors::TEvents::TEvWakeup, HandleMain);
125+
hFunc(TEvInternal::TEvChangeCPUSoftLimit, HandleMain);
114126
default:
115127
ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite();
116128
break;
@@ -132,8 +144,6 @@ class TWorker: public NActors::TActorBootstrapped<TWorker> {
132144
AFL_VERIFY(0 < CPUHardLimit);
133145
AFL_VERIFY(CPUHardLimit <= 1);
134146
}
135-
136-
void UpdateCPUSoftLimit(const double cpuSoftLimit);
137147
};
138148

139149
}

0 commit comments

Comments
 (0)