Skip to content

Commit 281f33b

Browse files
authored
[KQP] Add ShuffleEliminated flag (#20298)
2 parents 2f7ba77 + b8fb3cd commit 281f33b

File tree

6 files changed

+32
-87
lines changed

6 files changed

+32
-87
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,6 @@ ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
2929
ydb/core/kqp/ut/olap [*/*] chunk chunk
3030
ydb/core/kqp/ut/join KqpIndexLookupJoin.LeftJoinRightNullFilter+StreamLookup
3131
ydb/core/kqp/ut/join KqpIndexLookupJoin.LeftJoinRightNullFilter-StreamLookup
32-
ydb/core/kqp/ut/join KqpJoinOrder.CanonizedJoinOrderLookupBug
33-
ydb/core/kqp/ut/join KqpJoinOrder.FiveWayJoinStatsOverride-StreamLookupJoin+ColumnStore
34-
ydb/core/kqp/ut/join KqpJoinOrder.TPCDS16-StreamLookupJoin+ColumnStore
35-
ydb/core/kqp/ut/join KqpJoinOrder.TPCDS34-StreamLookupJoin+ColumnStore
36-
ydb/core/kqp/ut/join KqpJoinOrder.TPCDS61-StreamLookupJoin+ColumnStore
37-
ydb/core/kqp/ut/join KqpJoinOrder.TPCDS87-StreamLookupJoin+ColumnStore
38-
ydb/core/kqp/ut/join KqpJoinOrder.TPCDS94-StreamLookupJoin+ColumnStore
39-
ydb/core/kqp/ut/join KqpJoinOrder.TPCDS95-StreamLookupJoin+ColumnStore
40-
ydb/core/kqp/ut/join KqpJoinOrder.TPCH10-StreamLookupJoin+ColumnStore
41-
ydb/core/kqp/ut/join KqpJoinOrder.TPCH11-StreamLookupJoin+ColumnStore
42-
ydb/core/kqp/ut/join KqpJoinOrder.TPCH21-StreamLookupJoin+ColumnStore
43-
ydb/core/kqp/ut/join KqpJoinOrder.TPCH3-StreamLookupJoin+ColumnStore
44-
ydb/core/kqp/ut/join KqpJoinOrder.TPCH5-StreamLookupJoin+ColumnStore
45-
ydb/core/kqp/ut/join KqpJoinOrder.TPCH8-StreamLookupJoin+ColumnStore
46-
ydb/core/kqp/ut/join KqpJoinOrder.TestJoinOrderHintsComplex-StreamLookupJoin+ColumnStore
47-
ydb/core/kqp/ut/join KqpJoinOrder.ShuffleEliminationManyKeysJoinPredicate
48-
ydb/core/kqp/ut/join KqpJoinOrder.ShuffleEliminationOneJoin
49-
ydb/core/kqp/ut/join KqpJoinOrder.ShuffleEliminationReuseShuffleTwoJoins
50-
ydb/core/kqp/ut/join KqpJoinOrder.TPCHRandomJoinViewJustWorks+ColumnStore
5132
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
5233
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
5334
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
@@ -133,14 +114,6 @@ ydb/tests/functional/tenants test_tenants.py.TestTenants.test_stop_start[enable_
133114
ydb/tests/functional/tenants test_tenants.py.TestTenants.test_stop_start[enable_alter_database_create_hive_first--true]
134115
ydb/tests/functional/tpc/large [test_tpcds.py] chunk chunk
135116
ydb/tests/functional/tpc/large sole chunk chunk
136-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[10]
137-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[11]
138-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[12]
139-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[36]
140-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[67]
141-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[86]
142-
ydb/tests/functional/tpc/large test_tpcds.py.TestTpcdsS1.test_tpcds[9]
143-
ydb/tests/functional/tpc/medium test_tpch.py.TestTpchS1.test_tpch[13]
144117
ydb/tests/olap/scenario sole chunk chunk
145118
ydb/tests/olap/scenario test_alter_compression.py.TestAlterCompression.test[alter_compression]
146119
ydb/tests/olap/scenario test_alter_tiering.py.TestAlterTiering.test[many_tables]

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
214214

215215
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
216216
YQL_ENSURE(args.ComputesByStages);
217-
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
217+
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta);
218218
IActor* computeActor = CreateKqpScanComputeActor(
219219
args.ExecuterId, args.TxId,
220220
args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ class TComputeStageInfo {
4444
return true;
4545
}
4646

47-
TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) {
47+
TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) {
4848
YQL_ENSURE(meta.ReadsSize(), "unexpected merge with no reads");
49-
if (forceOneToMany || !task.HasMetaId()) {
49+
if (!task.HasMetaId()) {
5050
MetaInfo.emplace_back(TMetaScan(meta));
5151
return MetaInfo.back();
5252
} else {
@@ -85,12 +85,12 @@ class TComputeStagesWithScan {
8585
}
8686
}
8787

88-
TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) {
88+
TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) {
8989
auto it = Stages.find(dqTask.GetStageId());
9090
if (it == Stages.end()) {
9191
it = Stages.emplace(dqTask.GetStageId(), TComputeStageInfo()).first;
9292
}
93-
return it->second.MergeMetaReads(dqTask, meta, forceOneToMany);
93+
return it->second.MergeMetaReads(dqTask, meta);
9494
}
9595
};
9696

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,7 +1741,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
17411741
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
17421742

17431743
auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
1744-
if (enableShuffleElimination && stage.GetIsShuffleEliminated() && stageInfo.Meta.ColumnTableInfoPtr) {
1744+
bool shuffleEliminated = enableShuffleElimination && stage.GetIsShuffleEliminated();
1745+
if (shuffleEliminated && stageInfo.Meta.ColumnTableInfoPtr) {
17451746
const auto& tableDesc = stageInfo.Meta.ColumnTableInfoPtr->Description;
17461747
columnShardHashV1Params.SourceShardCount = tableDesc.GetColumnShardCount();
17471748
columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
@@ -1787,7 +1788,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17871788
}
17881789
}
17891790

1790-
if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.IsSorted())) {
1791+
if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() && !shuffleEliminated || (!isOlapScan && readSettings.IsSorted())) {
17911792
for (auto&& pair : nodeShards) {
17921793
auto& shardsInfo = pair.second;
17931794
for (auto&& shardInfo : shardsInfo) {
@@ -1806,7 +1807,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
18061807
}
18071808
}
18081809

1809-
} else if (enableShuffleElimination && stage.GetIsShuffleEliminated() /* save partitioning for shuffle elimination */) {
1810+
} else if (shuffleEliminated /* save partitioning for shuffle elimination */) {
18101811
std::size_t stageInternalTaskId = 0;
18111812
columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>();
18121813
columnShardHashV1Params.TaskIndexByHash->resize(columnShardHashV1Params.SourceShardCount);

ydb/core/kqp/ut/join/data/join_order/lookupbug.json

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,34 @@
11
{
2-
"op_name":"Union",
2+
"op_name":"LeftJoin (MapJoin)",
33
"args":
44
[
55
{
6-
"op_name":"InnerJoin (MapJoin)",
6+
"op_name":"LeftJoin (MapJoin)",
77
"args":
88
[
99
{
10-
"op_name":"TableRangeScan",
11-
"table":"quota"
12-
},
13-
{
14-
"op_name":"InnerJoin (MapJoin)",
10+
"op_name":"LeftJoin (MapJoin)",
1511
"args":
1612
[
17-
{
18-
"op_name":"LeftJoin (MapJoin)",
19-
"args":
20-
[
21-
{
22-
"op_name":"TableRangeScan",
23-
"table":"browsers"
24-
},
25-
{
26-
"op_name":"TableLookup",
27-
"table":"browser_groups"
28-
}
29-
]
30-
},
3113
{
3214
"op_name":"TableFullScan",
3315
"table":"quotas_browsers_relation"
34-
}
35-
]
36-
}
37-
]
38-
},
39-
{
40-
"op_name":"LeftOnlyJoin (MapJoin)",
41-
"args":
42-
[
43-
{
44-
"op_name":"LeftJoin (MapJoin)",
45-
"args":
46-
[
47-
{
48-
"op_name":"InnerJoin (MapJoin)",
49-
"args":
50-
[
51-
{
52-
"op_name":"TableRangeScan",
53-
"table":"quotas_browsers_relation"
54-
},
55-
{
56-
"op_name":"TableRangeScan",
57-
"table":"browsers"
58-
}
59-
]
6016
},
6117
{
62-
"op_name":"TableRangeScan",
63-
"table":"browser_groups"
18+
"op_name":"TableLookup",
19+
"table":"browsers"
6420
}
6521
]
6622
},
6723
{
68-
"op_name":"TableRangeScan",
69-
"table":"quota"
24+
"op_name":"TableLookup",
25+
"table":"browser_groups"
7026
}
7127
]
28+
},
29+
{
30+
"op_name":"TableFullScan",
31+
"table":"quota"
7232
}
7333
]
7434
}

ydb/core/kqp/ut/join/kqp_join_order_ut.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,17 @@ static void CreateSampleTable(NYdb::NQuery::TSession session, bool useColumnStor
9292
CreateView(session, "view/tpch_random_join_view.sql");
9393
}
9494

95-
static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false, TString stats = "", bool useCBO = true){
95+
struct TExecuteParams {
96+
bool RemoveLimitOperator = false;
97+
bool EnableSeparationComputeActorsFromRead = true;
98+
};
99+
100+
static TKikimrRunner GetKikimrWithJoinSettings(
101+
bool useStreamLookupJoin = false,
102+
TString stats = "",
103+
bool useCBO = true,
104+
const TExecuteParams& params = {}
105+
){
96106
TVector<NKikimrKqp::TKqpSetting> settings;
97107

98108
NKikimrKqp::TKqpSetting setting;
@@ -116,6 +126,7 @@ static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false,
116126
appConfig.MutableTableServiceConfig()->SetDefaultEnableShuffleElimination(true);
117127

118128
auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
129+
serverSettings.FeatureFlags.SetEnableSeparationComputeActorsFromRead(params.EnableSeparationComputeActorsFromRead);
119130
serverSettings.SetKqpSettings(settings);
120131
return TKikimrRunner(serverSettings);
121132
}

0 commit comments

Comments
 (0)