Skip to content

Commit e06c432

Browse files
authored
Support progress stats with basic stats mode (#16773)
1 parent 80d888d commit e06c432

File tree

12 files changed

+126
-90
lines changed

12 files changed

+126
-90
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
386386
if (NeedReportStats(*Request_->GetProtoRequest())) {
387387
if (record.HasQueryStats()) {
388388
FillQueryStats(*response.mutable_exec_stats(), record.GetQueryStats());
389-
response.mutable_exec_stats()->set_query_plan(NKqp::SerializeAnalyzePlan(record.GetQueryStats()));
389+
response.mutable_exec_stats()->set_query_plan(record.GetQueryPlan());
390390
}
391391
}
392392

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
170170
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
171171
runtimeSettings.UseSpilling = args.WithSpilling;
172172
runtimeSettings.StatsMode = args.StatsMode;
173+
runtimeSettings.WithProgressStats = args.WithProgressStats;
173174

174175
if (runtimeSettings.UseSpilling) {
175176
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ struct IKqpNodeComputeActorFactory {
122122
const NKikimr::NKqp::NRm::EKqpMemoryPool MemoryPool;
123123
const bool WithSpilling;
124124
const NYql::NDqProto::EDqStatsMode StatsMode;
125+
const bool WithProgressStats;
125126
const TInstant& Deadline;
126127
const bool ShareMailbox;
127128
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
16181618
.UserToken = UserToken,
16191619
.Deadline = Deadline.GetOrElse(TInstant::Zero()),
16201620
.StatsMode = Request.StatsMode,
1621+
.WithProgressStats = Request.ProgressStatsPeriod != TDuration::Zero(),
16211622
.RlPath = Request.RlPath,
16221623
.ExecuterSpan = ExecuterSpan,
16231624
.ResourcesSnapshot = std::move(ResourcesSnapshot),

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 102 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1326,91 +1326,116 @@ void TQueryExecutionStats::ExportAggAsyncBufferStats(TAsyncBufferStats& data, NY
13261326
void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats) {
13271327

13281328
THashMap<ui32, NDqProto::TDqStageStats*> protoStages;
1329-
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
1330-
protoStages.emplace(stageId.StageId, GetOrCreateStageStats(stageId, *TasksGraph, stats));
1329+
1330+
if (CollectFullStats(StatsMode)) {
1331+
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
1332+
protoStages.emplace(stageId.StageId, GetOrCreateStageStats(stageId, *TasksGraph, stats));
1333+
}
13311334
}
13321335

1333-
for (auto& [stageId, stageStat] : StageStats) {
1334-
auto& stageStats = *protoStages[stageStat.StageId.StageId];
1335-
stageStats.SetTotalTasksCount(stageStat.Task2Index.size());
1336-
stageStats.SetFinishedTasksCount(stageStat.FinishedCount);
1337-
1338-
stageStats.SetBaseTimeMs(BaseTimeMs);
1339-
stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
1340-
ExportAggStats(stageStat.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
1341-
stageStat.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage());
1342-
1343-
ExportAggStats(stageStat.InputRows, *stageStats.MutableInputRows());
1344-
ExportAggStats(stageStat.InputBytes, *stageStats.MutableInputBytes());
1345-
ExportAggStats(stageStat.OutputRows, *stageStats.MutableOutputRows());
1346-
ExportAggStats(stageStat.OutputBytes, *stageStats.MutableOutputBytes());
1347-
ExportAggStats(stageStat.ResultRows, *stageStats.MutableResultRows());
1348-
ExportAggStats(stageStat.ResultBytes, *stageStats.MutableResultBytes());
1349-
ExportAggStats(stageStat.IngressRows, *stageStats.MutableIngressRows());
1350-
ExportAggStats(stageStat.IngressBytes, *stageStats.MutableIngressBytes());
1351-
ExportAggStats(stageStat.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
1352-
ExportAggStats(stageStat.EgressRows, *stageStats.MutableEgressRows());
1353-
ExportAggStats(stageStat.EgressBytes, *stageStats.MutableEgressBytes());
1354-
1355-
ExportOffsetAggStats(stageStat.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
1356-
ExportOffsetAggStats(stageStat.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
1357-
ExportAggStats(stageStat.DurationUs, *stageStats.MutableDurationUs());
1358-
stageStat.WaitInputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitInputTimeUs());
1359-
stageStat.WaitOutputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitOutputTimeUs());
1360-
1361-
stageStat.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
1362-
stageStat.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
1363-
stageStat.SpillingComputeTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeTimeUs());
1364-
stageStat.SpillingChannelTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelTimeUs());
1365-
1366-
FillStageDurationUs(stageStats);
1336+
std::unordered_map<TString, NYql::NDqProto::TDqTableStats*> currentTableStats;
13671337

1338+
for (auto& [stageId, stageStat] : StageStats) {
13681339
for (auto& [path, t] : stageStat.Tables) {
1369-
auto& table = *stageStats.AddTables();
1370-
table.SetTablePath(path);
1371-
ExportAggStats(t.ReadRows, *table.MutableReadRows());
1372-
ExportAggStats(t.ReadBytes, *table.MutableReadBytes());
1373-
ExportAggStats(t.WriteRows, *table.MutableWriteRows());
1374-
ExportAggStats(t.WriteBytes, *table.MutableWriteBytes());
1375-
ExportAggStats(t.EraseRows, *table.MutableEraseRows());
1376-
ExportAggStats(t.EraseBytes, *table.MutableEraseBytes());
1377-
table.SetAffectedPartitions(ExportAggStats(t.AffectedPartitions));
1378-
}
1379-
for (auto& [id, i] : stageStat.Ingress) {
1380-
ExportAggAsyncBufferStats(i, (*stageStats.MutableIngress())[id]);
1381-
}
1382-
for (auto& [id, i] : stageStat.Input) {
1383-
ExportAggAsyncBufferStats(i, (*stageStats.MutableInput())[id]);
1384-
}
1385-
for (auto& [id, o] : stageStat.Output) {
1386-
ExportAggAsyncBufferStats(o, (*stageStats.MutableOutput())[id]);
1387-
}
1388-
for (auto& [id, e] : stageStat.Egress) {
1389-
ExportAggAsyncBufferStats(e, (*stageStats.MutableEgress())[id]);
1390-
}
1391-
for (auto& [id, j] : stageStat.Joins) {
1392-
auto& joinStat = (*stageStats.MutableOperatorJoin())[id];
1393-
joinStat.SetOperatorId(id);
1394-
ExportAggStats(j.Bytes, *joinStat.MutableBytes());
1395-
ExportAggStats(j.Rows, *joinStat.MutableRows());
1396-
}
1397-
for (auto& [id, f] : stageStat.Filters) {
1398-
auto& filterStat = (*stageStats.MutableOperatorFilter())[id];
1399-
filterStat.SetOperatorId(id);
1400-
ExportAggStats(f.Bytes, *filterStat.MutableBytes());
1401-
ExportAggStats(f.Rows, *filterStat.MutableRows());
1340+
NYql::NDqProto::TDqTableStats* tableAggr = nullptr;
1341+
if (auto it = currentTableStats.find(path); it != currentTableStats.end()) {
1342+
tableAggr = it->second;
1343+
} else {
1344+
tableAggr = stats.AddTables();
1345+
tableAggr->SetTablePath(path);
1346+
currentTableStats.emplace(path, tableAggr);
1347+
}
1348+
1349+
tableAggr->SetReadRows(tableAggr->GetReadRows() + ExportAggStats(t.ReadRows));
1350+
tableAggr->SetReadBytes(tableAggr->GetReadBytes() + ExportAggStats(t.ReadBytes));
1351+
tableAggr->SetWriteRows(tableAggr->GetWriteRows() + ExportAggStats(t.WriteRows));
1352+
tableAggr->SetWriteBytes(tableAggr->GetWriteBytes() + ExportAggStats(t.WriteBytes));
1353+
tableAggr->SetEraseRows(tableAggr->GetEraseRows() + ExportAggStats(t.EraseRows));
1354+
tableAggr->SetAffectedPartitions(tableAggr->GetAffectedPartitions() + ExportAggStats(t.AffectedPartitions));
1355+
14021356
}
1403-
for (auto& [id, a] : stageStat.Aggregations) {
1404-
auto& aggrStat = (*stageStats.MutableOperatorAggregation())[id];
1405-
aggrStat.SetOperatorId(id);
1406-
ExportAggStats(a.Bytes, *aggrStat.MutableBytes());
1407-
ExportAggStats(a.Rows, *aggrStat.MutableRows());
1357+
1358+
1359+
if (CollectFullStats(StatsMode)) {
1360+
auto& stageStats = *protoStages[stageStat.StageId.StageId];
1361+
stageStats.SetTotalTasksCount(stageStat.Task2Index.size());
1362+
stageStats.SetFinishedTasksCount(stageStat.FinishedCount);
1363+
1364+
stageStats.SetBaseTimeMs(BaseTimeMs);
1365+
stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
1366+
ExportAggStats(stageStat.SourceCpuTimeUs, *stageStats.MutableSourceCpuTimeUs());
1367+
stageStat.MaxMemoryUsage.ExportAggStats(BaseTimeMs, *stageStats.MutableMaxMemoryUsage());
1368+
1369+
ExportAggStats(stageStat.InputRows, *stageStats.MutableInputRows());
1370+
ExportAggStats(stageStat.InputBytes, *stageStats.MutableInputBytes());
1371+
ExportAggStats(stageStat.OutputRows, *stageStats.MutableOutputRows());
1372+
ExportAggStats(stageStat.OutputBytes, *stageStats.MutableOutputBytes());
1373+
ExportAggStats(stageStat.ResultRows, *stageStats.MutableResultRows());
1374+
ExportAggStats(stageStat.ResultBytes, *stageStats.MutableResultBytes());
1375+
ExportAggStats(stageStat.IngressRows, *stageStats.MutableIngressRows());
1376+
ExportAggStats(stageStat.IngressBytes, *stageStats.MutableIngressBytes());
1377+
ExportAggStats(stageStat.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
1378+
ExportAggStats(stageStat.EgressRows, *stageStats.MutableEgressRows());
1379+
ExportAggStats(stageStat.EgressBytes, *stageStats.MutableEgressBytes());
1380+
1381+
ExportOffsetAggStats(stageStat.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
1382+
ExportOffsetAggStats(stageStat.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
1383+
ExportAggStats(stageStat.DurationUs, *stageStats.MutableDurationUs());
1384+
stageStat.WaitInputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitInputTimeUs());
1385+
stageStat.WaitOutputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitOutputTimeUs());
1386+
1387+
stageStat.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
1388+
stageStat.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
1389+
stageStat.SpillingComputeTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeTimeUs());
1390+
stageStat.SpillingChannelTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelTimeUs());
1391+
1392+
FillStageDurationUs(stageStats);
1393+
1394+
for (auto& [path, t] : stageStat.Tables) {
1395+
auto& table = *stageStats.AddTables();
1396+
table.SetTablePath(path);
1397+
ExportAggStats(t.ReadRows, *table.MutableReadRows());
1398+
ExportAggStats(t.ReadBytes, *table.MutableReadBytes());
1399+
ExportAggStats(t.WriteRows, *table.MutableWriteRows());
1400+
ExportAggStats(t.WriteBytes, *table.MutableWriteBytes());
1401+
ExportAggStats(t.EraseRows, *table.MutableEraseRows());
1402+
ExportAggStats(t.EraseBytes, *table.MutableEraseBytes());
1403+
table.SetAffectedPartitions(ExportAggStats(t.AffectedPartitions));
1404+
}
1405+
for (auto& [id, i] : stageStat.Ingress) {
1406+
ExportAggAsyncBufferStats(i, (*stageStats.MutableIngress())[id]);
1407+
}
1408+
for (auto& [id, i] : stageStat.Input) {
1409+
ExportAggAsyncBufferStats(i, (*stageStats.MutableInput())[id]);
1410+
}
1411+
for (auto& [id, o] : stageStat.Output) {
1412+
ExportAggAsyncBufferStats(o, (*stageStats.MutableOutput())[id]);
1413+
}
1414+
for (auto& [id, e] : stageStat.Egress) {
1415+
ExportAggAsyncBufferStats(e, (*stageStats.MutableEgress())[id]);
1416+
}
1417+
for (auto& [id, j] : stageStat.Joins) {
1418+
auto& joinStat = (*stageStats.MutableOperatorJoin())[id];
1419+
joinStat.SetOperatorId(id);
1420+
ExportAggStats(j.Bytes, *joinStat.MutableBytes());
1421+
ExportAggStats(j.Rows, *joinStat.MutableRows());
1422+
}
1423+
for (auto& [id, f] : stageStat.Filters) {
1424+
auto& filterStat = (*stageStats.MutableOperatorFilter())[id];
1425+
filterStat.SetOperatorId(id);
1426+
ExportAggStats(f.Bytes, *filterStat.MutableBytes());
1427+
ExportAggStats(f.Rows, *filterStat.MutableRows());
1428+
}
1429+
for (auto& [id, a] : stageStat.Aggregations) {
1430+
auto& aggrStat = (*stageStats.MutableOperatorAggregation())[id];
1431+
aggrStat.SetOperatorId(id);
1432+
ExportAggStats(a.Bytes, *aggrStat.MutableBytes());
1433+
ExportAggStats(a.Rows, *aggrStat.MutableRows());
1434+
}
14081435
}
14091436
}
14101437

1411-
for (const auto& [_, tableStats] : TableStats) {
1412-
stats.AddTables()->CopyFrom(*tableStats);
1413-
}
1438+
stats.SetDurationUs(TInstant::Now().MicroSeconds() - StartTs.MicroSeconds());
14141439
}
14151440

14161441
void TQueryExecutionStats::AdjustExternalAggr(NYql::NDqProto::TDqExternalAggrStats& stats) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
8989
, UserToken(args.UserToken)
9090
, Deadline(args.Deadline)
9191
, StatsMode(args.StatsMode)
92+
, WithProgressStats(args.WithProgressStats)
9293
, RlPath(args.RlPath)
9394
, ResourcesSnapshot(std::move(args.ResourcesSnapshot))
9495
, ExecuterSpan(args.ExecuterSpan)
@@ -224,6 +225,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
224225
}
225226

226227
request.MutableRuntimeSettings()->SetStatsMode(GetDqStatsMode(StatsMode));
228+
request.MutableRuntimeSettings()->SetWithProgressStats(WithProgressStats);
227229
request.SetStartAllOrFail(true);
228230
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
229231
request.MutableRuntimeSettings()->SetUseSpilling(TasksGraph.GetMeta().AllowWithSpilling);
@@ -499,6 +501,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
499501
.MemoryPool = NRm::EKqpMemoryPool::DataQuery,
500502
.WithSpilling = TasksGraph.GetMeta().AllowWithSpilling,
501503
.StatsMode = GetDqStatsMode(StatsMode),
504+
.WithProgressStats = WithProgressStats,
502505
.Deadline = Deadline,
503506
.ShareMailbox = (computeTasksSize <= 1),
504507
.RlPath = Nothing(),

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TKqpPlanner {
4848
const TIntrusiveConstPtr<NACLib::TUserToken>& UserToken;
4949
const TInstant Deadline;
5050
const Ydb::Table::QueryStatsCollection::Mode& StatsMode;
51+
const bool WithProgressStats;
5152
const TMaybe<NKikimrKqp::TRlPath>& RlPath;
5253
NWilson::TSpan& ExecuterSpan;
5354
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
@@ -110,6 +111,7 @@ class TKqpPlanner {
110111
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
111112
const TInstant Deadline;
112113
const Ydb::Table::QueryStatsCollection::Mode StatsMode;
114+
const bool WithProgressStats;
113115
const TMaybe<NKikimrKqp::TRlPath> RlPath;
114116
THashSet<ui32> TrackingNodes;
115117
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
274274
.MemoryPool = memoryPool,
275275
.WithSpilling = msgRtSettings.GetUseSpilling(),
276276
.StatsMode = msgRtSettings.GetStatsMode(),
277+
.WithProgressStats = msgRtSettings.GetWithProgressStats(),
277278
.Deadline = TInstant(),
278279
.ShareMailbox = false,
279280
.RlPath = rlPath,

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1567,14 +1567,13 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15671567
}
15681568

15691569
if (QueryState->ReportStats()) {
1570-
if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
1571-
NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats();
1572-
NKqpProto::TKqpStatsQuery executionStats;
1573-
executionStats.Swap(&stats);
1574-
stats = QueryState->QueryStats.ToProto();
1575-
stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions());
1576-
ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats, QueryState->UserRequestContext->PoolId));
1577-
}
1570+
NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats();
1571+
NKqpProto::TKqpStatsQuery executionStats;
1572+
executionStats.Swap(&stats);
1573+
stats = QueryState->QueryStats.ToProto();
1574+
stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions());
1575+
ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats, QueryState->UserRequestContext->PoolId));
1576+
stats.SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
15781577
}
15791578

15801579
LOG_D("Forwarded TEvExecuterProgress to " << QueryState->RequestActorId);

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ struct TComputeRuntimeSettings {
256256

257257
i64 AsyncInputPushLimit = std::numeric_limits<i64>::max();
258258

259+
bool WithProgressStats = false;
260+
259261
inline bool CollectNone() const {
260262
return StatsMode <= NDqProto::DQ_STATS_MODE_NONE;
261263
}

0 commit comments

Comments
 (0)