Skip to content

Commit d222a17

Browse files
authored
KQP Statistics Sensors (#14621)
1 parent 6de49ad commit d222a17

File tree

6 files changed

+257
-58
lines changed

6 files changed

+257
-58
lines changed

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
842842
KqpGroup->GetHistogram("SinkWrites/BufferActorCommitLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));
843843
BufferActorFlushLatencyHistogram =
844844
KqpGroup->GetHistogram("SinkWrites/BufferActorFlushLatencyUs", NMonitoring::ExponentialHistogram(28, 2, 1));
845-
845+
846846
ForwardActorWritesSizeHistogram =
847847
KqpGroup->GetHistogram("SinkWrites/ForwardActorWritesSize", NMonitoring::ExponentialHistogram(28, 2, 1));
848848
ForwardActorWritesLatencyHistogram =
@@ -876,6 +876,18 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
876876

877877
TotalSingleNodeReqCount = KqpGroup->GetCounter("TotalSingleNodeReqCount", true);
878878
NonLocalSingleNodeReqCount = KqpGroup->GetCounter("NonLocalSingleNodeReqCount", true);
879+
880+
/* Statistics performance: CPU time (us) and memory (bytes) spent on collecting, finishing and converting query statistics */
881+
QueryStatCpuCollectUs = KqpGroup->GetCounter("Query/Stat/CpuCollectUs", true);
882+
QueryStatCpuFinishUs = KqpGroup->GetCounter("Query/Stat/CpuFinishUs", true);
883+
QueryStatCpuConvertUs = KqpGroup->GetCounter("Query/Stat/CpuConvertUs", true);
884+
885+
QueryStatMemCollectInflightBytes = KqpGroup->GetCounter("Query/Stat/MemCollectInflightBytes", false);
886+
QueryStatMemFinishInflightBytes = KqpGroup->GetCounter("Query/Stat/MemFinishInflightBytes", false);
887+
888+
QueryStatMemFinishBytes = KqpGroup->GetCounter("Query/Stat/MemFinishBytes", true);
889+
QueryStatMemConvertBytes = KqpGroup->GetCounter("Query/Stat/MemConvertBytes", true);
890+
879891
}
880892

881893
::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,18 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
467467
TConcurrentRWHashMap<TString, TKqpDbCountersPtr, 256> DbCounters;
468468
TActorSystem* ActorSystem = nullptr;
469469
TActorId DbWatcherActorId;
470+
471+
// Statistics CPU usage, in microseconds
472+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatCpuCollectUs;
473+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatCpuFinishUs;
474+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatCpuConvertUs;
475+
// Statistics memory currently in flight, in bytes (non-derivative counters)
476+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemCollectInflightBytes;
477+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemFinishInflightBytes;
478+
// Statistics memory produced as output, in bytes (derivative counters)
479+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemFinishBytes;
480+
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemConvertBytes;
481+
470482
};
471483

472484
struct TKqpRequestCounters : public TThrRefBase {

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -425,21 +425,33 @@ class TKqpExecuterBase : public TActor<TDerived> {
425425

426426
YQL_ENSURE(Stats);
427427

428-
if (state.HasStats() && Request.ProgressStatsPeriod) {
428+
if (state.HasStats()) {
429+
ui64 cycleCount = GetCycleCountFast();
430+
429431
Stats->UpdateTaskStats(taskId, state.GetStats());
430-
auto now = TInstant::Now();
431-
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
432-
auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
433-
auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
434-
Stats->ExportExecStats(execStats);
435-
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
436-
const auto& tx = Request.Transactions[txId].Body;
437-
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
438-
execStats.AddTxPlansWithStats(planWithStats);
432+
if (Request.ProgressStatsPeriod) {
433+
auto now = TInstant::Now();
434+
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
435+
auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
436+
auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
437+
Stats->ExportExecStats(execStats);
438+
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
439+
const auto& tx = Request.Transactions[txId].Body;
440+
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
441+
execStats.AddTxPlansWithStats(planWithStats);
442+
}
443+
this->Send(Target, progress.Release());
444+
LastProgressStats = now;
439445
}
440-
this->Send(Target, progress.Release());
441-
LastProgressStats = now;
442446
}
447+
auto collectBytes = Stats->EstimateCollectMem();
448+
auto deltaCpuTime = NHPTimer::GetSeconds(GetCycleCountFast() - cycleCount);
449+
450+
Counters->Counters->QueryStatMemCollectInflightBytes->Add(
451+
static_cast<i64>(collectBytes) - static_cast<i64>(StatCollectInflightBytes)
452+
);
453+
StatCollectInflightBytes = collectBytes;
454+
Counters->Counters->QueryStatCpuCollectUs->Add(deltaCpuTime * 1'000'000);
443455
}
444456

445457
YQL_ENSURE(Planner);
@@ -450,6 +462,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
450462
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
451463
// Don't finalize stats twice.
452464
if (Planner->CompletedCA(taskId, computeActor)) {
465+
ui64 cycleCount = GetCycleCountFast();
466+
453467
auto& extraData = ExtraData[computeActor];
454468
extraData.TaskId = taskId;
455469
extraData.Data.Swap(state.MutableExtraData());
@@ -462,6 +476,15 @@ class TKqpExecuterBase : public TActor<TDerived> {
462476

463477
LastTaskId = taskId;
464478
LastComputeActorId = computeActor.ToString();
479+
480+
auto collectBytes = Stats->EstimateFinishMem();
481+
auto deltaCpuTime = NHPTimer::GetSeconds(GetCycleCountFast() - cycleCount);
482+
483+
Counters->Counters->QueryStatMemFinishInflightBytes->Add(
484+
static_cast<i64>(collectBytes) - static_cast<i64>(StatFinishInflightBytes)
485+
);
486+
StatFinishInflightBytes = collectBytes;
487+
Counters->Counters->QueryStatCpuFinishUs->Add(deltaCpuTime * 1'000'000);
465488
}
466489
default:
467490
; // ignore all other states.
@@ -1974,15 +1997,26 @@ class TKqpExecuterBase : public TActor<TDerived> {
19741997
ReportEventElapsedTime();
19751998

19761999
Stats->FinishTs = TInstant::Now();
2000+
19772001
Stats->Finish();
19782002

19792003
if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
2004+
2005+
ui64 jsonSize = 0;
2006+
ui64 cycleCount = GetCycleCountFast();
2007+
19802008
response.MutableResult()->MutableStats()->ClearTxPlansWithStats();
19812009
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
19822010
const auto& tx = Request.Transactions[txId].Body;
19832011
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
2012+
jsonSize += planWithStats.size();
19842013
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
19852014
}
2015+
2016+
auto deltaCpuTime = NHPTimer::GetSeconds(GetCycleCountFast() - cycleCount);
2017+
Counters->Counters->QueryStatCpuConvertUs->Add(deltaCpuTime * 1'000'000);
2018+
Counters->Counters->QueryStatMemConvertBytes->Add(jsonSize);
2019+
response.MutableResult()->MutableStats()->SetStatConvertBytes(jsonSize);
19862020
}
19872021

19882022
if (Stats->CollectStatsByLongTasks) {
@@ -1991,8 +2025,17 @@ class TKqpExecuterBase : public TActor<TDerived> {
19912025
LOG_I("Full stats: " << response.GetResult().GetStats());
19922026
}
19932027
}
2028+
2029+
auto finishSize = Stats->EstimateFinishMem();
2030+
Counters->Counters->QueryStatMemFinishBytes->Add(finishSize);
2031+
response.MutableResult()->MutableStats()->SetStatFinishBytes(finishSize);
19942032
}
19952033

2034+
Counters->Counters->QueryStatMemCollectInflightBytes->Sub(StatCollectInflightBytes);
2035+
StatCollectInflightBytes = 0;
2036+
Counters->Counters->QueryStatMemFinishInflightBytes->Sub(StatFinishInflightBytes);
2037+
StatFinishInflightBytes = 0;
2038+
19962039
Request.Transactions.crop(0);
19972040
this->Send(Target, ResponseEv.release());
19982041

@@ -2139,6 +2182,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
21392182
const bool VerboseMemoryLimitException;
21402183
TMaybe<ui8> ArrayBufferMinFillPercentage;
21412184

2185+
ui64 StatCollectInflightBytes = 0;
2186+
ui64 StatFinishInflightBytes = 0;
21422187
private:
21432188
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
21442189
};

0 commit comments

Comments
 (0)