Skip to content

Commit 3c5ebc4

Browse files
authored
Hash shuffle spilling and stats fixes (#18878)
2 parents 7903d34 + 41f58ef commit 3c5ebc4

19 files changed

+117
-39
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
654654
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
655655
kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();
656656
kqpConfig.EnableSpilling = serviceConfig.GetEnableQueryServiceSpilling();
657+
kqpConfig.EnableSpillingInHashJoinShuffleConnections = serviceConfig.GetEnableSpillingInHashJoinShuffleConnections();
657658

658659
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
659660
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
318318

319319
bool enableSnapshotIsolationRW = TableServiceConfig.GetEnableSnapshotIsolationRW();
320320

321+
bool enableSpillingInHashJoinShuffleConnections = TableServiceConfig.GetEnableSpillingInHashJoinShuffleConnections();
322+
321323
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
322324
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
323325

@@ -350,8 +352,9 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
350352
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
351353
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW ||
352354
TableServiceConfig.GetEnableQueryServiceSpilling() != enableSpilling ||
353-
TableServiceConfig.GetDefaultEnableShuffleElimination() != defaultEnableShuffleElimination
354-
) {
355+
TableServiceConfig.GetDefaultEnableShuffleElimination() != defaultEnableShuffleElimination ||
356+
TableServiceConfig.GetEnableSpillingInHashJoinShuffleConnections() != enableSpillingInHashJoinShuffleConnections)
357+
{
355358

356359
QueryCache->Clear();
357360

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void TPartitionedStats::ResizeByParts(ui32 partCount, ui32 taskCount) {
148148
Resize(partCount);
149149
}
150150

151-
void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
151+
void TPartitionedStats::SetNonZeroAggSum(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
152152
if (value) {
153153
AFL_ENSURE(partIndex < Parts.size());
154154
auto& part = Parts[partIndex];
@@ -164,7 +164,43 @@ void TPartitionedStats::SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, b
164164
}
165165
}
166166

167-
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
167+
// Stores a non-zero `value` for task `taskIndex` in partition `partIndex` and
// keeps the per-partition aggregate Values[partIndex] at the minimum seen so far.
// A zero `value` is ignored entirely. Sum is adjusted whenever the aggregate
// changes, and AppendHistory() is called only on such a change and only when
// `recordTimeSeries` is true.
void TPartitionedStats::SetNonZeroAggMin(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
168+
if (value) {
169+
AFL_ENSURE(partIndex < Parts.size());
170+
auto& part = Parts[partIndex];
171+
AFL_ENSURE(taskIndex < part.size());
172+
// The per-task slot always takes the latest value, regardless of the aggregate.
part[taskIndex] = value;
173+
AFL_ENSURE(partIndex < Values.size());
174+
// An aggregate of 0 is treated as "not set yet", so the first non-zero value always wins.
if (Values[partIndex] == 0 || value < Values[partIndex]) {
175+
// Min/Max is related to Parts[] only, Values[] should keep count Sum as well
176+
// Swap the partition's previous aggregate for the new one inside Sum.
Sum = Sum + value - Values[partIndex];
177+
Values[partIndex] = value;
178+
if (recordTimeSeries) {
179+
AppendHistory();
180+
}
181+
}
182+
}
183+
}
184+
185+
// Stores a non-zero `value` for task `taskIndex` in partition `partIndex` and
// keeps the per-partition aggregate Values[partIndex] at the maximum seen so far.
// A zero `value` is ignored entirely. Sum is adjusted whenever the aggregate
// changes, and AppendHistory() is called only on such a change and only when
// `recordTimeSeries` is true.
void TPartitionedStats::SetNonZeroAggMax(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
186+
if (value) {
187+
AFL_ENSURE(partIndex < Parts.size());
188+
auto& part = Parts[partIndex];
189+
AFL_ENSURE(taskIndex < part.size());
190+
// The per-task slot always takes the latest value, regardless of the aggregate.
part[taskIndex] = value;
191+
AFL_ENSURE(partIndex < Values.size());
192+
// Only a strictly larger value replaces the current per-partition aggregate.
if (value > Values[partIndex]) {
193+
// Min/Max is related to Parts[] only, Values[] should keep count Sum as well
194+
// Swap the partition's previous aggregate for the new one inside Sum.
Sum = Sum + value - Values[partIndex];
195+
Values[partIndex] = value;
196+
if (recordTimeSeries) {
197+
AppendHistory();
198+
}
199+
}
200+
}
201+
}
202+
203+
void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries, EPartitionedAggKind aggKind) {
168204
auto [it, inserted] = Indices.try_emplace(key);
169205
if (inserted) {
170206
it->second = Indices.size() - 1;
@@ -175,7 +211,18 @@ void TTimeMultiSeriesStats::SetNonZero(TPartitionedStats& stats, ui32 taskIndex,
175211
if (stats.Parts.size() < PartCount) {
176212
stats.ResizeByParts(PartCount, TaskCount);
177213
}
178-
stats.SetNonZero(taskIndex, it->second, value, recordTimeSeries);
214+
215+
switch (aggKind) {
216+
case EPartitionedAggKind::PartitionedAggSum:
217+
stats.SetNonZeroAggSum(taskIndex, it->second, value, recordTimeSeries);
218+
break;
219+
case EPartitionedAggKind::PartitionedAggMin:
220+
stats.SetNonZeroAggMin(taskIndex, it->second, value, recordTimeSeries);
221+
break;
222+
case EPartitionedAggKind::PartitionedAggMax:
223+
stats.SetNonZeroAggMax(taskIndex, it->second, value, recordTimeSeries);
224+
break;
225+
}
179226
}
180227

181228
void TExternalStats::Resize(ui32 taskCount) {
@@ -524,13 +571,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
524571
for (auto& partitionStat : sourceStat.GetExternalPartitions()) {
525572
auto key = partitionStat.GetPartitionId();
526573
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalRows,
527-
index, key, partitionStat.GetExternalRows(), false);
574+
index, key, partitionStat.GetExternalRows(), false, EPartitionedAggKind::PartitionedAggSum);
528575
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.ExternalBytes,
529-
index, key, partitionStat.GetExternalBytes(), true);
576+
index, key, partitionStat.GetExternalBytes(), true, EPartitionedAggKind::PartitionedAggSum);
530577
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.FirstMessageMs,
531-
index, key, partitionStat.GetFirstMessageMs(), false);
578+
index, key, partitionStat.GetFirstMessageMs(), false, EPartitionedAggKind::PartitionedAggMin);
532579
asyncBufferStats.External.SetNonZero(asyncBufferStats.External.LastMessageMs,
533-
index, key, partitionStat.GetLastMessageMs(), false);
580+
index, key, partitionStat.GetLastMessageMs(), false, EPartitionedAggKind::PartitionedAggMax);
534581
}
535582
}
536583
}
@@ -798,6 +845,7 @@ void TQueryExecutionStats::Prepare() {
798845
auto [it, inserted] = StageStats.try_emplace(stageId);
799846
Y_ENSURE(inserted);
800847
it->second.StageId = stageId;
848+
it->second.SetHistorySampleCount(HistorySampleCount);
801849
}
802850
// connections
803851
for (auto& [_, stageStats] : StageStats) {

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,23 @@ struct TPartitionedStats : public TTimeSeriesStats {
5252

5353
void ResizeByTasks(ui32 taskCount);
5454
void ResizeByParts(ui32 partCount, ui32 taskCount);
55-
void SetNonZero(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
55+
void SetNonZeroAggSum(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
56+
void SetNonZeroAggMin(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
57+
void SetNonZeroAggMax(ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries);
58+
};
59+
60+
// Selects which TPartitionedStats::SetNonZeroAgg* method
// TTimeMultiSeriesStats::SetNonZero forwards a value to.
enum EPartitionedAggKind {
61+
// Forwarded to SetNonZeroAggSum.
PartitionedAggSum,
62+
// Forwarded to SetNonZeroAggMin.
PartitionedAggMin,
63+
// Forwarded to SetNonZeroAggMax.
PartitionedAggMax,
5664
};
5765

5866
struct TTimeMultiSeriesStats {
5967
std::unordered_map<TString, ui32> Indices;
6068
ui32 TaskCount = 0;
6169
ui32 PartCount = 0;
6270

63-
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries);
71+
void SetNonZero(TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries, EPartitionedAggKind aggKind);
6472
};
6573

6674
struct TExternalStats : public TTimeMultiSeriesStats {

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
548548
break;
549549
case NKqpProto::TKqpPhyConnection::kHashShuffle: {
550550
ui32 hashKind = NHashKind::EUndefined;
551+
auto forceSpilling = input.GetHashShuffle().GetUseSpilling();
551552
switch (input.GetHashShuffle().GetHashKindCase()) {
552553
case NKqpProto::TKqpPhyCnHashShuffle::kHashV1: {
553554
hashKind = NHashKind::EHashV1;
@@ -591,7 +592,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
591592
input.GetHashShuffle().GetKeyColumns(),
592593
enableSpilling,
593594
log,
594-
hashKind
595+
hashKind,
596+
forceSpilling
595597
);
596598
break;
597599
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
186186
bool DefaultEnableShuffleElimination = false;
187187
bool FilterPushdownOverJoinOptionalSide = false;
188188
THashSet<TString> YqlCoreOptimizerFlags;
189+
bool EnableSpillingInHashJoinShuffleConnections = false;
189190

190191
void SetDefaultEnabledSpillingNodes(const TString& node);
191192
ui64 GetEnabledSpillingNodes() const;

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,6 +1347,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
13471347
shuffleProto.MutableHashV1();
13481348
}
13491349

1350+
if (Config->EnableSpillingInHashJoinShuffleConnections && shuffle.UseSpilling()) {
1351+
shuffleProto.SetUseSpilling(FromStringWithDefault<bool>(shuffle.UseSpilling().Cast().StringValue(), false));
1352+
}
1353+
13501354
return;
13511355
}
13521356

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ NKikimrConfig::TAppConfig AppCfgLowComputeLimits(double reasonableTreshold, bool
3939
auto* ts = appCfg.MutableTableServiceConfig();
4040

4141
ts->SetEnableQueryServiceSpilling(enableSpilling);
42+
ts->SetEnableSpillingInHashJoinShuffleConnections(false);
4243

4344
auto* rm = ts->MutableResourceManager();
4445
rm->SetMkqlLightProgramMemoryLimit(100);

ydb/core/protos/kqp_physical.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ message TKqpPhyCnHashShuffle {
261261
THashV1 HashV1 = 2;
262262
TColumnShardHashV1 ColumnShardHashV1 = 3;
263263
}
264+
265+
bool UseSpilling = 4;
264266
}
265267

266268
message TKqpPhyCnBroadcast {

ydb/core/protos/table_service_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,4 +374,6 @@ message TTableServiceConfig {
374374
optional bool EnableFoldUdfs = 82 [ default = true ];
375375

376376
optional bool FilterPushdownOverJoinOptionalSide = 83 [ default = true ];
377+
378+
optional bool EnableSpillingInHashJoinShuffleConnections = 85 [default = true];
377379
};

0 commit comments

Comments
 (0)