Skip to content

Commit f885a05

Browse files
authored
Support progress stats with basic stats mode (#16853)
1 parent 9238ba4 commit f885a05

File tree

12 files changed: +124 additions, -89 deletions

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -386,7 +386,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
386386
if (NeedReportStats(*Request_->GetProtoRequest())) {
387387
if (record.HasQueryStats()) {
388388
FillQueryStats(*response.mutable_exec_stats(), record.GetQueryStats());
389-
response.mutable_exec_stats()->set_query_plan(NKqp::SerializeAnalyzePlan(record.GetQueryStats()));
389+
response.mutable_exec_stats()->set_query_plan(record.GetQueryPlan());
390390
}
391391
}
392392

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -167,6 +167,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
167167
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
168168
runtimeSettings.UseSpilling = args.WithSpilling;
169169
runtimeSettings.StatsMode = args.StatsMode;
170+
runtimeSettings.WithProgressStats = args.WithProgressStats;
170171

171172
if (runtimeSettings.UseSpilling) {
172173
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,7 @@ struct IKqpNodeComputeActorFactory {
122122
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
123123
const bool WithSpilling;
124124
const NYql::NDqProto::EDqStatsMode StatsMode;
125+
const bool WithProgressStats;
125126
const TInstant& Deadline;
126127
const bool ShareMailbox;
127128
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1656,6 +1656,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
16561656
.UserToken = UserToken,
16571657
.Deadline = Deadline.GetOrElse(TInstant::Zero()),
16581658
.StatsMode = Request.StatsMode,
1659+
.WithProgressStats = Request.ProgressStatsPeriod != TDuration::Zero(),
16591660
.RlPath = Request.RlPath,
16601661
.ExecuterSpan = ExecuterSpan,
16611662
.ResourcesSnapshot = std::move(ResourcesSnapshot),

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 100 additions & 76 deletions
Original file line number | Diff line number | Diff line change
@@ -1218,90 +1218,114 @@ void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NY
12181218
void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats) {
12191219

12201220
THashMap<ui32, NDqProto::TDqStageStats*> protoStages;
1221-
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
1222-
protoStages.emplace(stageId.StageId, GetOrCreateStageStats(stageId, *TasksGraph, stats));
1221+
1222+
if (CollectFullStats(StatsMode)) {
1223+
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
1224+
protoStages.emplace(stageId.StageId, GetOrCreateStageStats(stageId, *TasksGraph, stats));
1225+
}
12231226
}
12241227

1228+
std::unordered_map<TString, NYql::NDqProto::TDqTableStats*> currentTableStats;
12251229
for (auto& [stageId, stageStat] : StageStats) {
1226-
auto& stageStats = *protoStages[stageStat.StageId.StageId];
1227-
stageStats.SetTotalTasksCount(stageStat.Task2Index.size());
1228-
1229-
stageStats.SetBaseTimeMs(BaseTimeMs);
1230-
stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
1231-
ExportAggStats(stageStat.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
1232-
stageStat.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage());
1233-
1234-
ExportAggStats(stageStat.InputRows, *stageStats.MutableInputRows());
1235-
ExportAggStats(stageStat.InputBytes, *stageStats.MutableInputBytes());
1236-
ExportAggStats(stageStat.OutputRows, *stageStats.MutableOutputRows());
1237-
ExportAggStats(stageStat.OutputBytes, *stageStats.MutableOutputBytes());
1238-
ExportAggStats(stageStat.ResultRows, *stageStats.MutableResultRows());
1239-
ExportAggStats(stageStat.ResultBytes, *stageStats.MutableResultBytes());
1240-
ExportAggStats(stageStat.IngressRows, *stageStats.MutableIngressRows());
1241-
ExportAggStats(stageStat.IngressBytes, *stageStats.MutableIngressBytes());
1242-
ExportAggStats(stageStat.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
1243-
ExportAggStats(stageStat.EgressRows, *stageStats.MutableEgressRows());
1244-
ExportAggStats(stageStat.EgressBytes, *stageStats.MutableEgressBytes());
1245-
1246-
ExportOffsetAggStats(stageStat.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
1247-
ExportOffsetAggStats(stageStat.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
1248-
ExportAggStats(stageStat.DurationUs, *stageStats.MutableDurationUs());
1249-
stageStat.WaitInputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitInputTimeUs());
1250-
stageStat.WaitOutputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitOutputTimeUs());
1251-
1252-
stageStat.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
1253-
stageStat.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
1254-
stageStat.SpillingComputeTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeTimeUs());
1255-
stageStat.SpillingChannelTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelTimeUs());
1256-
1257-
FillStageDurationUs(stageStats);
1258-
12591230
for (auto& [path, t] : stageStat.Tables) {
1260-
auto& table = *stageStats.AddTables();
1261-
table.SetTablePath(path);
1262-
ExportAggStats(t.ReadRows, *table.MutableReadRows());
1263-
ExportAggStats(t.ReadBytes, *table.MutableReadBytes());
1264-
ExportAggStats(t.WriteRows, *table.MutableWriteRows());
1265-
ExportAggStats(t.WriteBytes, *table.MutableWriteBytes());
1266-
ExportAggStats(t.EraseRows, *table.MutableEraseRows());
1267-
ExportAggStats(t.EraseBytes, *table.MutableEraseBytes());
1268-
table.SetAffectedPartitions(ExportAggStats(t.AffectedPartitions));
1269-
}
1270-
for (auto& [id, i] : stageStat.Ingress) {
1271-
ExportAggAsyncBufferStats(i, (*stageStats.MutableIngress())[id]);
1272-
}
1273-
for (auto& [id, i] : stageStat.Input) {
1274-
ExportAggAsyncBufferStats(i, (*stageStats.MutableInput())[id]);
1275-
}
1276-
for (auto& [id, o] : stageStat.Output) {
1277-
ExportAggAsyncBufferStats(o, (*stageStats.MutableOutput())[id]);
1278-
}
1279-
for (auto& [id, e] : stageStat.Egress) {
1280-
ExportAggAsyncBufferStats(e, (*stageStats.MutableEgress())[id]);
1281-
}
1282-
for (auto& [id, j] : stageStat.Joins) {
1283-
auto& joinStat = (*stageStats.MutableOperatorJoin())[id];
1284-
joinStat.SetOperatorId(id);
1285-
ExportAggStats(j.Bytes, *joinStat.MutableBytes());
1286-
ExportAggStats(j.Rows, *joinStat.MutableRows());
1287-
}
1288-
for (auto& [id, f] : stageStat.Filters) {
1289-
auto& filterStat = (*stageStats.MutableOperatorFilter())[id];
1290-
filterStat.SetOperatorId(id);
1291-
ExportAggStats(f.Bytes, *filterStat.MutableBytes());
1292-
ExportAggStats(f.Rows, *filterStat.MutableRows());
1231+
NYql::NDqProto::TDqTableStats* tableAggr = nullptr;
1232+
if (auto it = currentTableStats.find(path); it != currentTableStats.end()) {
1233+
tableAggr = it->second;
1234+
} else {
1235+
tableAggr = stats.AddTables();
1236+
tableAggr->SetTablePath(path);
1237+
currentTableStats.emplace(path, tableAggr);
1238+
}
1239+
1240+
tableAggr->SetReadRows(tableAggr->GetReadRows() + ExportAggStats(t.ReadRows));
1241+
tableAggr->SetReadBytes(tableAggr->GetReadBytes() + ExportAggStats(t.ReadBytes));
1242+
tableAggr->SetWriteRows(tableAggr->GetWriteRows() + ExportAggStats(t.WriteRows));
1243+
tableAggr->SetWriteBytes(tableAggr->GetWriteBytes() + ExportAggStats(t.WriteBytes));
1244+
tableAggr->SetEraseRows(tableAggr->GetEraseRows() + ExportAggStats(t.EraseRows));
1245+
tableAggr->SetAffectedPartitions(tableAggr->GetAffectedPartitions() + ExportAggStats(t.AffectedPartitions));
1246+
12931247
}
1294-
for (auto& [id, a] : stageStat.Aggregations) {
1295-
auto& aggrStat = (*stageStats.MutableOperatorAggregation())[id];
1296-
aggrStat.SetOperatorId(id);
1297-
ExportAggStats(a.Bytes, *aggrStat.MutableBytes());
1298-
ExportAggStats(a.Rows, *aggrStat.MutableRows());
1248+
1249+
1250+
if (CollectFullStats(StatsMode)) {
1251+
auto& stageStats = *protoStages[stageStat.StageId.StageId];
1252+
stageStats.SetTotalTasksCount(stageStat.Task2Index.size());
1253+
1254+
stageStats.SetBaseTimeMs(BaseTimeMs);
1255+
stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
1256+
ExportAggStats(stageStat.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
1257+
stageStat.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage());
1258+
1259+
ExportAggStats(stageStat.InputRows, *stageStats.MutableInputRows());
1260+
ExportAggStats(stageStat.InputBytes, *stageStats.MutableInputBytes());
1261+
ExportAggStats(stageStat.OutputRows, *stageStats.MutableOutputRows());
1262+
ExportAggStats(stageStat.OutputBytes, *stageStats.MutableOutputBytes());
1263+
ExportAggStats(stageStat.ResultRows, *stageStats.MutableResultRows());
1264+
ExportAggStats(stageStat.ResultBytes, *stageStats.MutableResultBytes());
1265+
ExportAggStats(stageStat.IngressRows, *stageStats.MutableIngressRows());
1266+
ExportAggStats(stageStat.IngressBytes, *stageStats.MutableIngressBytes());
1267+
ExportAggStats(stageStat.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
1268+
ExportAggStats(stageStat.EgressRows, *stageStats.MutableEgressRows());
1269+
ExportAggStats(stageStat.EgressBytes, *stageStats.MutableEgressBytes());
1270+
1271+
ExportOffsetAggStats(stageStat.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
1272+
ExportOffsetAggStats(stageStat.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
1273+
ExportAggStats(stageStat.DurationUs, *stageStats.MutableDurationUs());
1274+
stageStat.WaitInputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitInputTimeUs());
1275+
stageStat.WaitOutputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitOutputTimeUs());
1276+
1277+
stageStat.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
1278+
stageStat.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
1279+
stageStat.SpillingComputeTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeTimeUs());
1280+
stageStat.SpillingChannelTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelTimeUs());
1281+
1282+
FillStageDurationUs(stageStats);
1283+
1284+
for (auto& [path, t] : stageStat.Tables) {
1285+
auto& table = *stageStats.AddTables();
1286+
table.SetTablePath(path);
1287+
ExportAggStats(t.ReadRows, *table.MutableReadRows());
1288+
ExportAggStats(t.ReadBytes, *table.MutableReadBytes());
1289+
ExportAggStats(t.WriteRows, *table.MutableWriteRows());
1290+
ExportAggStats(t.WriteBytes, *table.MutableWriteBytes());
1291+
ExportAggStats(t.EraseRows, *table.MutableEraseRows());
1292+
ExportAggStats(t.EraseBytes, *table.MutableEraseBytes());
1293+
table.SetAffectedPartitions(ExportAggStats(t.AffectedPartitions));
1294+
}
1295+
for (auto& [id, i] : stageStat.Ingress) {
1296+
ExportAggAsyncBufferStats(i, (*stageStats.MutableIngress())[id]);
1297+
}
1298+
for (auto& [id, i] : stageStat.Input) {
1299+
ExportAggAsyncBufferStats(i, (*stageStats.MutableInput())[id]);
1300+
}
1301+
for (auto& [id, o] : stageStat.Output) {
1302+
ExportAggAsyncBufferStats(o, (*stageStats.MutableOutput())[id]);
1303+
}
1304+
for (auto& [id, e] : stageStat.Egress) {
1305+
ExportAggAsyncBufferStats(e, (*stageStats.MutableEgress())[id]);
1306+
}
1307+
for (auto& [id, j] : stageStat.Joins) {
1308+
auto& joinStat = (*stageStats.MutableOperatorJoin())[id];
1309+
joinStat.SetOperatorId(id);
1310+
ExportAggStats(j.Bytes, *joinStat.MutableBytes());
1311+
ExportAggStats(j.Rows, *joinStat.MutableRows());
1312+
}
1313+
for (auto& [id, f] : stageStat.Filters) {
1314+
auto& filterStat = (*stageStats.MutableOperatorFilter())[id];
1315+
filterStat.SetOperatorId(id);
1316+
ExportAggStats(f.Bytes, *filterStat.MutableBytes());
1317+
ExportAggStats(f.Rows, *filterStat.MutableRows());
1318+
}
1319+
for (auto& [id, a] : stageStat.Aggregations) {
1320+
auto& aggrStat = (*stageStats.MutableOperatorAggregation())[id];
1321+
aggrStat.SetOperatorId(id);
1322+
ExportAggStats(a.Bytes, *aggrStat.MutableBytes());
1323+
ExportAggStats(a.Rows, *aggrStat.MutableRows());
1324+
}
12991325
}
13001326
}
13011327

1302-
for (const auto& [_, tableStats] : TableStats) {
1303-
stats.AddTables()->CopyFrom(*tableStats);
1304-
}
1328+
stats.SetDurationUs(TInstant::Now().MicroSeconds() - StartTs.MicroSeconds());
13051329
}
13061330

13071331
void TQueryExecutionStats::AdjustExternalAggr(NYql::NDqProto::TDqExternalAggrStats& stats) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
8888
, UserToken(args.UserToken)
8989
, Deadline(args.Deadline)
9090
, StatsMode(args.StatsMode)
91+
, WithProgressStats(args.WithProgressStats)
9192
, RlPath(args.RlPath)
9293
, ResourcesSnapshot(std::move(args.ResourcesSnapshot))
9394
, ExecuterSpan(args.ExecuterSpan)
@@ -222,6 +223,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
222223
}
223224

224225
request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
226+
request.MutableRuntimeSettings()->SetWithProgressStats(WithProgressStats);
225227
request.SetStartAllOrFail(true);
226228
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
227229
request.MutableRuntimeSettings()->SetUseSpilling(TasksGraph.GetMeta().AllowWithSpilling);
@@ -497,6 +499,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
497499
.MemoryPool = NRm::EKqpMemoryPool::DataQuery,
498500
.WithSpilling = TasksGraph.GetMeta().AllowWithSpilling,
499501
.StatsMode = GetDqStatsMode(StatsMode),
502+
.WithProgressStats = WithProgressStats,
500503
.Deadline = Deadline,
501504
.ShareMailbox = (computeTasksSize <= 1),
502505
.RlPath = Nothing(),

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ class TKqpPlanner {
4848
const TIntrusiveConstPtr<NACLib::TUserToken>& UserToken;
4949
const TInstant Deadline;
5050
const Ydb::Table::QueryStatsCollection::Mode& StatsMode;
51+
const bool WithProgressStats;
5152
const TMaybe<NKikimrKqp::TRlPath>& RlPath;
5253
NWilson::TSpan& ExecuterSpan;
5354
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
@@ -107,6 +108,7 @@ class TKqpPlanner {
107108
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
108109
const TInstant Deadline;
109110
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
111+
const bool WithProgressStats;
110112
const TMaybe<NKikimrKqp::TRlPath> RlPath;
111113
THashSet<ui32> TrackingNodes;
112114
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -274,6 +274,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
274274
.MemoryPool = memoryPool,
275275
.WithSpilling = msgRtSettings.GetUseSpilling(),
276276
.StatsMode = msgRtSettings.GetStatsMode(),
277+
.WithProgressStats = msgRtSettings.GetWithProgressStats(),
277278
.Deadline = TInstant(),
278279
.ShareMailbox = false,
279280
.RlPath = rlPath,

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1567,14 +1567,13 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15671567
}
15681568

15691569
if (QueryState->ReportStats()) {
1570-
if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
1571-
NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats();
1572-
NKqpProto::TKqpStatsQuery executionStats;
1573-
executionStats.Swap(&stats);
1574-
stats = QueryState->QueryStats.ToProto();
1575-
stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions());
1576-
ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats, QueryState->UserRequestContext->PoolId));
1577-
}
1570+
NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats();
1571+
NKqpProto::TKqpStatsQuery executionStats;
1572+
executionStats.Swap(&stats);
1573+
stats = QueryState->QueryStats.ToProto();
1574+
stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions());
1575+
ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats, QueryState->UserRequestContext->PoolId));
1576+
stats.SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
15781577
}
15791578

15801579
LOG_D("Forwarded TEvExecuterProgress to " << QueryState->RequestActorId);

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -256,6 +256,8 @@ struct TComputeRuntimeSettings {
256256

257257
i64 AsyncInputPushLimit = std::numeric_limits<i64>::max();
258258

259+
bool WithProgressStats = false;
260+
259261
inline bool CollectNone() const {
260262
return StatsMode <= NDqProto::DQ_STATS_MODE_NONE;
261263
}

0 commit comments

Comments
 (0)