Skip to content

Commit 1593a2d

Browse files
authored
Allow use GraceJoinCore for map (Broadcast-et) joins (#7447)
1 parent 142b531 commit 1593a2d

18 files changed

+206
-29
lines changed

ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
9393
{
9494
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
9595
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
96-
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoin));
96+
AddHandler(0, &TDqPhyGraceJoin::Match, HNDL(RewriteMapJoinWithGraceCore));
97+
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoinWithMapCore));
9798
AddHandler(0, &TDqPhyCrossJoin::Match, HNDL(RewriteCrossJoin));
9899
AddHandler(0, &TDqPhyJoinDict::Match, HNDL(RewriteDictJoin));
99100
AddHandler(0, &TDqJoin::Match, HNDL(RewritePureJoin));
@@ -110,9 +111,15 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
110111
return output;
111112
}
112113

113-
TMaybeNode<TExprBase> RewriteMapJoin(TExprBase node, TExprContext& ctx) {
114-
TExprBase output = DqPeepholeRewriteMapJoin(node, ctx);
115-
DumpAppliedRule("RewriteMapJoin", node.Ptr(), output.Ptr(), ctx);
114+
TMaybeNode<TExprBase> RewriteMapJoinWithGraceCore(TExprBase node, TExprContext& ctx) {
115+
TExprBase output = DqPeepholeRewriteMapJoinWithGraceCore(node, ctx);
116+
DumpAppliedRule("RewriteMapJoinWithGraceCore", node.Ptr(), output.Ptr(), ctx);
117+
return output;
118+
}
119+
120+
TMaybeNode<TExprBase> RewriteMapJoinWithMapCore(TExprBase node, TExprContext& ctx) {
121+
TExprBase output = DqPeepholeRewriteMapJoinWithMapCore(node, ctx);
122+
DumpAppliedRule("RewriteMapJoinWithMapCore", node.Ptr(), output.Ptr(), ctx);
116123
return output;
117124
}
118125

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
421421
// It is now possible as we don't use datashard transactions for reads in data queries.
422422
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
423423
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
424-
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false
424+
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false)
425425
);
426426
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
427427
return output;

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
8383
REGISTER_SETTING(*this, OptEnableOlapProvideComputeSharding);
8484
REGISTER_SETTING(*this, OverrideStatistics);
8585
REGISTER_SETTING(*this, OverridePlanner);
86-
86+
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
8787

8888
REGISTER_SETTING(*this, OptUseFinalizeByKey);
8989
REGISTER_SETTING(*this, CostBasedOptimizationLevel);

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct TKikimrSettings {
5353
NCommon::TConfSetting<TString, false> OverrideStatistics;
5454
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
5555
NCommon::TConfSetting<TString, false> OverridePlanner;
56+
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
5657

5758
/* Disable optimizer rules */
5859
NCommon::TConfSetting<bool, false> OptDisableTopSort;

ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
4848
]
4949
},
50+
{
51+
"Name": "TDqPhyGraceJoin",
52+
"Base": "TDqJoinBase",
53+
"Match": {"Type": "Callable", "Name": "DqPhyGraceJoin"}
54+
},
5055
{
5156
"Name": "TDqPhyMapJoin",
5257
"Base": "TDqJoinBase",

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,8 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T
323323
}
324324

325325

326-
TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
327-
TExprContext& ctx)
326+
TDqJoinBase DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
327+
TExprContext& ctx, bool useGraceCore)
328328
{
329329
static const std::set<std::string_view> supportedTypes = {"Inner"sv, "Left"sv, "LeftOnly"sv, "LeftSemi"sv};
330330
auto joinType = join.JoinType().Value();
@@ -349,16 +349,29 @@ TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput,
349349
auto leftFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), leftInput, leftFilterKeys);
350350
auto rightFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), rightInput, rightFilterKeys);
351351

352-
return Build<TDqPhyMapJoin>(ctx, join.Pos())
353-
.LeftInput(leftFilteredInput)
354-
.LeftLabel(join.LeftLabel())
355-
.RightInput(rightFilteredInput)
356-
.RightLabel(join.RightLabel())
357-
.JoinType(join.JoinType())
358-
.JoinKeys(join.JoinKeys())
359-
.LeftJoinKeyNames(join.LeftJoinKeyNames())
360-
.RightJoinKeyNames(join.RightJoinKeyNames())
361-
.Done();
352+
if (useGraceCore) {
353+
return Build<TDqPhyGraceJoin>(ctx, join.Pos())
354+
.LeftInput(leftFilteredInput)
355+
.LeftLabel(join.LeftLabel())
356+
.RightInput(rightFilteredInput)
357+
.RightLabel(join.RightLabel())
358+
.JoinType(join.JoinType())
359+
.JoinKeys(join.JoinKeys())
360+
.LeftJoinKeyNames(join.LeftJoinKeyNames())
361+
.RightJoinKeyNames(join.RightJoinKeyNames())
362+
.Done();
363+
} else {
364+
return Build<TDqPhyMapJoin>(ctx, join.Pos())
365+
.LeftInput(leftFilteredInput)
366+
.LeftLabel(join.LeftLabel())
367+
.RightInput(rightFilteredInput)
368+
.RightLabel(join.RightLabel())
369+
.JoinType(join.JoinType())
370+
.JoinKeys(join.JoinKeys())
371+
.LeftJoinKeyNames(join.LeftJoinKeyNames())
372+
.RightJoinKeyNames(join.RightJoinKeyNames())
373+
.Done();
374+
}
362375
}
363376

364377
} // namespace
@@ -609,7 +622,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
609622
.Done();
610623
}
611624

612-
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) {
625+
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap) {
613626
static const std::set<std::string_view> supportedTypes = {
614627
"Inner"sv,
615628
"Left"sv,
@@ -760,7 +773,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
760773

761774
TMaybeNode<TExprBase> phyJoin;
762775
if (join.JoinType().Value() != "Cross"sv) {
763-
phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx);
776+
phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx, useGraceCoreForMap);
764777
} else {
765778
YQL_ENSURE(join.JoinKeys().Empty());
766779

ydb/library/yql/dq/opt/dq_opt_join.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode
1616

1717
NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter);
1818

19-
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);
19+
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap);
2020

2121
NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
22-
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true);
22+
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true, bool useGraceCoreForMap = false);
2323

2424
NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx);
2525

ydb/library/yql/dq/opt/dq_opt_peephole.cpp

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/yql/core/yql_opt_utils.h>
55
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
66
#include <ydb/library/yql/core/yql_expr_optimize.h>
7+
#include <ydb/library/yql/core/yql_type_helpers.h>
78

89
#include <ydb/library/yql/utils/log/log.h>
910

@@ -130,8 +131,146 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
130131
}
131132
return structMembers;
132133
}
134+
135+
TExprNode::TPtr ExpandJoinInput(const TStructExprType& type, TExprNode::TPtr&& arg, TExprContext& ctx) {
136+
return ctx.Builder(arg->Pos())
137+
.Callable("ExpandMap")
138+
.Add(0, std::move(arg))
139+
.Lambda(1)
140+
.Param("item")
141+
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
142+
auto i = 0U;
143+
for (const auto& item : type.GetItems()) {
144+
parent.Callable(i++, "Member")
145+
.Arg(0, "item")
146+
.Atom(1, item->GetName())
147+
.Seal();
148+
}
149+
return parent;
150+
})
151+
.Seal()
152+
.Seal().Build();
153+
}
154+
133155
} // anonymous namespace end
134156

157+
TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const TExprBase& node, TExprContext& ctx) {
158+
if (!node.Maybe<TDqPhyGraceJoin>()) {
159+
return node;
160+
}
161+
const auto graceJoin = node.Cast<TDqPhyGraceJoin>();
162+
const auto pos = graceJoin.Pos();
163+
164+
const TString leftTableLabel(GetTableLabel(graceJoin.LeftLabel()));
165+
const TString rightTableLabel(GetTableLabel(graceJoin.RightLabel()));
166+
167+
auto [leftKeyColumnNodes, rightKeyColumnNodes] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel);
168+
const auto keyWidth = leftKeyColumnNodes.size();
169+
170+
const auto itemTypeLeft = GetSequenceItemType(graceJoin.LeftInput(), false, ctx)->Cast<TStructExprType>();
171+
const auto itemTypeRight = GetSequenceItemType(graceJoin.RightInput(), false, ctx)->Cast<TStructExprType>();
172+
173+
TExprNode::TListType leftRenames, rightRenames;
174+
std::vector<TString> fullColNames;
175+
ui32 outputIndex = 0;
176+
177+
for (auto i = 0u; i < itemTypeLeft->GetSize(); i++) {
178+
TString name(itemTypeLeft->GetItems()[i]->GetName());
179+
if (leftTableLabel) {
180+
name = leftTableLabel + "." + name;
181+
}
182+
fullColNames.push_back(name);
183+
leftRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i)));
184+
leftRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++)));
185+
}
186+
if (graceJoin.JoinType().Value() != "LeftOnly" && graceJoin.JoinType().Value() != "LeftSemi") {
187+
for (auto i = 0u; i < itemTypeRight->GetSize(); i++) {
188+
TString name(itemTypeRight->GetItems()[i]->GetName());
189+
if (rightTableLabel) {
190+
name = rightTableLabel + "." + name;
191+
}
192+
fullColNames.push_back(name);
193+
rightRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i)));
194+
rightRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++)));
195+
}
196+
}
197+
198+
TTypeAnnotationNode::TListType keyTypesLeft(keyWidth);
199+
TTypeAnnotationNode::TListType keyTypesRight(keyWidth);
200+
TTypeAnnotationNode::TListType keyTypes(keyWidth);
201+
for (auto i = 0U; i < keyTypes.size(); ++i) {
202+
const auto keyTypeLeft = itemTypeLeft->FindItemType(leftKeyColumnNodes[i]->Content());
203+
const auto keyTypeRight = itemTypeRight->FindItemType(rightKeyColumnNodes[i]->Content());
204+
bool optKey = false;
205+
keyTypes[i] = JoinDryKeyType(keyTypeLeft, keyTypeRight, optKey, ctx);
206+
if (!keyTypes[i]) {
207+
keyTypes.clear();
208+
keyTypesLeft.clear();
209+
keyTypesRight.clear();
210+
break;
211+
}
212+
keyTypesLeft[i] = optKey ? ctx.MakeType<TOptionalExprType>(keyTypes[i]) : keyTypes[i];
213+
keyTypesRight[i] = optKey ? ctx.MakeType<TOptionalExprType>(keyTypes[i]) : keyTypes[i];
214+
}
215+
216+
auto leftInput = ExpandJoinInput(*itemTypeLeft, ctx.NewCallable(graceJoin.LeftInput().Pos(), "ToFlow", {graceJoin.LeftInput().Ptr()}), ctx);
217+
auto rightInput = ExpandJoinInput(*itemTypeRight, ctx.NewCallable(graceJoin.RightInput().Pos(), "ToFlow", {graceJoin.RightInput().Ptr()}), ctx);
218+
YQL_ENSURE(!keyTypes.empty());
219+
220+
for (auto i = 0U; i < leftKeyColumnNodes.size(); i++) {
221+
const auto origName = TString(leftKeyColumnNodes[i]->Content());
222+
auto index = itemTypeLeft->FindItem(origName);
223+
YQL_ENSURE(index);
224+
leftKeyColumnNodes[i] = ctx.NewAtom(leftKeyColumnNodes[i]->Pos(), ctx.GetIndexAsString(*index));
225+
}
226+
for (auto i = 0U; i < rightKeyColumnNodes.size(); i++) {
227+
const auto origName = TString(rightKeyColumnNodes[i]->Content());
228+
auto index = itemTypeRight->FindItem(origName);
229+
YQL_ENSURE(index);
230+
rightKeyColumnNodes[i] = ctx.NewAtom(rightKeyColumnNodes[i]->Pos(), ctx.GetIndexAsString(*index));
231+
}
232+
233+
auto [leftKeyColumnNodesCopy, rightKeyColumnNodesCopy] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel);
234+
235+
auto graceJoinCore = Build<TCoGraceJoinCore>(ctx, pos)
236+
.LeftInput(std::move(leftInput))
237+
.RightInput(std::move(rightInput))
238+
.JoinKind(graceJoin.JoinType())
239+
.LeftKeysColumns(ctx.NewList(pos, std::move(leftKeyColumnNodes)))
240+
.RightKeysColumns(ctx.NewList(pos, std::move(rightKeyColumnNodes)))
241+
.LeftRenames(ctx.NewList(pos, std::move(leftRenames)))
242+
.RightRenames(ctx.NewList(pos, std::move(rightRenames)))
243+
.LeftKeysColumnNames(ctx.NewList(pos, std::move(leftKeyColumnNodesCopy)))
244+
.RightKeysColumnNames(ctx.NewList(pos, std::move(rightKeyColumnNodesCopy)))
245+
.Flags()
246+
.Build()
247+
.Done();
248+
249+
auto graceNode = ctx.Builder(pos)
250+
.Callable("NarrowMap")
251+
.Add(0, graceJoinCore.Ptr())
252+
.Lambda(1)
253+
.Params("output", fullColNames.size())
254+
.Callable("AsStruct")
255+
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
256+
ui32 i = 0U;
257+
for (const auto& colName : fullColNames) {
258+
parent.List(i)
259+
.Atom(0, colName)
260+
.Arg(1, "output", i)
261+
.Seal();
262+
i++;
263+
}
264+
return parent;
265+
})
266+
.Seal()
267+
.Seal()
268+
.Seal()
269+
.Build();
270+
271+
return TExprBase(graceNode);
272+
}
273+
135274
/**
136275
* Rewrites a `KqpMapJoin` to the `MapJoinCore`.
137276
*
@@ -142,10 +281,11 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
142281
* (rely on the fact that there will be only one element in the `FlatMap`-stream)
143282
* - Align key types using `StrictCast`, use internal columns to store converted left keys
144283
*/
145-
TExprBase DqPeepholeRewriteMapJoin(const TExprBase& node, TExprContext& ctx) {
284+
TExprBase DqPeepholeRewriteMapJoinWithMapCore(const TExprBase& node, TExprContext& ctx) {
146285
if (!node.Maybe<TDqPhyMapJoin>()) {
147286
return node;
148287
}
288+
149289
const auto mapJoin = node.Cast<TDqPhyMapJoin>();
150290
const auto pos = mapJoin.Pos();
151291

ydb/library/yql/dq/opt/dq_opt_peephole.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ namespace NYql::NDq {
1010

1111
NNodes::TExprBase DqPeepholeRewriteCrossJoin(const NNodes::TExprBase& node, TExprContext& ctx);
1212
NNodes::TExprBase DqPeepholeRewriteJoinDict(const NNodes::TExprBase& node, TExprContext& ctx);
13-
NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprContext& ctx);
13+
NNodes::TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const NNodes::TExprBase& node, TExprContext& ctx);
14+
NNodes::TExprBase DqPeepholeRewriteMapJoinWithMapCore(const NNodes::TExprBase& node, TExprContext& ctx);
1415
NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx);
1516
NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx);
1617
NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx);

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2610,7 +2610,7 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {
26102610

26112611

26122612
TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
2613-
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin)
2613+
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin, bool useGraceCoreForMap)
26142614
{
26152615
if (!node.Maybe<TDqJoin>()) {
26162616
return node;
@@ -2660,7 +2660,7 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
26602660
// separate stage to receive data from both sides of join.
26612661
// TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This
26622662
// requires some additional knowledge, probably with use of constraints.
2663-
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx);
2663+
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx, useGraceCoreForMap);
26642664
}
26652665

26662666
TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) {

ydb/library/yql/dq/type_ann/dq_type_ann.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,10 @@ THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationCont
10541054
return AnnotateDqJoin(input, ctx);
10551055
}
10561056

1057+
if (TDqPhyGraceJoin::Match(input.Get())) {
1058+
return AnnotateDqMapOrDictJoin(input, ctx);
1059+
}
1060+
10571061
if (TDqPhyMapJoin::Match(input.Get())) {
10581062
return AnnotateDqMapOrDictJoin(input, ctx);
10591063
}

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ TDqConfiguration::TDqConfiguration() {
115115
}
116116
return res;
117117
});
118+
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
118119
}
119120

120121
} // namespace NYql

ydb/library/yql/providers/dq/common/yql_dq_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ struct TDqSettings {
144144

145145
NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
146146
NCommon::TConfSetting<bool, false> DisableCheckpoints;
147+
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
147148

148149
// This options will be passed to executor_actor and worker_actor
149150
template <typename TProtoConfig>

ydb/library/yql/providers/dq/opt/dqs_opt.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ namespace NYql::NDqs {
5757
TExprBase node{inputExpr};
5858
PERFORM_RULE(DqPeepholeRewriteCrossJoin, node, ctx);
5959
PERFORM_RULE(DqPeepholeRewriteJoinDict, node, ctx);
60-
PERFORM_RULE(DqPeepholeRewriteMapJoin, node, ctx);
60+
PERFORM_RULE(DqPeepholeRewriteMapJoinWithGraceCore, node, ctx);
61+
PERFORM_RULE(DqPeepholeRewriteMapJoinWithMapCore, node, ctx);
6162
PERFORM_RULE(DqPeepholeRewritePureJoin, node, ctx);
6263
PERFORM_RULE(DqPeepholeRewriteReplicate, node, ctx);
6364
PERFORM_RULE(DqPeepholeDropUnusedInputs, node, ctx);

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
255255
const auto join = node.Cast<TDqJoin>();
256256
const TParentsMap* parentsMap = getParents();
257257
const auto mode = Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off);
258-
return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, mode);
258+
const auto useGraceJoin = Config->UseGraceJoinCoreForMap.Get().GetOrElse(false);
259+
return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, mode, true, useGraceJoin);
259260
}
260261

261262
template <bool IsGlobal>

ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase {
4848
AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate));
4949
AddHandler({
5050
TDqJoin::CallableName(),
51+
TDqPhyGraceJoin::CallableName(),
5152
TDqPhyMapJoin::CallableName(),
5253
TDqPhyCrossJoin::CallableName(),
5354
TDqPhyJoinDict::CallableName(),

0 commit comments

Comments
 (0)