Skip to content

Commit e72a639

Browse files
authored
25-1-analytics [KQP] Splitted hashfunc into hashv1 and columnshardhashv1 (#19725)
2 parents b3bc0f3 + b250ff7 commit e72a639

File tree

100 files changed

+498
-55
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

100 files changed

+498
-55
lines changed

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -550,11 +550,11 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
550550
BuildUnionAllChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, enableSpilling, log);
551551
break;
552552
case NKqpProto::TKqpPhyConnection::kHashShuffle: {
553-
ui32 hashKind = NHashKind::EUndefined;
553+
std::optional<EHashShuffleFuncType> hashKind;
554554
auto forceSpilling = input.GetHashShuffle().GetUseSpilling();
555555
switch (input.GetHashShuffle().GetHashKindCase()) {
556556
case NKqpProto::TKqpPhyCnHashShuffle::kHashV1: {
557-
hashKind = NHashKind::EHashV1;
557+
hashKind = EHashShuffleFuncType::HashV1;
558558
break;
559559
}
560560
case NKqpProto::TKqpPhyCnHashShuffle::kColumnShardHashV1: {
@@ -579,13 +579,15 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
579579
);
580580

581581
inputStageInfo.Meta.HashParamsByOutput[outputIdx] = columnShardHashV1Params;
582-
hashKind = NHashKind::EColumnShardHashV1;
582+
hashKind = EHashShuffleFuncType::ColumnShardHashV1;
583583
break;
584584
}
585585
default: {
586586
Y_ENSURE(false, "undefined type of hash for shuffle");
587587
}
588588
}
589+
590+
Y_ENSURE(hashKind.has_value(), "HashKind wasn't set!");
589591
BuildHashShuffleChannels(
590592
tasksGraph,
591593
stageInfo,
@@ -595,7 +597,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
595597
input.GetHashShuffle().GetKeyColumns(),
596598
enableSpilling,
597599
log,
598-
hashKind,
600+
hashKind.value(),
599601
forceSpilling
600602
);
601603
break;
@@ -1128,12 +1130,15 @@ void FillOutputDesc(
11281130
}
11291131
hashPartitionDesc.SetPartitionsCount(output.PartitionsCount);
11301132

1131-
switch (output.HashKind) {
1132-
case NHashKind::EHashV1: {
1133+
Y_ENSURE(output.HashKind.has_value(), "HashKind wasn't set before the FillOutputDesc!");
1134+
1135+
switch (output.HashKind.value()) {
1136+
using enum EHashShuffleFuncType;
1137+
case HashV1: {
11331138
hashPartitionDesc.MutableHashV1();
11341139
break;
11351140
}
1136-
case NHashKind::EColumnShardHashV1: {
1141+
case ColumnShardHashV1: {
11371142
auto& columnShardHashV1Params = stageInfo.Meta.GetColumnShardHashV1Params(outputIdx);
11381143
LOG_DEBUG_S(
11391144
*TlsActivationContext,

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
88
#include <ydb/core/kqp/opt/kqp_column_statistics_requester.h>
99
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
10+
#include <ydb/core/kqp/opt/kqp_opt_hash_func_propagate_transformer.h>
1011
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>
1112

1213

@@ -336,6 +337,15 @@ class TKqpRunner : public IKqpRunner {
336337
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
337338
.AddPostTypeAnnotation()
338339
.Add(CreateKqpBuildPhysicalQueryTransformer(OptimizeCtx, BuildQueryCtx), "BuildPhysicalQuery")
340+
.Add(CreateKqpTxsHashFuncPropagateTransformer(
341+
CreateTypeAnnotationTransformer(
342+
CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config), *typesCtx
343+
),
344+
*typesCtx,
345+
Config
346+
),
347+
"HashFuncPropagate"
348+
)
339349
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
340350
.Build(false);
341351

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
#include "kqp_opt_hash_func_propagate_transformer.h"
2+
3+
#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
4+
#include <yql/essentials/utils/log/log.h>
5+
#include <yql/essentials/core/yql_expr_type_annotation.h>
6+
#include <yql/essentials/core/services/yql_transform_pipeline.h>
7+
8+
#include <yql/essentials/core/yql_expr_optimize.h>
9+
10+
#include <ydb/library/yql/dq/common/dq_common.h>
11+
12+
using namespace NYql;
13+
using namespace NYql::NNodes;
14+
using namespace NKikimr::NKqp;
15+
using namespace NYql::NDq;
16+
17+
// Returns the stages of `stages` in topological order: a stage is emitted only
// after every physical stage it reads from through a TDqConnection input.
// Implemented as a post-order depth-first walk keyed by the node's UniqueId.
// The stage graph is assumed to be acyclic; a cycle would recurse without bound.
TVector<TDqPhyStage> TopSortStages(const TDqPhyStageList& stages) {
    TVector<TDqPhyStage> ordered;
    ordered.reserve(stages.Size());

    THashSet<ui64 /*uniqueId*/> emitted;
    std::function<void(const TDqPhyStage&)> visit;

    visit = [&](const TDqPhyStage& current) {
        const ui64 currentId = current.Ref().UniqueId();
        if (emitted.contains(currentId)) {
            return;
        }

        for (const auto& input : current.Inputs()) {
            auto maybeConnection = input.Maybe<TDqConnection>();
            if (!maybeConnection) {
                continue;
            }

            // NOTE: a connection's `Output()` refers to the upstream stage,
            // so from this stage's point of view it is an input.
            auto maybeProducer = maybeConnection.Cast().Output().Stage().Maybe<TDqPhyStage>();
            if (maybeProducer) {
                visit(maybeProducer.Cast());
            }
        }

        // Mark and emit only after all producers have been emitted.
        emitted.insert(currentId);
        ordered.push_back(current);
    };

    for (const auto& root : stages) {
        visit(root);
    }

    return ordered;
}
48+
49+
// Rebuilds `tx` so that every TDqCnHashShuffle input of every stage carries an
// explicit HashFunc value.
//
// Stages are processed in topological order (producers first). For each stage
// the hash function is chosen as follows:
//   * if the stage program contains a TKqpTable node and shuffle elimination is
//     enabled, the column-shard hash function from the configuration is used;
//   * otherwise the function recorded for a stage feeding this one through a
//     TDqCnMap connection is inherited (the last such input wins), falling back
//     to the configured default hash function.
// The chosen function is recorded per stage and written, as a string, into the
// HashFunc field of each hash-shuffle input of that stage.
//
// Parameters:
//   tx     - physical transaction to rewrite.
//   ctx    - expression context used to build the new nodes.
//   config - KQP configuration providing the hash function settings.
// Returns the rebuilt transaction; this implementation never returns an empty node.
TMaybeNode<TKqpPhysicalTx> PropogateHashFuncToHashShuffles(
    const TKqpPhysicalTx& tx,
    TExprContext& ctx,
    const TKikimrConfiguration::TPtr& config
) {
    const auto topSortedStages = TopSortStages(tx.Stages());

    TVector<TDqPhyStage> newStages;
    newStages.reserve(topSortedStages.size());

    // These settings do not depend on the stage, so they are read once.
    const bool enableShuffleElimination = config->OptShuffleElimination.Get().GetOrElse(config->DefaultEnableShuffleElimination);
    const auto defaultHashType = config->HashShuffleFuncType.Get().GetOrElse(config->DefaultHashShuffleFuncType);

    THashMap<ui64, NDq::EHashShuffleFuncType> hashTypeByStageID;
    TNodeOnNodeOwnedMap stagesMap;
    for (const auto& stage : topSortedStages) {
        // A stage counts as a "read" stage if its program mentions a table;
        // the visitor stops descending as soon as one is found.
        bool isRead = false;
        VisitExpr(
            stage.Program().Body().Ptr(),
            [&isRead](const TExprNode::TPtr& node) {
                if (TKqpTable::Match(node.Get())) {
                    isRead = true;
                }
                return !isRead;
            }
        );

        auto stageHashType = defaultHashType;
        if (isRead && enableShuffleElimination) {
            stageHashType = config->ColumnShardHashShuffleFuncType.Get().GetOrElse(config->DefaultColumnShardHashShuffleFuncType);
        } else {
            for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
                auto input = stage.Inputs().Item(i);
                if (auto maybeMap = input.Maybe<TDqCnMap>()) {
                    const ui64 inputStageID = maybeMap.Cast().Output().Stage().Ptr()->UniqueId();
                    // Single lookup instead of contains() followed by operator[].
                    const auto it = hashTypeByStageID.find(inputStageID);
                    if (it != hashTypeByStageID.end()) {
                        stageHashType = it->second;
                    }
                }
            }
        }

        const ui64 stageID = stage.Ptr()->UniqueId();
        hashTypeByStageID[stageID] = stageHashType;
        // Read back the stored enum value once; it is what gets serialized below.
        const NDq::EHashShuffleFuncType stageHashFunc = hashTypeByStageID[stageID];

        TNodeOnNodeOwnedMap stagesInputMap;
        for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
            auto input = stage.Inputs().Item(i);

            auto maybeHashShuffle = input.Maybe<TDqCnHashShuffle>();
            if (!maybeHashShuffle) {
                continue;
            }

            auto hashShuffle = maybeHashShuffle.Cast();
            auto withHashFunc =
                Build<TDqCnHashShuffle>(ctx, hashShuffle.Pos())
                    .InitFrom(hashShuffle);

            if (!hashShuffle.UseSpilling().IsValid()) {
                // Workaround for a YQL builder limitation: the next field cannot be
                // initialized by index if the previous one was not initialized.
                withHashFunc
                    .UseSpilling().Build(false);
            }

            withHashFunc
                .HashFunc()
                    .Build(ToString(stageHashFunc));

            stagesInputMap.emplace(input.Raw(), withHashFunc.Done().Ptr());
        }

        // First swap in the annotated hash-shuffle inputs, then redirect references
        // to already rebuilt upstream stages.
        auto newStage = Build<TDqPhyStage>(ctx, stage.Pos())
            .InitFrom(stage)
            .Inputs(ctx.ReplaceNodes(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesInputMap), stagesMap))
            .Done();
        stagesMap.emplace(stage.Raw(), newStage.Ptr());
        newStages.emplace_back(std::move(newStage));
    }

    return
        Build<TKqpPhysicalTx>(ctx, tx.Pos())
            .InitFrom(tx)
            .Stages()
                .Add(newStages)
            .Build()
            .Results(ctx.ReplaceNodes(tx.Results().Ptr(), stagesMap))
        .Done();
}
133+
134+
class TKqpTxHashFuncPropagateTransformer : public TSyncTransformerBase {
135+
public:
136+
TKqpTxHashFuncPropagateTransformer(const TKikimrConfiguration::TPtr& config)
137+
: Config(config)
138+
{}
139+
140+
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
141+
YQL_ENSURE(TExprBase(input).Maybe<TKqpPhysicalTx>());
142+
143+
auto tx = TExprBase(input).Cast<TKqpPhysicalTx>();
144+
auto optimizedTx = PropogateHashFuncToHashShuffles(tx, ctx, Config);
145+
if (!optimizedTx) {
146+
return TStatus::Error;
147+
}
148+
149+
output = optimizedTx.Cast().Ptr();
150+
return IGraphTransformer::TStatus::Ok;
151+
}
152+
153+
void Rewind() override {}
154+
155+
private:
156+
TKikimrConfiguration::TPtr Config;
157+
};
158+
159+
// Factory for the per-transaction hash function propagation transformer.
TAutoPtr<IGraphTransformer> CreateKqpTxHashFuncPropagateTransformer(const TKikimrConfiguration::TPtr& config) {
    auto* txTransformer = new TKqpTxHashFuncPropagateTransformer(config);
    return THolder<IGraphTransformer>(txTransformer);
}
162+
163+
class TKqpTxsHashFuncPropagateTransformer : public TSyncTransformerBase {
164+
public:
165+
TKqpTxsHashFuncPropagateTransformer(
166+
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
167+
TTypeAnnotationContext& typesCtx,
168+
const TKikimrConfiguration::TPtr& config
169+
)
170+
: TypeAnnTransformer(std::move(typeAnnTransformer))
171+
{
172+
TxTransformer =
173+
TTransformationPipeline(&typesCtx)
174+
.AddServiceTransformers()
175+
.Add(*TypeAnnTransformer, "TypeAnnotation")
176+
.AddPostTypeAnnotation(/* forSubgraph */ true)
177+
.Add(CreateKqpTxHashFuncPropagateTransformer(config), "Peephole")
178+
.Build(false);
179+
}
180+
181+
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
182+
if (!TKqpPhysicalQuery::Match(input.Get())) {
183+
return TStatus::Error;
184+
}
185+
186+
TKqpPhysicalQuery query(input);
187+
188+
TVector<TKqpPhysicalTx> txs;
189+
txs.reserve(query.Transactions().Size());
190+
for (const auto& tx : query.Transactions()) {
191+
auto expr = TransformTx(tx, ctx);
192+
txs.push_back(expr.Cast());
193+
}
194+
195+
auto phyQuery = Build<TKqpPhysicalQuery>(ctx, query.Pos())
196+
.Transactions()
197+
.Add(txs)
198+
.Build()
199+
.Results(query.Results())
200+
.Settings(query.Settings())
201+
.Done();
202+
203+
output = phyQuery.Ptr();
204+
return TStatus::Ok;
205+
}
206+
207+
void Rewind() final {
208+
TxTransformer->Rewind();
209+
}
210+
211+
private:
212+
TMaybeNode<TKqpPhysicalTx> TransformTx(const TKqpPhysicalTx& tx, TExprContext& ctx) {
213+
TxTransformer->Rewind();
214+
215+
auto expr = tx.Ptr();
216+
217+
while (true) {
218+
auto status = InstantTransform(*TxTransformer, expr, ctx);
219+
if (status == TStatus::Error) {
220+
return {};
221+
}
222+
if (status == TStatus::Ok) {
223+
break;
224+
}
225+
}
226+
return TKqpPhysicalTx(expr);
227+
}
228+
229+
TAutoPtr<IGraphTransformer> TxTransformer;
230+
TAutoPtr<NYql::IGraphTransformer> TypeAnnTransformer;
231+
};
232+
233+
// Factory for the query-level hash function propagation transformer.
//   typeAnnTransformer - type annotation transformer; ownership is handed to the new transformer.
//   typesCtx           - type annotation context used to build its internal pipeline.
//   config             - KQP configuration passed to the per-transaction transformer.
TAutoPtr<IGraphTransformer> NKikimr::NKqp::CreateKqpTxsHashFuncPropagateTransformer(
    TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
    TTypeAnnotationContext& typesCtx,
    const TKikimrConfiguration::TPtr& config
) {
    // std::move makes the ownership transfer explicit instead of relying on
    // TAutoPtr's ownership-stealing copy.
    return THolder<IGraphTransformer>(
        new TKqpTxsHashFuncPropagateTransformer(std::move(typeAnnTransformer), typesCtx, config)
    );
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#pragma once
2+
3+
#include "kqp_opt.h"
4+
5+
#include <ydb/core/kqp/common/kqp_yql.h>
6+
#include <yql/essentials/ast/yql_expr.h>
7+
#include <yql/essentials/core/yql_graph_transformer.h>
8+
#include <yql/essentials/core/yql_expr_optimize.h>
9+
#include <yql/essentials/core/yql_expr_type_annotation.h>
10+
#include <yql/essentials/core/yql_opt_utils.h>
11+
12+
13+
namespace NKikimr {
14+
namespace NKqp {
15+
16+
using namespace NYql;
17+
using namespace NYql::NNodes;
18+
using namespace NOpt;
19+
20+
// Creates a transformer that takes a TKqpPhysicalQuery and rewrites each of its
// transactions so that hash-shuffle connections carry an explicit HashFunc.
//   typeAnnTransformer - type annotation transformer run on each transaction; ownership is taken.
//   typesCtx           - type annotation context used to build the per-transaction pipeline.
//   config             - KQP configuration consulted for the hash function settings.
TAutoPtr<IGraphTransformer> CreateKqpTxsHashFuncPropagateTransformer(
21+
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
22+
TTypeAnnotationContext& typesCtx,
23+
const TKikimrConfiguration::TPtr& config
24+
);
25+
26+
}
27+
}

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,13 @@ class TxPlanSerializer {
507507
keyColumns.AppendValue(TString(column.Value()));
508508
}
509509
}
510+
511+
auto& hashFunc = planNode.NodeInfo["HashFunc"];
512+
if (hashShuffle.HashFunc().IsValid()) {
513+
hashFunc = hashShuffle.HashFunc().Cast().StringValue();
514+
} else {
515+
hashFunc = "HashV1";
516+
}
510517
} else if (auto merge = connection.Maybe<TDqCnMerge>()) {
511518
planNode.TypeName = "Merge";
512519
auto& sortColumns = planNode.NodeInfo["SortColumns"];
@@ -2237,7 +2244,13 @@ struct TQueryPlanReconstructor {
22372244
result["Node Type"] = plan.GetMapSafe().at("Node Type").GetStringSafe();
22382245

22392246
if (plan.GetMapSafe().at("Node Type") == "HashShuffle") {
2240-
result["Node Type"] = TStringBuilder{} << "HashShuffle (KeyColumns: " << plan.GetMapSafe().at("KeyColumns") << ")";
2247+
TStringBuilder stringBuilder;
2248+
stringBuilder << "HashShuffle (" <<
2249+
"KeyColumns: " << plan.GetMapSafe().at("KeyColumns") << ", " <<
2250+
"HashFunc: " << plan.GetMapSafe().at("HashFunc")
2251+
<< ")";
2252+
2253+
result["Node Type"] = stringBuilder;
22412254
}
22422255

22432256
if (plan.GetMapSafe().contains("CTE Name")) {

ydb/core/kqp/opt/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ SRCS(
1414
kqp_statistics_transformer.cpp
1515
kqp_column_statistics_requester.cpp
1616
kqp_constant_folding_transformer.cpp
17+
kqp_opt_hash_func_propagate_transformer.cpp
1718
)
1819

1920
PEERDIR(

0 commit comments

Comments
 (0)