@@ -189,8 +189,9 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
189
189
FinishTimeMs.resize (taskCount);
190
190
StartTimeMs.resize (taskCount);
191
191
DurationUs.resize (taskCount);
192
- WaitInputTimeUs.resize (taskCount);
193
- WaitOutputTimeUs.resize (taskCount);
192
+
193
+ WaitInputTimeUs.Resize (taskCount);
194
+ WaitOutputTimeUs.Resize (taskCount);
194
195
195
196
SpillingComputeBytes.Resize (taskCount);
196
197
SpillingChannelBytes.Resize (taskCount);
@@ -215,6 +216,10 @@ void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
215
216
HistorySampleCount = historySampleCount;
216
217
CpuTimeUs.HistorySampleCount = historySampleCount;
217
218
MaxMemoryUsage.HistorySampleCount = historySampleCount;
219
+
220
+ WaitInputTimeUs.HistorySampleCount = historySampleCount;
221
+ WaitOutputTimeUs.HistorySampleCount = historySampleCount;
222
+
218
223
SpillingComputeBytes.HistorySampleCount = historySampleCount;
219
224
SpillingChannelBytes.HistorySampleCount = historySampleCount;
220
225
SpillingComputeTimeUs.HistorySampleCount = historySampleCount;
@@ -252,6 +257,12 @@ void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqSta
252
257
if (stageStats.HasMaxMemoryUsage ()) {
253
258
MaxMemoryUsage.ExportHistory (baseTimeMs, *stageStats.MutableMaxMemoryUsage ());
254
259
}
260
+ if (stageStats.HasWaitInputTimeUs ()) {
261
+ WaitInputTimeUs.ExportHistory (baseTimeMs, *stageStats.MutableWaitInputTimeUs ());
262
+ }
263
+ if (stageStats.HasWaitOutputTimeUs ()) {
264
+ WaitOutputTimeUs.ExportHistory (baseTimeMs, *stageStats.MutableWaitOutputTimeUs ());
265
+ }
255
266
if (stageStats.HasSpillingComputeBytes ()) {
256
267
SpillingComputeBytes.ExportHistory (baseTimeMs, *stageStats.MutableSpillingComputeBytes ());
257
268
}
@@ -346,8 +357,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
346
357
baseTimeMs = NonZeroMin (baseTimeMs, finishTimeMs);
347
358
348
359
SetNonZero (DurationUs[index], durationUs);
349
- SetNonZero (WaitInputTimeUs[ index] , taskStats.GetWaitInputTimeUs ());
350
- SetNonZero (WaitOutputTimeUs[ index] , taskStats.GetWaitOutputTimeUs ());
360
+ WaitInputTimeUs. SetNonZero (index, taskStats.GetWaitInputTimeUs ());
361
+ WaitOutputTimeUs. SetNonZero (index, taskStats.GetWaitOutputTimeUs ());
351
362
352
363
SpillingComputeBytes.SetNonZero (index, taskStats.GetSpillingComputeWriteBytes ());
353
364
SpillingChannelBytes.SetNonZero (index, taskStats.GetSpillingChannelWriteBytes ());
@@ -466,6 +477,13 @@ TProgressStatEntry operator - (const TProgressStatEntry& l, const TProgressStatE
466
477
};
467
478
}
468
479
480
+ void MergeAggr (NDqProto::TDqStatsAggr& aggr, const NDqProto::TDqStatsAggr& stat) noexcept {
481
+ aggr.SetMin (NonZeroMin (aggr.GetMin (), stat.GetMin ()));
482
+ aggr.SetMax (std::max (aggr.GetMax (), stat.GetMax ()));
483
+ aggr.SetSum (aggr.GetSum () + stat.GetSum ());
484
+ aggr.SetCnt (aggr.GetCnt () + stat.GetCnt ());
485
+ }
486
+
469
487
void UpdateAggr (NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
470
488
if (value) {
471
489
if (aggr->GetMin () == 0 ) {
@@ -479,6 +497,19 @@ void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
479
497
}
480
498
}
481
499
500
+ void MergeExternal (NDqProto::TDqExternalAggrStats& asyncAggr, const NDqProto::TDqExternalAggrStats& asyncStat) noexcept {
501
+ MergeAggr (*asyncAggr.MutableExternalRows (), asyncStat.GetExternalRows ());
502
+ MergeAggr (*asyncAggr.MutableExternalBytes (), asyncStat.GetExternalBytes ());
503
+ MergeAggr (*asyncAggr.MutableStorageRows (), asyncStat.GetStorageRows ());
504
+ MergeAggr (*asyncAggr.MutableStorageBytes (), asyncStat.GetStorageBytes ());
505
+ MergeAggr (*asyncAggr.MutableCpuTimeUs (), asyncStat.GetCpuTimeUs ());
506
+ MergeAggr (*asyncAggr.MutableWaitInputTimeUs (), asyncStat.GetWaitInputTimeUs ());
507
+ MergeAggr (*asyncAggr.MutableWaitOutputTimeUs (), asyncStat.GetWaitOutputTimeUs ());
508
+ MergeAggr (*asyncAggr.MutableFirstMessageMs (), asyncStat.GetFirstMessageMs ());
509
+ MergeAggr (*asyncAggr.MutableLastMessageMs (), asyncStat.GetLastMessageMs ());
510
+ asyncAggr.SetPartitionCount (asyncAggr.GetPartitionCount () + asyncStat.GetExternalRows ().GetCnt ());
511
+ }
512
+
482
513
ui64 UpdateAsyncAggr (NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
483
514
ui64 baseTimeMs = 0 ;
484
515
@@ -652,9 +683,27 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
652
683
FillStageDurationUs (*stageStats);
653
684
654
685
for (auto & sourcesStat : task.GetSources ()) {
655
- BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*(*stageStats->MutableIngress ())[sourcesStat.GetIngressName ()].MutableIngress (), sourcesStat.GetIngress ()));
656
- BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*(*stageStats->MutableIngress ())[sourcesStat.GetIngressName ()].MutablePush (), sourcesStat.GetPush ()));
657
- BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*(*stageStats->MutableIngress ())[sourcesStat.GetIngressName ()].MutablePop (), sourcesStat.GetPop ()));
686
+ auto & ingress = (*stageStats->MutableIngress ())[sourcesStat.GetIngressName ()];
687
+ MergeExternal (*ingress.MutableExternal (), sourcesStat.GetExternal ());
688
+
689
+ const auto & [it, inserted] = ExternalPartitionStats.emplace (stageStats->GetStageId (), sourcesStat.GetIngressName ());
690
+ auto & externalPartitionStat = it->second ;
691
+
692
+ for (auto & externalPartition : sourcesStat.GetExternalPartitions ()) {
693
+ const auto & [it, inserted] = externalPartitionStat.Stat .emplace (externalPartition.GetPartitionId (),
694
+ TExternalPartitionStat (externalPartition.GetExternalRows (), externalPartition.GetExternalBytes (),
695
+ externalPartition.GetFirstMessageMs (), externalPartition.GetLastMessageMs ()));
696
+ if (!inserted) {
697
+ it->second .ExternalRows += externalPartition.GetExternalRows ();
698
+ it->second .ExternalBytes += externalPartition.GetExternalBytes ();
699
+ it->second .FirstMessageMs = NonZeroMin (it->second .FirstMessageMs , externalPartition.GetFirstMessageMs ());
700
+ it->second .LastMessageMs = std::max (it->second .LastMessageMs , externalPartition.GetLastMessageMs ());
701
+ }
702
+ }
703
+
704
+ BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*ingress.MutableIngress (), sourcesStat.GetIngress ()));
705
+ BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*ingress.MutablePush (), sourcesStat.GetPush ()));
706
+ BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*ingress.MutablePop (), sourcesStat.GetPop ()));
658
707
}
659
708
for (auto & inputChannelStat : task.GetInputChannels ()) {
660
709
BaseTimeMs = NonZeroMin (BaseTimeMs, UpdateAsyncAggr (*(*stageStats->MutableInput ())[inputChannelStat.GetSrcStageId ()].MutablePush (), inputChannelStat.GetPush ()));
@@ -823,9 +872,14 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
823
872
UpdateAggr (stageStats->MutableFinishTimeMs (), finishTimeMs);
824
873
BaseTimeMs = NonZeroMin (BaseTimeMs, finishTimeMs);
825
874
875
+ FillStageDurationUs (*stageStats);
826
876
UpdateAggr (stageStats->MutableWaitInputTimeUs (), task.GetWaitInputTimeUs ());
827
877
UpdateAggr (stageStats->MutableWaitOutputTimeUs (), task.GetWaitOutputTimeUs ());
828
- FillStageDurationUs (*stageStats);
878
+
879
+ UpdateAggr (stageStats->MutableSpillingComputeBytes (), task.GetSpillingComputeWriteBytes ());
880
+ UpdateAggr (stageStats->MutableSpillingChannelBytes (), task.GetSpillingChannelWriteBytes ());
881
+ UpdateAggr (stageStats->MutableSpillingComputeTimeUs (), task.GetSpillingComputeReadTimeUs () + task.GetSpillingComputeWriteTimeUs ());
882
+ UpdateAggr (stageStats->MutableSpillingChannelTimeUs (), task.GetSpillingChannelReadTimeUs () + task.GetSpillingChannelWriteTimeUs ());
829
883
830
884
for (auto & tableStats: task.GetTables ()) {
831
885
auto * tableAggrStats = GetOrCreateTableAggrStats (stageStats, tableStats.GetTablePath ());
@@ -1094,8 +1148,8 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
1094
1148
ExportOffsetAggStats (stageStat.StartTimeMs , *stageStats.MutableStartTimeMs (), BaseTimeMs);
1095
1149
ExportOffsetAggStats (stageStat.FinishTimeMs , *stageStats.MutableFinishTimeMs (), BaseTimeMs);
1096
1150
ExportAggStats (stageStat.DurationUs , *stageStats.MutableDurationUs ());
1097
- ExportAggStats ( stageStat.WaitInputTimeUs , *stageStats.MutableWaitInputTimeUs ());
1098
- ExportAggStats ( stageStat.WaitOutputTimeUs , *stageStats.MutableWaitOutputTimeUs ());
1151
+ stageStat.WaitInputTimeUs . ExportAggStats (BaseTimeMs , *stageStats.MutableWaitInputTimeUs ());
1152
+ stageStat.WaitOutputTimeUs . ExportAggStats (BaseTimeMs , *stageStats.MutableWaitOutputTimeUs ());
1099
1153
1100
1154
stageStat.SpillingComputeBytes .ExportAggStats (BaseTimeMs, *stageStats.MutableSpillingComputeBytes ());
1101
1155
stageStat.SpillingChannelBytes .ExportAggStats (BaseTimeMs, *stageStats.MutableSpillingChannelBytes ());
@@ -1152,6 +1206,15 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
1152
1206
}
1153
1207
}
1154
1208
1209
+ void TQueryExecutionStats::AdjustExternalAggr (NYql::NDqProto::TDqExternalAggrStats& stats) {
1210
+ if (stats.HasFirstMessageMs ()) {
1211
+ AdjustDqStatsAggr (*stats.MutableFirstMessageMs ());
1212
+ }
1213
+ if (stats.HasLastMessageMs ()) {
1214
+ AdjustDqStatsAggr (*stats.MutableLastMessageMs ());
1215
+ }
1216
+ }
1217
+
1155
1218
void TQueryExecutionStats::AdjustAsyncAggr (NYql::NDqProto::TDqAsyncStatsAggr& stats) {
1156
1219
if (stats.HasFirstMessageMs ()) {
1157
1220
AdjustDqStatsAggr (*stats.MutableFirstMessageMs ());
@@ -1168,6 +1231,9 @@ void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& st
1168
1231
}
1169
1232
1170
1233
void TQueryExecutionStats::AdjustAsyncBufferAggr (NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
1234
+ if (stats.HasExternal ()) {
1235
+ AdjustExternalAggr (*stats.MutableExternal ());
1236
+ }
1171
1237
if (stats.HasIngress ()) {
1172
1238
AdjustAsyncAggr (*stats.MutableIngress ());
1173
1239
}
@@ -1184,13 +1250,15 @@ void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferS
1184
1250
1185
1251
void TQueryExecutionStats::AdjustDqStatsAggr (NYql::NDqProto::TDqStatsAggr& stats) {
1186
1252
if (auto min = stats.GetMin ()) {
1187
- stats.SetMin (min - BaseTimeMs);
1253
+ stats.SetMin (min > BaseTimeMs ? min - BaseTimeMs : 0 );
1188
1254
}
1189
1255
if (auto max = stats.GetMax ()) {
1190
- stats.SetMax (max - BaseTimeMs);
1256
+ stats.SetMax (max > BaseTimeMs ? max - BaseTimeMs : 0 );
1191
1257
}
1192
1258
if (auto cnt = stats.GetCnt ()) {
1193
- stats.SetSum (stats.GetSum () - BaseTimeMs * cnt);
1259
+ auto sum = stats.GetSum ();
1260
+ auto baseSum = BaseTimeMs * cnt;
1261
+ stats.SetSum (sum > baseSum ? sum - baseSum : 0 );
1194
1262
}
1195
1263
}
1196
1264
@@ -1222,6 +1290,39 @@ void TQueryExecutionStats::Finish() {
1222
1290
for (auto & [stageId, stagetype] : TasksGraph->GetStagesInfo ()) {
1223
1291
auto stageStats = GetOrCreateStageStats (stageId, *TasksGraph, *Result);
1224
1292
stageStats->SetBaseTimeMs (BaseTimeMs);
1293
+
1294
+ if (ExternalPartitionStats.contains (stageStats->GetStageId ())) {
1295
+ auto & externalPartitionStat = ExternalPartitionStats[stageStats->GetStageId ()];
1296
+ auto & ingress = (*stageStats->MutableIngress ())[externalPartitionStat.Name ];
1297
+ auto & external = *ingress.MutableExternal ();
1298
+ for (auto & [partitionId, partitionStat] : externalPartitionStat.Stat ) {
1299
+ auto & externalRows = *external.MutableExternalRows ();
1300
+ externalRows.SetMin (NonZeroMin (externalRows.GetMin (), partitionStat.ExternalRows ));
1301
+ externalRows.SetMax (std::max (externalRows.GetMax (), partitionStat.ExternalRows ));
1302
+ externalRows.SetSum (externalRows.GetSum () + partitionStat.ExternalRows );
1303
+ externalRows.SetCnt (externalRows.GetCnt () + 1 );
1304
+
1305
+ auto & externalBytes = *external.MutableExternalBytes ();
1306
+ externalBytes.SetMin (NonZeroMin (externalBytes.GetMin (), partitionStat.ExternalBytes ));
1307
+ externalBytes.SetMax (std::max (externalBytes.GetMax (), partitionStat.ExternalBytes ));
1308
+ externalBytes.SetSum (externalBytes.GetSum () + partitionStat.ExternalBytes );
1309
+ externalBytes.SetCnt (externalBytes.GetCnt () + 1 );
1310
+
1311
+ auto & firstMessageMs = *external.MutableFirstMessageMs ();
1312
+ firstMessageMs.SetMin (NonZeroMin (firstMessageMs.GetMin (), partitionStat.FirstMessageMs ));
1313
+ firstMessageMs.SetMax (std::max (firstMessageMs.GetMax (), partitionStat.FirstMessageMs ));
1314
+ firstMessageMs.SetSum (firstMessageMs.GetSum () + partitionStat.FirstMessageMs );
1315
+ firstMessageMs.SetCnt (firstMessageMs.GetCnt () + 1 );
1316
+
1317
+ auto & lastMessageMs = *external.MutableLastMessageMs ();
1318
+ lastMessageMs.SetMin (NonZeroMin (lastMessageMs.GetMin (), partitionStat.LastMessageMs ));
1319
+ lastMessageMs.SetMax (std::max (lastMessageMs.GetMax (), partitionStat.LastMessageMs ));
1320
+ lastMessageMs.SetSum (lastMessageMs.GetSum () + partitionStat.LastMessageMs );
1321
+ lastMessageMs.SetCnt (lastMessageMs.GetCnt () + 1 );
1322
+ }
1323
+ external.SetPartitionCount (external.GetPartitionCount () + externalPartitionStat.Stat .size ());
1324
+ }
1325
+
1225
1326
AdjustBaseTime (stageStats);
1226
1327
auto it = StageStats.find (stageId.StageId );
1227
1328
if (it != StageStats.end ()) {
0 commit comments