Skip to content

Commit 609158e

Browse files
iddqdexblinkov
authored andcommitted
Revert "Column Shard Bytes Time Series (#15423)" (#15603)
1 parent a4a17ac commit 609158e

File tree

3 files changed

+26
-148
lines changed

3 files changed

+26
-148
lines changed

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,7 @@ void TKqpScanComputeActor::AcquireRateQuota() {
8080
}
8181

8282
void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, bool last) {
83-
Y_UNUSED(last);
84-
85-
if (ScanData && dst->TasksSize() > 0) {
83+
if (last && ScanData && dst->TasksSize() > 0) {
8684
YQL_ENSURE(dst->TasksSize() == 1);
8785

8886
auto* taskStats = dst->MutableTasks(0);
@@ -98,25 +96,23 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
9896
tableStats->SetTablePath(ScanData->TablePath);
9997

10098
if (auto* stats = ScanData->BasicStats.get()) {
101-
if (RuntimeSettings.StatsMode >= NYql::NDqProto::DQ_STATS_MODE_FULL) {
102-
ingressStats.SetRows(stats->Rows);
103-
ingressStats.SetBytes(stats->Bytes);
104-
ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
105-
ingressStats.SetLastMessageMs(stats->LastMessageMs);
106-
107-
for (auto& [shardId, stat] : stats->ExternalStats) {
108-
auto& externalStat = *sourceStats.AddExternalPartitions();
109-
externalStat.SetPartitionId(ToString(shardId));
110-
externalStat.SetExternalRows(stat.ExternalRows);
111-
externalStat.SetExternalBytes(stat.ExternalBytes);
112-
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
113-
externalStat.SetLastMessageMs(stat.LastMessageMs);
114-
}
115-
116-
taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
117-
taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);
99+
ingressStats.SetRows(stats->Rows);
100+
ingressStats.SetBytes(stats->Bytes);
101+
ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
102+
ingressStats.SetLastMessageMs(stats->LastMessageMs);
103+
104+
for (auto& [shardId, stat] : stats->ExternalStats) {
105+
auto& externalStat = *sourceStats.AddExternalPartitions();
106+
externalStat.SetPartitionId(ToString(shardId));
107+
externalStat.SetExternalRows(stat.ExternalRows);
108+
externalStat.SetExternalBytes(stat.ExternalBytes);
109+
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
110+
externalStat.SetLastMessageMs(stat.LastMessageMs);
118111
}
119112

113+
taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
114+
taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);
115+
120116
tableStats->SetReadRows(stats->Rows);
121117
tableStats->SetReadBytes(stats->Bytes);
122118
tableStats->SetAffectedPartitions(stats->AffectedShards);

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 6 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,8 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212
return (b == 0) ? a : ((a == 0 || a > b) ? b : a);
1313
}
1414

15-
void TTimeSeriesStats::ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats) {
16-
NKikimr::NKqp::ExportAggStats(Values, stats);
17-
}
18-
1915
void TTimeSeriesStats::ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
20-
ExportAggStats(stats);
16+
NKikimr::NKqp::ExportAggStats(Values, stats);
2117
ExportHistory(baseTimeMs, stats);
2218
}
2319

@@ -32,20 +28,16 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg
3228
}
3329
}
3430

35-
void TTimeSeriesStats::Resize(ui32 count) {
36-
Values.resize(count);
31+
void TTimeSeriesStats::Resize(ui32 taskCount) {
32+
Values.resize(taskCount);
3733
}
3834

39-
void TTimeSeriesStats::SetNonZero(ui32 index, ui64 value) {
35+
void TTimeSeriesStats::SetNonZero(ui32 taskIndex, ui64 value) {
4036
if (value) {
4137
Sum += value;
42-
Sum -= Values[index];
43-
Values[index] = value;
38+
Sum -= Values[taskIndex];
39+
Values[taskIndex] = value;
4440
}
45-
AppendHistory();
46-
}
47-
48-
void TTimeSeriesStats::AppendHistory() {
4941
if (HistorySampleCount) {
5042
auto nowMs = Now().MilliSeconds();
5143

@@ -105,62 +97,6 @@ void TTimeSeriesStats::Pack() {
10597
}
10698
}
10799

108-
void TPartitionedStats::ResizeByTasks(ui32 taskCount) {
109-
for (auto& p : Parts) {
110-
p.resize(taskCount);
111-
}
112-
}
113-
114-
void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
115-
auto oldPartCount = Parts.size();
116-
Parts.resize(partCount);
117-
for(auto i = oldPartCount; i < partCount; i++) {
118-
Parts[i].resize(taskCount);
119-
}
120-
Resize(partCount);
121-
}
122-
123-
void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
124-
auto& part = Parts[partIndex];
125-
auto delta = value - part[taskIndex];
126-
part[taskIndex] = value;
127-
Values[partIndex] += delta;
128-
Sum += delta;
129-
if (recordTimeSeries) {
130-
AppendHistory();
131-
}
132-
}
133-
134-
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
135-
auto [it, inserted] = Indices.try_emplace(key);
136-
if (inserted) {
137-
it->second = Indices.size() - 1;
138-
if (PartCount < Indices.size()) {
139-
PartCount += 4;
140-
}
141-
}
142-
if (stats.Parts.size() < PartCount) {
143-
stats.ResizeByParts(PartCount, TaskCount);
144-
}
145-
stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries);
146-
}
147-
148-
void TExternalStats::Resize(ui32 taskCount) {
149-
ExternalRows.ResizeByTasks(taskCount);
150-
ExternalBytes.ResizeByTasks(taskCount);
151-
TaskCount = taskCount;
152-
}
153-
154-
void TExternalStats::SetHistorySampleCount(ui32 historySampleCount) {
155-
ExternalBytes.HistorySampleCount = historySampleCount;
156-
}
157-
158-
void TExternalStats::ExportHistory(ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) {
159-
if (stats.HasExternalBytes()) {
160-
ExternalBytes.ExportHistory(baseTimeMs, *stats.MutableExternalBytes());
161-
}
162-
}
163-
164100
void TAsyncStats::Resize(ui32 taskCount) {
165101
Bytes.Resize(taskCount);
166102
DecompressedBytes.resize(taskCount);
@@ -191,25 +127,20 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg
191127
}
192128

193129
void TAsyncBufferStats::Resize(ui32 taskCount) {
194-
External.Resize(taskCount);
195130
Ingress.Resize(taskCount);
196131
Push.Resize(taskCount);
197132
Pop.Resize(taskCount);
198133
Egress.Resize(taskCount);
199134
}
200135

201136
void TAsyncBufferStats::SetHistorySampleCount(ui32 historySampleCount) {
202-
External.SetHistorySampleCount(historySampleCount);
203137
Ingress.SetHistorySampleCount(historySampleCount);
204138
Push.SetHistorySampleCount(historySampleCount);
205139
Pop.SetHistorySampleCount(historySampleCount);
206140
Egress.SetHistorySampleCount(historySampleCount);
207141
}
208142

209143
void TAsyncBufferStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
210-
if (stats.HasExternal()) {
211-
External.ExportHistory(baseTimeMs, *stats.MutableExternal());
212-
}
213144
if (stats.HasIngress()) {
214145
Ingress.ExportHistory(baseTimeMs, *stats.MutableIngress());
215146
}
@@ -472,17 +403,6 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
472403
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()));
473404
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()));
474405
baseTimeMs = NonZeroMin(baseTimeMs, UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()));
475-
for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
476-
auto key = partitionStat.GetPartitionId();
477-
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
478-
index, key, partitionStat.GetExternalRows(), false);
479-
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
480-
index, key, partitionStat.GetExternalBytes(), true);
481-
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
482-
index, key, partitionStat.GetFirstMessageMs(), false);
483-
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
484-
index, key, partitionStat.GetLastMessageMs(), false);
485-
}
486406
}
487407
}
488408

@@ -1154,8 +1074,6 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
11541074
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
11551075
}
11561076

1157-
// SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
1158-
11591077
void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
11601078

11611079
Y_DEBUG_ABORT_UNLESS((data.size() & 3) == 0);
@@ -1297,12 +1215,6 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto
12971215
}
12981216

12991217
void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
1300-
auto& external = *stats.MutableExternal();
1301-
data.External.ExternalRows.ExportAggStats(*external.MutableExternalRows());
1302-
data.External.ExternalBytes.ExportAggStats(BaseTimeMs, *external.MutableExternalBytes());
1303-
ExportOffsetAggStats(data.External.FirstMessageMs.Values, *external.MutableFirstMessageMs(), BaseTimeMs);
1304-
ExportOffsetAggStats(data.External.LastMessageMs.Values, *external.MutableLastMessageMs(), BaseTimeMs);
1305-
external.SetPartitionCount(data.External.Indices.size());
13061218
ExportAggAsyncStats(data.Ingress, *stats.MutableIngress());
13071219
ExportAggAsyncStats(data.Push, *stats.MutablePush());
13081220
ExportAggAsyncStats(data.Pop, *stats.MutablePop());

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,10 @@ struct TTimeSeriesStats {
2323
std::vector<std::pair<ui64, ui64>> History;
2424

2525
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
26-
void ExportAggStats(NYql::NDqProto::TDqStatsAggr& stats);
2726
void ExportAggStats(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats);
28-
void Resize(ui32 count);
29-
void SetNonZero(ui32 index, ui64 value);
30-
void Pack();
31-
void AppendHistory();
32-
};
33-
34-
struct TPartitionedStats : public TTimeSeriesStats {
35-
std::vector<std::vector<ui64>> Parts;
36-
37-
void ResizeByTasks(ui32 taskCount);
38-
void ResizeByParts(ui32 partCount, ui32 taskCount);
39-
void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
40-
};
41-
42-
struct TTimeMultiSeriesStats {
43-
std::unordered_map<TString, ui32> Indices;
44-
ui32 TaskCount = 0;
45-
ui32 PartCount = 0;
46-
47-
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
48-
};
49-
50-
struct TExternalStats : public TTimeMultiSeriesStats {
51-
TPartitionedStats ExternalRows;
52-
TPartitionedStats ExternalBytes;
53-
TPartitionedStats FirstMessageMs;
54-
TPartitionedStats LastMessageMs;
55-
5627
void Resize(ui32 taskCount);
57-
void SetHistorySampleCount(ui32 historySampleCount);
58-
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqExternalAggrStats& stats);
28+
void SetNonZero(ui32 taskIndex, ui64 value);
29+
void Pack();
5930
};
6031

6132
struct TMetricInfo {
@@ -109,7 +80,6 @@ struct TAsyncBufferStats {
10980
Resize(taskCount);
11081
}
11182

112-
TExternalStats External;
11383
TAsyncStats Ingress;
11484
TAsyncStats Push;
11585
TAsyncStats Pop;
@@ -209,8 +179,8 @@ struct TStageExecutionStats {
209179
std::map<TString, TTableStats> Tables;
210180
std::map<TString, TAsyncBufferStats> Ingress;
211181
std::map<TString, TAsyncBufferStats> Egress;
212-
std::unordered_map<ui32, TAsyncBufferStats> Input;
213-
std::unordered_map<ui32, TAsyncBufferStats> Output;
182+
std::map<ui32, TAsyncBufferStats> Input;
183+
std::map<ui32, TAsyncBufferStats> Output;
214184

215185
std::map<TString, TOperatorStats> Joins;
216186
std::map<TString, TOperatorStats> Filters;

0 commit comments

Comments
 (0)