Skip to content

Commit 70f66fb

Browse files
authored
Aggregated spilling stats in KQP pipeline (#8220)
1 parent 77844b4 commit 70f66fb

File tree

4 files changed: 64 additions and 8 deletions.

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 41 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,7 @@ void TTableStats::Resize(ui32 taskCount) {
167167

168168
void TStageExecutionStats::Resize(ui32 taskCount) {
169169
CpuTimeUs.Resize(taskCount);
170-
SourceCpuTimeUs.Resize(taskCount);
170+
SourceCpuTimeUs.resize(taskCount);
171171

172172
InputRows.resize(taskCount);
173173
InputBytes.resize(taskCount);
@@ -187,6 +187,11 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
187187
WaitInputTimeUs.resize(taskCount);
188188
WaitOutputTimeUs.resize(taskCount);
189189

190+
SpillingComputeBytes.Resize(taskCount);
191+
SpillingChannelBytes.Resize(taskCount);
192+
SpillingComputeTimeUs.Resize(taskCount);
193+
SpillingChannelTimeUs.Resize(taskCount);
194+
190195
for (auto& p : Ingress) p.second.Resize(taskCount);
191196
for (auto& p : Input) p.second.Resize(taskCount);
192197
for (auto& p : Output) p.second.Resize(taskCount);
@@ -198,17 +203,17 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
198203
void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
199204
HistorySampleCount = historySampleCount;
200205
CpuTimeUs.HistorySampleCount = historySampleCount;
201-
SourceCpuTimeUs.HistorySampleCount = historySampleCount;
202206
MaxMemoryUsage.HistorySampleCount = historySampleCount;
207+
SpillingComputeBytes.HistorySampleCount = historySampleCount;
208+
SpillingChannelBytes.HistorySampleCount = historySampleCount;
209+
SpillingComputeTimeUs.HistorySampleCount = historySampleCount;
210+
SpillingChannelTimeUs.HistorySampleCount = historySampleCount;
203211
}
204212

205213
void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats) {
206214
if (stageStats.HasCpuTimeUs()) {
207215
CpuTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableCpuTimeUs());
208216
}
209-
if (stageStats.HasSourceCpuTimeUs()) {
210-
SourceCpuTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableSourceCpuTimeUs());
211-
}
212217
for (auto& p : *stageStats.MutableIngress()) {
213218
auto it = Ingress.find(p.first);
214219
if (it != Ingress.end()) {
@@ -236,6 +241,18 @@ void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqSta
236241
if (stageStats.HasMaxMemoryUsage()) {
237242
MaxMemoryUsage.ExportHistory(baseTimeMs, *stageStats.MutableMaxMemoryUsage());
238243
}
244+
if (stageStats.HasSpillingComputeBytes()) {
245+
SpillingComputeBytes.ExportHistory(baseTimeMs, *stageStats.MutableSpillingComputeBytes());
246+
}
247+
if (stageStats.HasSpillingChannelBytes()) {
248+
SpillingChannelBytes.ExportHistory(baseTimeMs, *stageStats.MutableSpillingChannelBytes());
249+
}
250+
if (stageStats.HasSpillingComputeTimeUs()) {
251+
SpillingComputeTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableSpillingComputeTimeUs());
252+
}
253+
if (stageStats.HasSpillingChannelTimeUs()) {
254+
SpillingChannelTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableSpillingChannelTimeUs());
255+
}
239256
}
240257

241258
void SetNonZero(ui64& target, ui64 source) {
@@ -295,7 +312,7 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
295312
}
296313

297314
CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs());
298-
SourceCpuTimeUs.SetNonZero(index, taskStats.GetSourceCpuTimeUs());
315+
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());
299316

300317
SetNonZero(InputRows[index], taskStats.GetInputRows());
301318
SetNonZero(InputBytes[index], taskStats.GetInputBytes());
@@ -321,6 +338,11 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
321338
SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs());
322339
SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs());
323340

341+
SpillingComputeBytes.SetNonZero(index, taskStats.GetSpillingComputeWriteBytes());
342+
SpillingChannelBytes.SetNonZero(index, taskStats.GetSpillingChannelWriteBytes());
343+
SpillingComputeTimeUs.SetNonZero(index, taskStats.GetSpillingComputeReadTimeUs() + taskStats.GetSpillingComputeWriteTimeUs());
344+
SpillingChannelTimeUs.SetNonZero(index, taskStats.GetSpillingChannelReadTimeUs() + taskStats.GetSpillingChannelWriteTimeUs());
345+
324346
for (auto& tableStat : taskStats.GetTables()) {
325347
auto tablePath = tableStat.GetTablePath();
326348
auto [it, inserted] = Tables.try_emplace(tablePath, taskCount);
@@ -594,6 +616,12 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
594616
UpdateAggr(stageStats->MutableDurationUs(), stats.GetDurationUs());
595617
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
596618
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
619+
620+
UpdateAggr(stageStats->MutableSpillingComputeBytes(), task.GetSpillingComputeWriteBytes());
621+
UpdateAggr(stageStats->MutableSpillingChannelBytes(), task.GetSpillingChannelWriteBytes());
622+
UpdateAggr(stageStats->MutableSpillingComputeTimeUs(), task.GetSpillingComputeReadTimeUs() + task.GetSpillingComputeWriteTimeUs());
623+
UpdateAggr(stageStats->MutableSpillingChannelTimeUs(), task.GetSpillingChannelReadTimeUs() + task.GetSpillingChannelWriteTimeUs());
624+
597625
FillStageDurationUs(*stageStats);
598626

599627
for (auto& sourcesStat : task.GetSources()) {
@@ -964,7 +992,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
964992

965993
stageStats.SetBaseTimeMs(BaseTimeMs);
966994
p.second.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
967-
p.second.SourceCpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSourceCpuTimeUs());
995+
ExportAggStats(p.second.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
968996
p.second.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage());
969997

970998
ExportAggStats(p.second.InputRows, *stageStats.MutableInputRows());
@@ -984,6 +1012,12 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
9841012
ExportAggStats(p.second.DurationUs, *stageStats.MutableDurationUs());
9851013
ExportAggStats(p.second.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs());
9861014
ExportAggStats(p.second.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs());
1015+
1016+
p.second.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
1017+
p.second.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
1018+
p.second.SpillingComputeTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeTimeUs());
1019+
p.second.SpillingChannelTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelTimeUs());
1020+
9871021
FillStageDurationUs(stageStats);
9881022

9891023
for (auto& p2 : p.second.Tables) {

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ struct TStageExecutionStats {
9191
std::map<ui32, ui32> Task2Index;
9292

9393
TTimeSeriesStats CpuTimeUs;
94-
TTimeSeriesStats SourceCpuTimeUs;
94+
std::vector<ui64> SourceCpuTimeUs;
9595

9696
std::vector<ui64> InputRows;
9797
std::vector<ui64> InputBytes;
@@ -111,6 +111,11 @@ struct TStageExecutionStats {
111111
std::vector<ui64> WaitInputTimeUs;
112112
std::vector<ui64> WaitOutputTimeUs;
113113

114+
TTimeSeriesStats SpillingComputeBytes;
115+
TTimeSeriesStats SpillingChannelBytes;
116+
TTimeSeriesStats SpillingComputeTimeUs;
117+
TTimeSeriesStats SpillingChannelTimeUs;
118+
114119
std::map<TString, TTableStats> Tables;
115120
std::map<TString, TAsyncBufferStats> Ingress;
116121
std::map<TString, TAsyncBufferStats> Egress;

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2703,6 +2703,18 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
27032703
if ((*stat)->HasMaxMemoryUsage()) {
27042704
FillAggrStat(stats, (*stat)->GetMaxMemoryUsage(), "MaxMemoryUsage");
27052705
}
2706+
if ((*stat)->HasSpillingComputeBytes()) {
2707+
FillAggrStat(stats, (*stat)->GetSpillingComputeBytes(), "SpillingComputeBytes");
2708+
}
2709+
if ((*stat)->HasSpillingChannelBytes()) {
2710+
FillAggrStat(stats, (*stat)->GetSpillingChannelBytes(), "SpillingChannelBytes");
2711+
}
2712+
if ((*stat)->HasSpillingComputeTimeUs()) {
2713+
FillAggrStat(stats, (*stat)->GetSpillingComputeTimeUs(), "SpillingComputeTimeUs");
2714+
}
2715+
if ((*stat)->HasSpillingChannelTimeUs()) {
2716+
FillAggrStat(stats, (*stat)->GetSpillingChannelTimeUs(), "SpillingChannelTimeUs");
2717+
}
27062718

27072719
if (!(*stat)->GetIngress().empty()) {
27082720
auto& ingressStats = stats.InsertValue("Ingress", NJson::JSON_ARRAY);

ydb/library/yql/dq/actors/protos/dq_stats.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,11 @@ message TDqStageStats {
335335
TDqStatsAggr EgressBytes = 30;
336336
TDqStatsAggr EgressRows = 31;
337337

338+
TDqStatsAggr SpillingComputeBytes = 39;
339+
TDqStatsAggr SpillingChannelBytes = 40;
340+
TDqStatsAggr SpillingComputeTimeUs = 41;
341+
TDqStatsAggr SpillingChannelTimeUs = 42;
342+
338343
reserved 13; // FirstRowTimeMs
339344
reserved 14; // FinishTimeMs
340345
reserved 21; // StartTimeMs

0 commit comments

Comments
 (0)