Skip to content

Commit 32146c7

Browse files
committed
Fixed fault
1 parent e1d6873 commit 32146c7

File tree

4 files changed

+28
-16
lines changed

4 files changed

+28
-16
lines changed

ydb/core/tx/conveyor/service/service.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ TWorkersPool::TWorkersPool(const TString& conveyorName, const NActors::TActorId&
1010
Workers.reserve(WorkersCount);
1111
for (ui32 i = 0; i < WorkersCount; ++i) {
1212
const auto usage = config.GetWorkerCPUUsage(i);
13-
Workers.emplace_back(std::make_unique<TWorker>(conveyorName, usage, distributorId, i, Counters.SendFwdHistogram, Counters.SendFwdDuration));
13+
Workers.emplace_back(usage, std::make_unique<TWorker>(conveyorName, usage, distributorId, i, Counters.SendFwdHistogram, Counters.SendFwdDuration));
1414
MaxWorkerThreads += usage;
1515
}
1616
AFL_VERIFY(WorkersCount)("name", conveyorName)("action", "conveyor_registered")("config", config.DebugString())("actor_id", distributorId)("count", WorkersCount);
@@ -46,9 +46,6 @@ void TWorkersPool::ReleaseWorker(const ui32 workerIdx) {
4646

4747
void TWorkersPool::ChangeAmountCPULimit(const double delta) {
4848
AmountCPULimit += delta;
49-
if (std::abs(AmountCPULimit) < Eps) {
50-
AmountCPULimit = 0;
51-
}
5249
AFL_VERIFY(AmountCPULimit >= 0);
5350
Counters.AmountCPULimit->Set(AmountCPULimit);
5451
Counters.ChangeCPULimitRate->Inc();
@@ -61,15 +58,15 @@ void TWorkersPool::ChangeAmountCPULimit(const double delta) {
6158
ActiveWorkersCount = 0;
6259
ActiveWorkerThreads = numberThreads;
6360
ActiveWorkersIdx.clear();
64-
for (const auto& worker : Workers) {
61+
for (auto& worker : Workers) {
6562
if (numberThreads <= 0) {
6663
break;
6764
}
6865
if (!worker.GetRunningTask()) {
6966
ActiveWorkersIdx.emplace_back(ActiveWorkersCount);
7067
}
71-
worker.GetWorker()->UpdateCPUSoftLimit(std::min<double>(numberThreads, 1));
72-
numberThreads -= worker.GetWorker()->GetCPUSoftLimit();
68+
worker.ChangeCPUSoftLimit(std::min<double>(numberThreads, 1));
69+
numberThreads -= worker.GetCPUSoftLimit();
7370
++ActiveWorkersCount;
7471
}
7572
AFL_VERIFY(std::abs(numberThreads) < Eps);

ydb/core/tx/conveyor/service/service.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,21 @@ class TProcess {
188188
class TWorkersPool {
189189
class TWorkerInfo {
190190
YDB_READONLY(bool, RunningTask, false);
191-
YDB_READONLY(TWorker*, Worker, nullptr);
191+
YDB_READONLY(double, CPUSoftLimit, 0.0);
192192
YDB_READONLY_DEF(TActorId, WorkerId);
193193
public:
194-
explicit TWorkerInfo(std::unique_ptr<TWorker> worker)
195-
: Worker(worker.get())
194+
TWorkerInfo(const double cpuSoftLimit, std::unique_ptr<TWorker> worker)
195+
: CPUSoftLimit(cpuSoftLimit)
196196
, WorkerId(TActivationContext::Register(worker.release())) {
197197
}
198198

199+
void ChangeCPUSoftLimit(const double newCPUSoftLimit) {
200+
if (std::abs(newCPUSoftLimit - CPUSoftLimit) > Eps) {
201+
TActivationContext::Send(WorkerId, std::make_unique<TEvInternal::TEvChangeCPUSoftLimit>(newCPUSoftLimit));
202+
CPUSoftLimit = newCPUSoftLimit;
203+
}
204+
}
205+
199206
void OnStartTask() {
200207
AFL_VERIFY(!RunningTask);
201208
RunningTask = true;

ydb/core/tx/conveyor/service/worker.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ void TWorker::HandleMain(TEvInternal::TEvNewTask::TPtr& ev) {
6262
ExecuteTask(ev->Get()->ExtractTasks());
6363
}
6464

65-
void TWorker::UpdateCPUSoftLimit(const double cpuSoftLimit) {
65+
void TWorker::HandleMain(TEvInternal::TEvChangeCPUSoftLimit::TPtr& ev) {
66+
const auto cpuSoftLimit = ev->Get()->GetCPUSoftLimit();
6667
AFL_VERIFY(0 < cpuSoftLimit);
6768
AFL_VERIFY(cpuSoftLimit <= CPUHardLimit);
6869
CPUSoftLimit = cpuSoftLimit;
@@ -71,9 +72,6 @@ void TWorker::UpdateCPUSoftLimit(const double cpuSoftLimit) {
7172
return;
7273
}
7374

74-
Sleep(TDuration::MilliSeconds(10));
75-
76-
AFL_VERIFY(!Instants.empty());
7775
auto wakeupTime = Instants.back();
7876
if (CPUSoftLimit < 1) {
7977
wakeupTime += GetWakeupDuration();

ydb/core/tx/conveyor/service/worker.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ struct TEvInternal {
4444
enum EEv {
4545
EvNewTask = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
4646
EvTaskProcessedResult,
47+
EvChangeCPUSoftLimit,
4748
EvEnd
4849
};
4950

@@ -84,6 +85,15 @@ struct TEvInternal {
8485
AFL_VERIFY(Instants.size() == ProcessIds.size() + 1);
8586
}
8687
};
88+
89+
class TEvChangeCPUSoftLimit: public NActors::TEventLocal<TEvChangeCPUSoftLimit, EvChangeCPUSoftLimit> {
90+
private:
91+
YDB_READONLY(double, CPUSoftLimit, 0.0);
92+
public:
93+
explicit TEvChangeCPUSoftLimit(const double cpuSoftLimit)
94+
: CPUSoftLimit(cpuSoftLimit) {
95+
}
96+
};
8797
};
8898

8999
class TWorker: public NActors::TActorBootstrapped<TWorker> {
@@ -104,13 +114,15 @@ class TWorker: public NActors::TActorBootstrapped<TWorker> {
104114
void ExecuteTask(std::vector<TWorkerTask>&& workerTasks);
105115
void HandleMain(TEvInternal::TEvNewTask::TPtr& ev);
106116
void HandleMain(NActors::TEvents::TEvWakeup::TPtr& ev);
117+
void HandleMain(TEvInternal::TEvChangeCPUSoftLimit::TPtr& ev);
107118
void OnWakeup();
108119
public:
109120

110121
STATEFN(StateMain) {
111122
switch (ev->GetTypeRewrite()) {
112123
hFunc(TEvInternal::TEvNewTask, HandleMain);
113124
hFunc(NActors::TEvents::TEvWakeup, HandleMain);
125+
hFunc(TEvInternal::TEvChangeCPUSoftLimit, HandleMain);
114126
default:
115127
ALS_ERROR(NKikimrServices::TX_CONVEYOR) << "unexpected event for task executor: " << ev->GetTypeRewrite();
116128
break;
@@ -132,8 +144,6 @@ class TWorker: public NActors::TActorBootstrapped<TWorker> {
132144
AFL_VERIFY(0 < CPUHardLimit);
133145
AFL_VERIFY(CPUHardLimit <= 1);
134146
}
135-
136-
void UpdateCPUSoftLimit(const double cpuSoftLimit);
137147
};
138148

139149
}

0 commit comments

Comments
 (0)