Skip to content

Commit 120f983

Browse files
authored
Add utilization and starvation sensors (#9066)
1 parent 0f3e124 commit 120f983

File tree

4 files changed

+144
-0
lines changed

4 files changed

+144
-0
lines changed

ydb/library/actors/interconnect/interconnect_counters.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,26 @@ namespace NActors {
99

1010
namespace {
1111

12+
static constexpr std::initializer_list<ui32> UtilizationPPM{
13+
1'000, 5'000, 10'000, 50'000, 100'000, 200'000, 300'000, 400'000, 500'000, 600'000, 700'000, 800'000,
14+
900'000, 950'000, 990'000, 999'000, Max<ui32>()
15+
};
16+
17+
namespace {
18+
void UpdateUtilization(auto& prevIndex, auto& array, ui32 value) {
19+
auto comp = [](const auto& x, ui32 y) { return std::get<0>(x) < y; };
20+
auto it = std::lower_bound(array.begin(), array.end(), value, comp);
21+
Y_ABORT_UNLESS(it != array.end());
22+
if (const ui32 index = it - array.begin(); index != prevIndex) {
23+
auto& [_1, prev] = array[prevIndex];
24+
prev->Dec();
25+
auto& [_2, cur] = *it;
26+
cur->Inc();
27+
prevIndex = index;
28+
}
29+
};
30+
}
31+
1232
class TInterconnectCounters: public IInterconnectMetrics {
1333
public:
1434
struct TOutputChannel {
@@ -236,6 +256,11 @@ namespace {
236256
}
237257
}
238258

259+
void SetUtilization(ui32 total, ui32 starvation) override {
260+
UpdateUtilization(PrevUtilization, Utilization, total);
261+
UpdateUtilization(PrevStarvation, Starvation, starvation);
262+
}
263+
239264
void SetPeerInfo(const TString& name, const TString& dataCenterId) override {
240265
if (name != std::exchange(HumanFriendlyPeerHostName, name)) {
241266
PerSessionCounters.Reset();
@@ -308,6 +333,17 @@ namespace {
308333
for (const char *reason : TDisconnectReason::Reasons) {
309334
DisconnectByReason[reason] = disconnectReasonGroup->GetCounter(reason, true);
310335
}
336+
337+
auto group = Counters->GetSubgroup("subsystem", "utilization");
338+
auto util = group->GetSubgroup("sensor", "utilization");
339+
auto starv = group->GetSubgroup("sensor", "starvation");
340+
for (ui32 ppm : UtilizationPPM) {
341+
const TString name = ppm != Max<ui32>() ? ToString(ppm) : "inf";
342+
Utilization.emplace_back(ppm, util->GetNamedCounter("bin", name, false));
343+
Starvation.emplace_back(ppm, starv->GetNamedCounter("bin", name, false));
344+
}
345+
++*std::get<1>(Utilization[PrevUtilization]);
346+
++*std::get<1>(Starvation[PrevStarvation]);
311347
}
312348

313349
Initialized = true;
@@ -346,6 +382,11 @@ namespace {
346382
THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> DisconnectByReason;
347383

348384
NMonitoring::TDynamicCounters::TCounterPtr TotalBytesWritten, TotalBytesRead;
385+
386+
ui32 PrevUtilization = 0;
387+
std::vector<std::tuple<ui32, NMonitoring::TDynamicCounters::TCounterPtr>> Utilization;
388+
ui32 PrevStarvation = 0;
389+
std::vector<std::tuple<ui32, NMonitoring::TDynamicCounters::TCounterPtr>> Starvation;
349390
};
350391

351392
class TInterconnectMetrics: public IInterconnectMetrics {
@@ -555,6 +596,11 @@ namespace {
555596
}
556597
}
557598

599+
void SetUtilization(ui32 total, ui32 starvation) override {
600+
UpdateUtilization(PrevUtilization_, Utilization_, total);
601+
UpdateUtilization(PrevStarvation_, Starvation_, starvation);
602+
}
603+
558604
void SetPeerInfo(const TString& name, const TString& dataCenterId) override {
559605
if (name != std::exchange(HumanFriendlyPeerHostName, name)) {
560606
PerSessionMetrics_.reset();
@@ -638,6 +684,26 @@ namespace {
638684
{"reason", reason},
639685
}));
640686
}
687+
688+
for (ui32 ppm : UtilizationPPM) {
689+
const TString name = ppm != Max<ui32>() ? ToString(ppm) : "inf";
690+
Utilization_.emplace_back(ppm, Metrics_->IntGauge(
691+
NMonitoring::MakeLabels({
692+
{"subsystem", "utilization"},
693+
{"sensor", "utilization"},
694+
{"bin", name},
695+
})
696+
));
697+
Starvation_.emplace_back(ppm, Metrics_->IntGauge(
698+
NMonitoring::MakeLabels({
699+
{"subsystem", "utilization"},
700+
{"sensor", "starvation"},
701+
{"bin", name},
702+
})
703+
));
704+
}
705+
std::get<1>(Utilization_[PrevUtilization_])->Inc();
706+
std::get<1>(Starvation_[PrevStarvation_])->Inc();
641707
}
642708

643709
Initialized_ = true;
@@ -690,6 +756,11 @@ namespace {
690756

691757
NMonitoring::IRate* TotalBytesWritten_;
692758
NMonitoring::IRate* TotalBytesRead_;
759+
760+
ui32 PrevUtilization_ = 0;
761+
std::vector<std::tuple<ui32, NMonitoring::IIntGauge*>> Utilization_;
762+
ui32 PrevStarvation_ = 0;
763+
std::vector<std::tuple<ui32, NMonitoring::IIntGauge*>> Starvation_;
693764
};
694765

695766
} // namespace

ydb/library/actors/interconnect/interconnect_counters.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class IInterconnectMetrics {
4343
virtual void UpdatePingTimeHistogram(ui64 value) = 0;
4444
virtual void UpdateOutputChannelTraffic(ui16 channel, ui64 value) = 0;
4545
virtual void UpdateOutputChannelEvents(ui16 channel) = 0;
46+
virtual void SetUtilization(ui32 total, ui32 starvation) = 0;
4647
TString GetHumanFriendlyPeerHostName() const {
4748
return HumanFriendlyPeerHostName.value_or(TString());
4849
}

ydb/library/actors/interconnect/interconnect_tcp_session.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ namespace NActors {
3939
, OutputCounter(0ULL)
4040
{
4141
Proxy->Metrics->SetConnected(0);
42+
PartUpdateTimestamp = GetCycleCountFast();
4243
ReceiveContext.Reset(new TReceiveContext);
4344
}
4445

@@ -143,6 +144,9 @@ namespace NActors {
143144

144145
SetOutputStuckFlag(true);
145146
++NumEventsInQueue;
147+
if (State == EState::Idle) {
148+
UpdateState(EState::Utilized);
149+
}
146150
RearmCloseOnIdle();
147151

148152
LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInQueue, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
@@ -408,9 +412,15 @@ namespace NActors {
408412
TimeLimit.emplace(GetMaxCyclesPerEvent());
409413
}
410414

415+
if (NumEventsInQueue || OutgoingStream || OutOfBandStream || XdcStream) {
416+
UpdateState(EState::Utilized);
417+
}
418+
411419
// generate ping request, if needed
412420
IssuePingRequest();
413421

422+
bool notEnoughCpu = false;
423+
414424
while (Socket) {
415425
ProducePackets();
416426
if (!Socket) {
@@ -438,6 +448,7 @@ namespace NActors {
438448
} else if (TimeLimit->CheckExceeded()) {
439449
SetEnoughCpu(++StarvingInRow < StarvingInRowForNotEnoughCpu);
440450
IssueRam(false);
451+
notEnoughCpu = true;
441452
break;
442453
}
443454
}
@@ -449,6 +460,10 @@ namespace NActors {
449460

450461
// equalize channel weights
451462
EqualizeCounter += ChannelScheduler->Equalize();
463+
464+
// update state
465+
const bool finished = !NumEventsInQueue && !OutgoingStream && !OutOfBandStream && !XdcStream;
466+
UpdateState(finished ? EState::Idle : notEnoughCpu ? EState::WaitingCpu : EState::Utilized);
452467
}
453468

454469
void TInterconnectSessionTCP::ProducePackets() {
@@ -533,6 +548,7 @@ namespace NActors {
533548
XdcSocket->Shutdown(SHUT_RDWR);
534549
XdcSocket.Reset();
535550
}
551+
UpdateState(EState::Idle);
536552
}
537553

538554
void TInterconnectSessionTCP::ReestablishConnectionExecute() {
@@ -1120,6 +1136,10 @@ namespace NActors {
11201136
}
11211137
}
11221138

1139+
void TInterconnectSessionTCP::UpdateUtilization() {
1140+
Proxy->Metrics->SetUtilization(1'000'000 * Utilized, 1'000'000 * Starving);
1141+
}
1142+
11231143
void TInterconnectSessionTCP::GenerateHttpInfo(NMon::TEvHttpInfoRes::TPtr& ev) {
11241144
TStringStream str;
11251145
ev->Get()->Output(str);
@@ -1332,6 +1352,15 @@ namespace NActors {
13321352
MON_VAR(CpuStarvationEvents)
13331353
MON_VAR(CpuStarvationEventsOnWriteData)
13341354

1355+
UpdateState();
1356+
1357+
TABLER() {
1358+
TABLED() { str << "Utilization"; }
1359+
TABLED() {
1360+
str << Sprintf("%.1f%%", 100 * Utilized) << " / " << Sprintf("%.1f%%", Starving);
1361+
}
1362+
}
1363+
13351364
TString clockSkew;
13361365
i64 x = GetClockSkew();
13371366
if (x < 0) {

ydb/library/actors/interconnect/interconnect_tcp_session.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ namespace NActors {
475475
hFunc(TEvTerminate, Handle)
476476
hFunc(TEvProcessPingRequest, Handle)
477477
)
478+
UpdateUtilization();
478479
}
479480

480481
void Handle(TEvUpdateFromInputSession::TPtr& ev);
@@ -650,6 +651,48 @@ namespace NActors {
650651
ui64 EqualizeCounter = 0;
651652

652653
ui64 StarvingInRow = 0;
654+
655+
enum class EState {
656+
Utilized = 0,
657+
WaitingCpu = 1,
658+
Idle = 2,
659+
} State = EState::Idle;
660+
661+
double UtilizedPart = 0;
662+
double WaitingCpuPart = 0;
663+
double IdlePart = 0;
664+
double Total = 0;
665+
double Utilized = 0;
666+
double Starving = 0;
667+
NHPTimer::STime PartUpdateTimestamp = 0;
668+
669+
void UpdateState(std::optional<EState> newState = std::nullopt) {
670+
if (!newState || *newState != State) {
671+
const NHPTimer::STime timestamp = GetCycleCountFast();
672+
const TDuration passed = CyclesToDuration(timestamp - std::exchange(PartUpdateTimestamp, timestamp));
673+
const double seconds = passed.SecondsFloat();
674+
const double factor = pow(0.8, seconds); // in 20 seconds we will get approx 1% of initial value
675+
const double shift = 4 * (1 - factor);
676+
UtilizedPart *= factor;
677+
WaitingCpuPart *= factor;
678+
IdlePart *= factor;
679+
switch (State) {
680+
case EState::Utilized: UtilizedPart += shift; break;
681+
case EState::WaitingCpu: WaitingCpuPart += shift; break;
682+
case EState::Idle: IdlePart += shift; break;
683+
}
684+
Total = UtilizedPart + WaitingCpuPart + IdlePart;
685+
if (Total) {
686+
Utilized = (Utilized + WaitingCpuPart) / Total;
687+
Starving = WaitingCpuPart / Total;
688+
}
689+
if (newState) {
690+
State = *newState;
691+
}
692+
}
693+
}
694+
695+
void UpdateUtilization();
653696
};
654697

655698
class TInterconnectSessionKiller

0 commit comments

Comments
 (0)