Skip to content

Commit a69244b

Browse files
authored
Column Shard Bytes Time Series (#15423)
1 parent 6592c23 commit a69244b

File tree

3 files changed

+148
-26
lines changed

3 files changed

+148
-26
lines changed

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ void TKqpScanComputeActor::AcquireRateQuota() {
8080
}
8181

8282
void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool last) {
83-
if (last && ScanData && dst->TasksSize() > 0) {
83+
Y_UNUSED(last);
84+
85+
if (ScanData && dst->TasksSize() > 0) {
8486
YQL_ENSURE(dst->TasksSize() == 1);
8587

8688
auto* taskStats = dst->MutableTasks(0);
@@ -96,23 +98,25 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
9698
tableStats->SetTablePath(ScanData->TablePath);
9799

98100
if (auto* stats = ScanData->BasicStats.get()) {
99-
ingressStats.SetRows(stats->Rows);
100-
ingressStats.SetBytes(stats->Bytes);
101-
ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
102-
ingressStats.SetLastMessageMs(stats->LastMessageMs);
103-
104-
for (auto& [shardId, stat] : stats->ExternalStats) {
105-
auto& externalStat = *sourceStats.AddExternalPartitions();
106-
externalStat.SetPartitionId(ToString(shardId));
107-
externalStat.SetExternalRows(stat.ExternalRows);
108-
externalStat.SetExternalBytes(stat.ExternalBytes);
109-
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
110-
externalStat.SetLastMessageMs(stat.LastMessageMs);
101+
if (RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_FULL) {
102+
ingressStats.SetRows(stats->Rows);
103+
ingressStats.SetBytes(stats->Bytes);
104+
ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
105+
ingressStats.SetLastMessageMs(stats->LastMessageMs);
106+
107+
for (auto& [shardId, stat] : stats->ExternalStats) {
108+
auto& externalStat = *sourceStats.AddExternalPartitions();
109+
externalStat.SetPartitionId(ToString(shardId));
110+
externalStat.SetExternalRows(stat.ExternalRows);
111+
externalStat.SetExternalBytes(stat.ExternalBytes);
112+
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
113+
externalStat.SetLastMessageMs(stat.LastMessageMs);
114+
}
115+
116+
taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
117+
taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);
111118
}
112119

113-
taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
114-
taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);
115-
116120
tableStats->SetReadRows(stats->Rows);
117121
tableStats->SetReadBytes(stats->Bytes);
118122
tableStats->SetAffectedPartitions(stats->AffectedShards);

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
1313
}
1414

15-
void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
15+
void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
1616
NKikimr::NKqp::ExportAggStats(Values, stats);
17+
}
18+
19+
void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
20+
ExportAggStats(stats);
1721
ExportHistory(baseTimeMs, stats);
1822
}
1923

@@ -28,16 +32,20 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg
2832
}
2933
}
3034

31-
void TTimeSeriesStats::Resize(ui32 taskCount) {
32-
Values.resize(taskCount);
35+
void TTimeSeriesStats::Resize(ui32 count) {
36+
Values.resize(count);
3337
}
3438

35-
void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) {
39+
void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) {
3640
if (value) {
3741
Sum += value;
38-
Sum -= Values[taskIndex];
39-
Values[taskIndex] = value;
42+
Sum -= Values[index];
43+
Values[index] = value;
4044
}
45+
AppendHistory();
46+
}
47+
48+
void TTimeSeriesStats::AppendHistory() {
4149
if (HistorySampleCount) {
4250
auto nowMs = Now().MilliSeconds();
4351

@@ -97,6 +105,62 @@ void TTimeSeriesStats::Pack() {
97105
}
98106
}
99107

108+
void TPartitionedStats::ResizeByTasks(ui32 taskCount) {
109+
for (auto& p : Parts) {
110+
p.resize(taskCount);
111+
}
112+
}
113+
114+
void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
115+
auto oldPartCount = Parts.size();
116+
Parts.resize(partCount);
117+
for(auto i = oldPartCount; i < partCount; i++) {
118+
Parts[i].resize(taskCount);
119+
}
120+
Resize(partCount);
121+
}
122+
123+
void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
124+
auto& part = Parts[partIndex];
125+
auto delta = value - part[taskIndex];
126+
part[taskIndex] = value;
127+
Values[partIndex] += delta;
128+
Sum += delta;
129+
if (recordTimeSeries) {
130+
AppendHistory();
131+
}
132+
}
133+
134+
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
135+
auto [it, inserted] = Indices.try_emplace(key);
136+
if (inserted) {
137+
it->second = Indices.size() - 1;
138+
if (PartCount < Indices.size()) {
139+
PartCount += 4;
140+
}
141+
}
142+
if (stats.Parts.size() < PartCount) {
143+
stats.ResizeByParts(PartCount, TaskCount);
144+
}
145+
stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries);
146+
}
147+
148+
void TExternalStats::Resize(ui32 taskCount) {
149+
ExternalRows.ResizeByTasks(taskCount);
150+
ExternalBytes.ResizeByTasks(taskCount);
151+
TaskCount = taskCount;
152+
}
153+
154+
void TExternalStats::SetHistorySampleCount(ui32 historySampleCount) {
155+
ExternalBytes.HistorySampleCount = historySampleCount;
156+
}
157+
158+
void TExternalStats::ExportHistory(ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) {
159+
if (stats.HasExternalBytes()) {
160+
ExternalBytes.ExportHistory(baseTimeMs, *stats.MutableExternalBytes());
161+
}
162+
}
163+
100164
void TAsyncStats::Resize(ui32 taskCount) {
101165
Bytes.Resize(taskCount);
102166
DecompressedBytes.resize(taskCount);
@@ -127,20 +191,25 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg
127191
}
128192

129193
void TAsyncBufferStats::Resize(ui32 taskCount) {
194+
External.Resize(taskCount);
130195
Ingress.Resize(taskCount);
131196
Push.Resize(taskCount);
132197
Pop.Resize(taskCount);
133198
Egress.Resize(taskCount);
134199
}
135200

136201
void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
202+
External.SetHistorySampleCount(historySampleCount);
137203
Ingress.SetHistorySampleCount(historySampleCount);
138204
Push.SetHistorySampleCount(historySampleCount);
139205
Pop.SetHistorySampleCount(historySampleCount);
140206
Egress.SetHistorySampleCount(historySampleCount);
141207
}
142208

143209
void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
210+
if (stats.HasExternal()) {
211+
External.ExportHistory(baseTimeMs, *stats.MutableExternal());
212+
}
144213
if (stats.HasIngress()) {
145214
Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress());
146215
}
@@ -403,6 +472,17 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
403472
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()));
404473
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()));
405474
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()));
475+
for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
476+
auto key = partitionStat.GetPartitionId();
477+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
478+
index, key, partitionStat.GetExternalRows(), false);
479+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
480+
index, key, partitionStat.GetExternalBytes(), true);
481+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
482+
index, key, partitionStat.GetFirstMessageMs(), false);
483+
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
484+
index, key, partitionStat.GetLastMessageMs(), false);
485+
}
406486
}
407487
}
408488

@@ -1074,6 +1154,8 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
10741154
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
10751155
}
10761156

1157+
// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
1158+
10771159
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
10781160

10791161
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
@@ -1215,6 +1297,12 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto
12151297
}
12161298

12171299
void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
1300+
auto& external = *stats.MutableExternal();
1301+
data.External.ExternalRows.ExportAggStats(*external.MutableExternalRows());
1302+
data.External.ExternalBytes.ExportAggStats(BaseTimeMs, *external.MutableExternalBytes());
1303+
ExportOffsetAggStats(data.External.FirstMessageMs.Values, *external.MutableFirstMessageMs(), BaseTimeMs);
1304+
ExportOffsetAggStats(data.External.LastMessageMs.Values, *external.MutableLastMessageMs(), BaseTimeMs);
1305+
external.SetPartitionCount(data.External.Indices.size());
12181306
ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
12191307
ExportAggAsyncStats(data.Push, *stats.MutablePush());
12201308
ExportAggAsyncStats(data.Pop, *stats.MutablePop());

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,39 @@ struct TTimeSeriesStats {
2323
std::vector<std::pair<ui64, ui64>> History;
2424

2525
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
26+
void ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats);
2627
void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
27-
void Resize(ui32 taskCount);
28-
void SetNonZero(ui32 taskIndex, ui64 value);
28+
void Resize(ui32 count);
29+
void SetNonZero(ui32 index, ui64 value);
2930
void Pack();
31+
void AppendHistory();
32+
};
33+
34+
struct TPartitionedStats : public TTimeSeriesStats {
35+
std::vector<std::vector<ui64>> Parts;
36+
37+
void ResizeByTasks(ui32 taskCount);
38+
void ResizeByParts(ui32 partCount, ui32 taskCount);
39+
void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
40+
};
41+
42+
struct TTimeMultiSeriesStats {
43+
std::unordered_map<TString, ui32> Indices;
44+
ui32 TaskCount = 0;
45+
ui32 PartCount = 0;
46+
47+
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
48+
};
49+
50+
struct TExternalStats : public TTimeMultiSeriesStats {
51+
TPartitionedStats ExternalRows;
52+
TPartitionedStats ExternalBytes;
53+
TPartitionedStats FirstMessageMs;
54+
TPartitionedStats LastMessageMs;
55+
56+
void Resize(ui32 taskCount);
57+
void SetHistorySampleCount(ui32 historySampleCount);
58+
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqExternalAggrStats& stats);
3059
};
3160

3261
struct TMetricInfo {
@@ -80,6 +109,7 @@ struct TAsyncBufferStats {
80109
Resize(taskCount);
81110
}
82111

112+
TExternalStats External;
83113
TAsyncStats Ingress;
84114
TAsyncStats Push;
85115
TAsyncStats Pop;
@@ -179,8 +209,8 @@ struct TStageExecutionStats {
179209
std::map<TString, TTableStats> Tables;
180210
std::map<TString, TAsyncBufferStats> Ingress;
181211
std::map<TString, TAsyncBufferStats> Egress;
182-
std::map<ui32, TAsyncBufferStats> Input;
183-
std::map<ui32, TAsyncBufferStats> Output;
212+
std::unordered_map<ui32, TAsyncBufferStats> Input;
213+
std::unordered_map<ui32, TAsyncBufferStats> Output;
184214

185215
std::map<TString, TOperatorStats> Joins;
186216
std::map<TString, TOperatorStats> Filters;

0 commit comments

Comments
 (0)