Skip to content

Commit ce1388f

Browse files
authored
Update In-progress CS Stats + Y_ASSERTs (#16050)
1 parent 47cccd1 commit ce1388f

File tree

2 files changed

+181
-49
lines changed

2 files changed

+181
-49
lines changed

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 147 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
1313
}
1414

15-
void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
15+
// Exports the aggregate of Values into `stats` by delegating to the
// namespace-level ExportAggStats helper. History is not exported by this
// overload.
void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
    auto& perItemValues = Values;
    NKikimr::NKqp::ExportAggStats(perItemValues, stats);
}
18+
19+
// Exports the aggregate of Values and then the recorded history into `stats`.
// `baseTimeMs` is forwarded unchanged to ExportHistory.
void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
    this->ExportAggStats(stats);
    this->ExportHistory(baseTimeMs, stats);
}
1923

@@ -28,16 +32,21 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg
2832
}
2933
}
3034

31-
void TTimeSeriesStats::Resize(ui32 taskCount) {
32-
Values.resize(taskCount);
35+
// Resizes the per-index value storage to hold `count` entries.
void TTimeSeriesStats::Resize(ui32 count) {
    auto& storage = Values;
    storage.resize(count);
}
3438

35-
void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) {
39+
// Stores a non-zero `value` at Values[index], keeps Sum consistent with the
// replaced entry, and then calls AppendHistory(). A zero `value` is ignored
// entirely (no update, no history append).
void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) {
    if (!value) {
        return;
    }
    Y_ASSERT(index < Values.size());
    auto& slot = Values[index];
    // Add the new value first, then remove the old one.
    Sum += value;
    Sum -= slot;
    slot = value;
    AppendHistory();
}
48+
49+
void TTimeSeriesStats::AppendHistory() {
4150
if (HistorySampleCount) {
4251
auto nowMs = Now().MilliSeconds();
4352

@@ -97,6 +106,69 @@ void TTimeSeriesStats::Pack() {
97106
}
98107
}
99108

109+
// Resizes every existing part so that it holds `taskCount` per-task entries.
// The number of parts itself is not changed here.
void TPartitionedStats::ResizeByTasks(ui32 taskCount) {
    const auto partTotal = Parts.size();
    for (decltype(Parts.size()) partIndex = 0; partIndex < partTotal; ++partIndex) {
        Parts[partIndex].resize(taskCount);
    }
}
114+
115+
// Changes the number of parts to `partCount`. Parts created by this call are
// sized to `taskCount` entries; parts that already existed keep their size.
// The per-part totals (Values) are resized to `partCount` via Resize().
void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
    const auto previousPartCount = Parts.size();
    Parts.resize(partCount);
    auto partIndex = previousPartCount;
    while (partIndex < partCount) {
        Parts[partIndex].resize(taskCount);
        partIndex++;
    }
    Resize(partCount);
}
123+
124+
// Records a non-zero `value` reported by task `taskIndex` for part `partIndex`.
//
// Parts[partIndex][taskIndex] is overwritten with `value`, and the change
// relative to the previous entry is applied to both the per-part total
// (Values[partIndex]) and the overall Sum. When `recordTimeSeries` is true,
// AppendHistory() is called afterwards. A zero `value` is ignored.
//
// All bounds assertions run BEFORE the first element access; previously the
// task-index assertion came after part[taskIndex] had already been read, so it
// could not catch the out-of-range read it was meant to guard.
void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
    if (!value) {
        return;
    }

    Y_ASSERT(partIndex < Parts.size());
    Y_ASSERT(partIndex < Values.size());
    auto& part = Parts[partIndex];
    Y_ASSERT(taskIndex < part.size());

    // The delta is computed in unsigned arithmetic (assuming ui64 is an unsigned
    // 64-bit integer): if the new value is smaller than the stored one the delta
    // wraps around, and the additions below wrap back to the intended totals.
    const auto delta = value - part[taskIndex];
    part[taskIndex] = value;
    Values[partIndex] += delta;
    Sum += delta;

    if (recordTimeSeries) {
        AppendHistory();
    }
}
139+
140+
// Resolves `key` to a part index (assigning the next free index the first time
// a key is seen), grows `stats` when the part capacity is insufficient, and
// forwards the update to TPartitionedStats::SetNonZero.
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
    auto insertion = Indices.try_emplace(key);
    auto& partIndex = insertion.first->second;
    const bool isNewKey = insertion.second;

    if (isNewKey) {
        // Indices are handed out in insertion order: 0, 1, 2, ...
        partIndex = Indices.size() - 1;
        // Capacity is increased in steps of 4 parts.
        if (Indices.size() > PartCount) {
            PartCount += 4;
        }
    }

    if (PartCount > stats.Parts.size()) {
        stats.ResizeByParts(PartCount, TaskCount);
    }

    stats.SetNonZero(taskIndex, partIndex, value, recordTimeSeries);
}
153+
154+
// Remembers `taskCount` and resizes the per-task dimension of every existing
// part in all four partitioned series.
void TExternalStats::Resize(ui32 taskCount) {
    TaskCount = taskCount;

    // Row/byte counters.
    ExternalRows.ResizeByTasks(taskCount);
    ExternalBytes.ResizeByTasks(taskCount);

    // First/last message timestamps.
    FirstMessageMs.ResizeByTasks(taskCount);
    LastMessageMs.ResizeByTasks(taskCount);
}
161+
162+
// Sets the history sample count on ExternalBytes only; the other series in
// this struct are left untouched by this call.
void TExternalStats::SetHistorySampleCount(ui32 historySampleCount) {
    auto& bytesSeries = ExternalBytes;
    bytesSeries.HistorySampleCount = historySampleCount;
}
165+
166+
// Exports the ExternalBytes history into `stats`, but only when `stats`
// already has its ExternalBytes field set; otherwise does nothing.
void TExternalStats::ExportHistory(ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) {
    if (!stats.HasExternalBytes()) {
        return;
    }
    auto& bytesAggr = *stats.MutableExternalBytes();
    ExternalBytes.ExportHistory(baseTimeMs, bytesAggr);
}
171+
100172
void TAsyncStats::Resize(ui32 taskCount) {
101173
Bytes.Resize(taskCount);
102174
DecompressedBytes.resize(taskCount);
@@ -127,20 +199,25 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg
127199
}
128200

129201
// Propagates the task count to every member statistics group.
void TAsyncBufferStats::Resize(ui32 taskCount) {
    // Per-partition external statistics.
    External.Resize(taskCount);

    // Per-task async statistics for each direction.
    Ingress.Resize(taskCount);
    Push.Resize(taskCount);
    Pop.Resize(taskCount);
    Egress.Resize(taskCount);
}
135208

136209
// Propagates the history sample count to every member statistics group.
void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
    // Per-partition external statistics.
    External.SetHistorySampleCount(historySampleCount);

    // Per-task async statistics for each direction.
    Ingress.SetHistorySampleCount(historySampleCount);
    Push.SetHistorySampleCount(historySampleCount);
    Pop.SetHistorySampleCount(historySampleCount);
    Egress.SetHistorySampleCount(historySampleCount);
}
142216

143217
void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
218+
if (stats.HasExternal()) {
219+
External.ExportHistory(baseTimeMs, *stats.MutableExternal());
220+
}
144221
if (stats.HasIngress()) {
145222
Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress());
146223
}
@@ -281,40 +358,46 @@ void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqSta
281358
}
282359
}
283360

284-
void SetNonZero(ui64& target, ui64 source) {
361+
// Overwrites `target` with `source` unless `source` is zero, in which case
// `target` keeps its current value.
inline void SetNonZero(ui64& target, ui64 source) {
    if (!source) {
        return;
    }
    target = source;
}
289366

367+
inline void SetNonZero(std::vector<ui64>& vector, ui32 index, ui64 value) {
368+
Y_ASSERT(index < vector.size());
369+
SetNonZero(vector[index], value);
370+
}
371+
290372
ui64 TStageExecutionStats::UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
291373
ui64 baseTimeMs = 0;
292374

293375
aggrAsyncStats.Bytes.SetNonZero(index, asyncStats.GetBytes());
294-
SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes());
295-
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
296-
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
297-
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());
376+
SetNonZero(aggrAsyncStats.DecompressedBytes, index, asyncStats.GetDecompressedBytes());
377+
SetNonZero(aggrAsyncStats.Rows, index, asyncStats.GetRows());
378+
SetNonZero(aggrAsyncStats.Chunks, index, asyncStats.GetChunks());
379+
SetNonZero(aggrAsyncStats.Splits, index, asyncStats.GetSplits());
298380

299381
auto firstMessageMs = asyncStats.GetFirstMessageMs();
300-
SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs);
382+
SetNonZero(aggrAsyncStats.FirstMessageMs, index, firstMessageMs);
301383
baseTimeMs = NonZeroMin(baseTimeMs, firstMessageMs);
302384

303385
auto pauseMessageMs = asyncStats.GetPauseMessageMs();
304-
SetNonZero(aggrAsyncStats.PauseMessageMs[index], pauseMessageMs);
386+
SetNonZero(aggrAsyncStats.PauseMessageMs, index, pauseMessageMs);
305387
baseTimeMs = NonZeroMin(baseTimeMs, pauseMessageMs);
306388

307389
auto resumeMessageMs = asyncStats.GetResumeMessageMs();
308-
SetNonZero(aggrAsyncStats.ResumeMessageMs[index], resumeMessageMs);
390+
SetNonZero(aggrAsyncStats.ResumeMessageMs, index, resumeMessageMs);
309391
baseTimeMs = NonZeroMin(baseTimeMs, resumeMessageMs);
310392

311393
auto lastMessageMs = asyncStats.GetLastMessageMs();
312-
SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs);
394+
SetNonZero(aggrAsyncStats.LastMessageMs, index, lastMessageMs);
313395
baseTimeMs = NonZeroMin(baseTimeMs, lastMessageMs);
314396

315397
aggrAsyncStats.WaitTimeUs.SetNonZero(index, asyncStats.GetWaitTimeUs());
316-
SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods());
398+
SetNonZero(aggrAsyncStats.WaitPeriods, index, asyncStats.GetWaitPeriods());
317399
if (firstMessageMs && lastMessageMs > firstMessageMs) {
400+
Y_ASSERT(index < aggrAsyncStats.ActiveTimeUs.size());
318401
aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
319402
}
320403

@@ -348,29 +431,29 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
348431
}
349432

350433
CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs());
351-
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());
352-
353-
SetNonZero(InputRows[index], taskStats.GetInputRows());
354-
SetNonZero(InputBytes[index], taskStats.GetInputBytes());
355-
SetNonZero(OutputRows[index], taskStats.GetOutputRows());
356-
SetNonZero(OutputBytes[index], taskStats.GetOutputBytes());
357-
SetNonZero(ResultRows[index], taskStats.GetResultRows());
358-
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
359-
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
360-
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
361-
SetNonZero(IngressDecompressedBytes[index], taskStats.GetIngressDecompressedBytes());
362-
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
363-
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());
434+
SetNonZero(SourceCpuTimeUs, index, taskStats.GetSourceCpuTimeUs());
435+
436+
SetNonZero(InputRows, index, taskStats.GetInputRows());
437+
SetNonZero(InputBytes, index, taskStats.GetInputBytes());
438+
SetNonZero(OutputRows, index, taskStats.GetOutputRows());
439+
SetNonZero(OutputBytes, index, taskStats.GetOutputBytes());
440+
SetNonZero(ResultRows, index, taskStats.GetResultRows());
441+
SetNonZero(ResultBytes, index, taskStats.GetResultBytes());
442+
SetNonZero(IngressRows, index, taskStats.GetIngressRows());
443+
SetNonZero(IngressBytes, index, taskStats.GetIngressBytes());
444+
SetNonZero(IngressDecompressedBytes, index, taskStats.GetIngressDecompressedBytes());
445+
SetNonZero(EgressRows, index, taskStats.GetEgressRows());
446+
SetNonZero(EgressBytes, index, taskStats.GetEgressBytes());
364447

365448
auto startTimeMs = taskStats.GetStartTimeMs();
366-
SetNonZero(StartTimeMs[index], startTimeMs);
449+
SetNonZero(StartTimeMs, index, startTimeMs);
367450
baseTimeMs = NonZeroMin(baseTimeMs, startTimeMs);
368451

369452
auto finishTimeMs = taskStats.GetFinishTimeMs();
370-
SetNonZero(FinishTimeMs[index], finishTimeMs);
453+
SetNonZero(FinishTimeMs, index, finishTimeMs);
371454
baseTimeMs = NonZeroMin(baseTimeMs, finishTimeMs);
372455

373-
SetNonZero(DurationUs[index], durationUs);
456+
SetNonZero(DurationUs, index, durationUs);
374457
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
375458
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());
376459

@@ -383,13 +466,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
383466
auto tablePath = tableStat.GetTablePath();
384467
auto [it, inserted] = Tables.try_emplace(tablePath, TaskCount);
385468
auto& aggrTableStats = it->second;
386-
SetNonZero(aggrTableStats.ReadRows[index], tableStat.GetReadRows());
387-
SetNonZero(aggrTableStats.ReadBytes[index], tableStat.GetReadBytes());
388-
SetNonZero(aggrTableStats.WriteRows[index], tableStat.GetWriteRows());
389-
SetNonZero(aggrTableStats.WriteBytes[index], tableStat.GetWriteBytes());
390-
SetNonZero(aggrTableStats.EraseRows[index], tableStat.GetEraseRows());
391-
SetNonZero(aggrTableStats.EraseBytes[index], tableStat.GetEraseBytes());
392-
SetNonZero(aggrTableStats.AffectedPartitions[index], tableStat.GetAffectedPartitions());
469+
SetNonZero(aggrTableStats.ReadRows, index, tableStat.GetReadRows());
470+
SetNonZero(aggrTableStats.ReadBytes, index, tableStat.GetReadBytes());
471+
SetNonZero(aggrTableStats.WriteRows, index, tableStat.GetWriteRows());
472+
SetNonZero(aggrTableStats.WriteBytes, index, tableStat.GetWriteBytes());
473+
SetNonZero(aggrTableStats.EraseRows, index, tableStat.GetEraseRows());
474+
SetNonZero(aggrTableStats.EraseBytes, index, tableStat.GetEraseBytes());
475+
SetNonZero(aggrTableStats.AffectedPartitions, index, tableStat.GetAffectedPartitions());
393476
}
394477

395478
for (auto& sourceStat : taskStats.GetSources()) {
@@ -403,6 +486,17 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
403486
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()));
404487
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()));
405488
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()));
489+
for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
490+
auto key = partitionStat.GetPartitionId();
491+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
492+
index, key, partitionStat.GetExternalRows(), false);
493+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
494+
index, key, partitionStat.GetExternalBytes(), true);
495+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
496+
index, key, partitionStat.GetFirstMessageMs(), false);
497+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
498+
index, key, partitionStat.GetLastMessageMs(), false);
499+
}
406500
}
407501
}
408502

@@ -449,22 +543,22 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
449543
case NYql::NDqProto::TDqOperatorStats::kJoin: {
450544
auto [it, inserted] = Joins.try_emplace(operatorId, TaskCount);
451545
auto& joinStats = it->second;
452-
SetNonZero(joinStats.Rows[index], operatorStat.GetRows());
453-
SetNonZero(joinStats.Bytes[index], operatorStat.GetBytes());
546+
SetNonZero(joinStats.Rows, index, operatorStat.GetRows());
547+
SetNonZero(joinStats.Bytes, index, operatorStat.GetBytes());
454548
break;
455549
}
456550
case NYql::NDqProto::TDqOperatorStats::kFilter: {
457551
auto [it, inserted] = Filters.try_emplace(operatorId, TaskCount);
458552
auto& filterStats = it->second;
459-
SetNonZero(filterStats.Rows[index], operatorStat.GetRows());
460-
SetNonZero(filterStats.Bytes[index], operatorStat.GetBytes());
553+
SetNonZero(filterStats.Rows, index, operatorStat.GetRows());
554+
SetNonZero(filterStats.Bytes, index, operatorStat.GetBytes());
461555
break;
462556
}
463557
case NYql::NDqProto::TDqOperatorStats::kAggregation: {
464558
auto [it, inserted] = Aggregations.try_emplace(operatorId, TaskCount);
465559
auto& aggStats = it->second;
466-
SetNonZero(aggStats.Rows[index], operatorStat.GetRows());
467-
SetNonZero(aggStats.Bytes[index], operatorStat.GetBytes());
560+
SetNonZero(aggStats.Rows, index, operatorStat.GetRows());
561+
SetNonZero(aggStats.Bytes, index, operatorStat.GetBytes());
468562
break;
469563
}
470564
default:
@@ -1074,6 +1168,8 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
10741168
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
10751169
}
10761170

1171+
// SIMD-friendly aggregations follow. The compiler can vectorize sum/count on its own, but needs help with min/max.
1172+
10771173
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
10781174

10791175
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
@@ -1215,6 +1311,12 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto
12151311
}
12161312

12171313
void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
1314+
auto& external = *stats.MutableExternal();
1315+
data.External.ExternalRows.ExportAggStats(*external.MutableExternalRows());
1316+
data.External.ExternalBytes.ExportAggStats(BaseTimeMs, *external.MutableExternalBytes());
1317+
ExportOffsetAggStats(data.External.FirstMessageMs.Values, *external.MutableFirstMessageMs(), BaseTimeMs);
1318+
ExportOffsetAggStats(data.External.LastMessageMs.Values, *external.MutableLastMessageMs(), BaseTimeMs);
1319+
external.SetPartitionCount(data.External.Indices.size());
12181320
ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
12191321
ExportAggAsyncStats(data.Push, *stats.MutablePush());
12201322
ExportAggAsyncStats(data.Pop, *stats.MutablePop());

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,39 @@ struct TTimeSeriesStats {
2323
std::vector<std::pair<ui64, ui64>> History;
2424

2525
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
26+
void ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats);
2627
void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
27-
void Resize(ui32 taskCount);
28-
void SetNonZero(ui32 taskIndex, ui64 value);
28+
void Resize(ui32 count);
29+
void SetNonZero(ui32 index, ui64 value);
2930
void Pack();
31+
void AppendHistory();
32+
};
33+
34+
struct TPartitionedStats : public TTimeSeriesStats {
35+
std::vector<std::vector<ui64>> Parts;
36+
37+
void ResizeByTasks(ui32 taskCount);
38+
void ResizeByParts(ui32 partCount, ui32 taskCount);
39+
void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
40+
};
41+
42+
struct TTimeMultiSeriesStats {
43+
std::unordered_map<TString, ui32> Indices;
44+
ui32 TaskCount = 0;
45+
ui32 PartCount = 0;
46+
47+
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
48+
};
49+
50+
struct TExternalStats : public TTimeMultiSeriesStats {
51+
TPartitionedStats ExternalRows;
52+
TPartitionedStats ExternalBytes;
53+
TPartitionedStats FirstMessageMs;
54+
TPartitionedStats LastMessageMs;
55+
56+
void Resize(ui32 taskCount);
57+
void SetHistorySampleCount(ui32 historySampleCount);
58+
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqExternalAggrStats& stats);
3059
};
3160

3261
struct TMetricInfo {
@@ -80,6 +109,7 @@ struct TAsyncBufferStats {
80109
Resize(taskCount);
81110
}
82111

112+
TExternalStats External;
83113
TAsyncStats Ingress;
84114
TAsyncStats Push;
85115
TAsyncStats Pop;
@@ -179,8 +209,8 @@ struct TStageExecutionStats {
179209
std::map<TString, TTableStats> Tables;
180210
std::map<TString, TAsyncBufferStats> Ingress;
181211
std::map<TString, TAsyncBufferStats> Egress;
182-
std::map<ui32, TAsyncBufferStats> Input;
183-
std::map<ui32, TAsyncBufferStats> Output;
212+
std::unordered_map<ui32, TAsyncBufferStats> Input;
213+
std::unordered_map<ui32, TAsyncBufferStats> Output;
184214

185215
std::map<TString, TOperatorStats> Joins;
186216
std::map<TString, TOperatorStats> Filters;

0 commit comments

Comments
 (0)