Skip to content

Commit ee21311

Browse files
Use GraceJoinCore instead of MapJoinCore (#11537)
Co-authored-by: Pavel Velikhov <pavelvelikhov@ydb.tech>
1 parent 0d9831e commit ee21311

File tree

96 files changed

+2128
-884
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+2128
-884
lines changed

ydb/core/kqp/kqp_default_settings.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,8 @@ DefaultSettings {
7272
Name: "_KqpDisableLlvmForUdfStages"
7373
Value: "false"
7474
}
75+
76+
DefaultSettings {
77+
Name: "UseGraceJoinCoreForMap"
78+
Value: "true"
79+
}

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,7 +1525,13 @@ class TxPlanSerializer {
15251525
}
15261526

15271527
std::variant<ui32, TArgContext> Visit(const TCoFlatMapBase& flatMap, const TCoGraceJoinCore& join, TQueryPlanNode& planNode) {
1528-
const auto name = TStringBuilder() << join.JoinKind().Value() << "Join (Grace)";
1528+
auto joinAlgo = "(Grace)";
1529+
for (size_t i=0; i<join.Flags().Size(); i++) {
1530+
if (join.Flags().Item(i).StringValue() == "Broadcast") {
1531+
joinAlgo = "(MapJoin)";
1532+
}
1533+
}
1534+
const auto name = TStringBuilder() << join.JoinKind().Value() << "Join " << joinAlgo;
15291535

15301536
TOperator op;
15311537
op.Properties["Name"] = name;
@@ -1541,7 +1547,13 @@ class TxPlanSerializer {
15411547
}
15421548

15431549
std::variant<ui32, TArgContext> Visit(const TCoGraceJoinCore& join, TQueryPlanNode& planNode) {
1544-
const auto name = TStringBuilder() << join.JoinKind().Value() << "Join (Grace)";
1550+
auto joinAlgo = "(Grace)";
1551+
for (size_t i=0; i<join.Flags().Size(); i++) {
1552+
if (join.Flags().Item(i).StringValue() == "Broadcast") {
1553+
joinAlgo = "(MapJoin)";
1554+
}
1555+
}
1556+
const auto name = TStringBuilder() << join.JoinKind().Value() << "Join " << joinAlgo;
15451557

15461558
TOperator op;
15471559
op.Properties["Name"] = name;

ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
9393
{
9494
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
9595
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
96-
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoin));
96+
AddHandler(0, &TDqPhyGraceJoin::Match, HNDL(RewriteMapJoinWithGraceCore));
97+
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoinWithMapCore));
9798
AddHandler(0, &TDqPhyCrossJoin::Match, HNDL(RewriteCrossJoin));
9899
AddHandler(0, &TDqPhyJoinDict::Match, HNDL(RewriteDictJoin));
99100
AddHandler(0, &TDqJoin::Match, HNDL(RewritePureJoin));
@@ -110,9 +111,15 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
110111
return output;
111112
}
112113

113-
TMaybeNode<TExprBase> RewriteMapJoin(TExprBase node, TExprContext& ctx) {
114-
TExprBase output = DqPeepholeRewriteMapJoin(node, ctx);
115-
DumpAppliedRule("RewriteMapJoin", node.Ptr(), output.Ptr(), ctx);
114+
TMaybeNode<TExprBase> RewriteMapJoinWithGraceCore(TExprBase node, TExprContext& ctx) {
115+
TExprBase output = DqPeepholeRewriteMapJoinWithGraceCore(node, ctx);
116+
DumpAppliedRule("RewriteMapJoinWithGraceCore", node.Ptr(), output.Ptr(), ctx);
117+
return output;
118+
}
119+
120+
TMaybeNode<TExprBase> RewriteMapJoinWithMapCore(TExprBase node, TExprContext& ctx) {
121+
TExprBase output = DqPeepholeRewriteMapJoinWithMapCore(node, ctx);
122+
DumpAppliedRule("RewriteMapJoinWithMapCore", node.Ptr(), output.Ptr(), ctx);
116123
return output;
117124
}
118125

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
422422
// It is now possible as we don't use datashard transactions for reads in data queries.
423423
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
424424
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
425-
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false
425+
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false)
426426
);
427427
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
428428
return output;

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,11 @@ static void FillPlan(const NYdb::NTable::TScanQueryPart& streamPart, TCollectedS
902902
if (!plan.empty()) {
903903
res.PlanJson = plan;
904904
}
905+
906+
auto ast = res.QueryStats->query_ast();
907+
if (!ast.empty()) {
908+
res.Ast = ast;
909+
}
905910
}
906911
}
907912

@@ -913,6 +918,11 @@ static void FillPlan(const NYdb::NScripting::TYqlResultPart& streamPart, TCollec
913918
if (!plan.empty()) {
914919
res.PlanJson = plan;
915920
}
921+
922+
auto ast = res.QueryStats->query_ast();
923+
if (!ast.empty()) {
924+
res.Ast = ast;
925+
}
916926
}
917927
}
918928

@@ -924,6 +934,11 @@ static void FillPlan(const NYdb::NQuery::TExecuteQueryPart& streamPart, TCollect
924934
if (!plan.empty()) {
925935
res.PlanJson = plan;
926936
}
937+
938+
auto ast = res.QueryStats->query_ast();
939+
if (!ast.empty()) {
940+
res.Ast = ast;
941+
}
927942
}
928943
}
929944

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ inline TKikimrRunner DefaultKikimrRunner(TVector<NKikimrKqp::TKqpSetting> kqpSet
217217
struct TCollectedStreamResult {
218218
TString ResultSetYson;
219219
TMaybe<TString> PlanJson;
220+
TMaybe<TString> Ast;
220221
TMaybe<Ydb::TableStats::QueryStats> QueryStats;
221222
ui64 RowsCount = 0;
222223
ui64 ConsumedRuFromHeader = 0;

ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ Y_UNIT_TEST_TWIN(JoinWithSubquery, StreamLookup) {
224224
ON l.Fk = r.Key
225225
);
226226
SELECT j.lValue AS Value FROM $join AS j INNER JOIN `/Root/Kv` AS kv
227-
ON j.lKey = kv.Key;
227+
ON j.lKey = kv.Key
228+
ORDER BY Value;
228229
)";
229230

230231
NKikimrConfig::TAppConfig appConfig;

ydb/core/kqp/ut/join/kqp_join_order_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,4 +623,4 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
623623

624624
}
625625
}
626-
}
626+
}

ydb/core/kqp/ut/join/kqp_join_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
692692
DECLARE $in AS List<Struct<v: String?>>;
693693
SELECT *
694694
FROM AS_TABLE($in) AS k RIGHT SEMI JOIN `/Root/RSJ_SimpleKey_1` AS t ON k.v = t.Value
695+
ORDER BY Key
695696
)");
696697

697698
auto params = TParamsBuilder().AddParam("$in").BeginList()

ydb/core/kqp/ut/query/kqp_explain_ut.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
7979
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
8080
UNIT_ASSERT(res.PlanJson);
8181

82-
Cerr << *res.PlanJson;
82+
Cerr << *res.PlanJson << Endl;
8383

8484
NJson::TJsonValue plan;
8585
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
8686
UNIT_ASSERT(ValidatePlanNodeIds(plan));
8787

88-
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
88+
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan-Filter");
8989
UNIT_ASSERT(join.IsDefined());
9090
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
9191
UNIT_ASSERT(left.IsDefined());
@@ -106,13 +106,14 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
106106
auto res = CollectStreamResult(it);
107107
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
108108
UNIT_ASSERT(res.PlanJson);
109-
Cerr << *res.PlanJson;
109+
110+
Cerr << *res.PlanJson << Endl;
110111

111112
NJson::TJsonValue plan;
112113
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
113114
UNIT_ASSERT(ValidatePlanNodeIds(plan));
114115

115-
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
116+
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan-Filter");
116117
UNIT_ASSERT(join.IsDefined());
117118
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
118119
UNIT_ASSERT(left.IsDefined());
@@ -193,7 +194,8 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
193194
auto res = CollectStreamResult(it);
194195
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
195196
UNIT_ASSERT(res.PlanJson);
196-
Cerr << *res.PlanJson;
197+
198+
Cout << *res.PlanJson << Endl;
197199

198200
NJson::TJsonValue plan;
199201
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
@@ -202,7 +204,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
202204
auto join = FindPlanNodeByKv(
203205
plan,
204206
"Node Type",
205-
"Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"
207+
"Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan-Filter"
206208
);
207209

208210
UNIT_ASSERT(join.IsDefined());
@@ -365,9 +367,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
365367
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
366368
UNIT_ASSERT(ValidatePlanNodeIds(plan));
367369

368-
auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter");
370+
auto join1 = FindPlanNodeByKv(plan, "Node Type", "Sort-InnerJoin (MapJoin)-Filter-Filter");
369371
UNIT_ASSERT(join1.IsDefined());
370-
auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
372+
auto join2 = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-Filter");
371373
UNIT_ASSERT(join2.IsDefined());
372374
}
373375

@@ -907,6 +909,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
907909
)").ExtractValueSync();
908910
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
909911

912+
Cerr << result.GetPlan() << Endl;
913+
Cout << result.GetPlan() << Endl;
914+
910915
NJson::TJsonValue plan;
911916
NJson::ReadJsonTree(result.GetPlan(), &plan, true);
912917

@@ -919,8 +924,8 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
919924

920925
auto cteLink1 = FindPlanNodeByKv(
921926
plan,
922-
"CTE Name",
923-
"precompute_1_0"
927+
"Subplan Name",
928+
"CTE precompute_1_0"
924929
);
925930

926931
UNIT_ASSERT(cteLink1.IsDefined());

0 commit comments

Comments
 (0)