Skip to content

Commit 12086fb

Browse files
pashandor789Pavel Ivanov
authored andcommitted
fix
1 parent c361dda commit 12086fb

File tree

4 files changed

+43
-9
lines changed

4 files changed

+43
-9
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
211211

212212
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
213213
YQL_ENSURE(args.ComputesByStages);
214-
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
214+
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta);
215215
IActor* computeActor = CreateKqpScanComputeActor(
216216
args.ExecuterId, args.TxId,
217217
args.Task, AsyncIoFactory, runtimeSettings, memoryLimits,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ class TComputeStageInfo {
4444
return true;
4545
}
4646

47-
TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) {
47+
TMetaScan& MergeMetaReads(const NYql::NDqProto::TDqTask& task, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) {
4848
YQL_ENSURE(meta.ReadsSize(), "unexpected merge with no reads");
49-
if (forceOneToMany || !task.HasMetaId()) {
49+
if (!task.HasMetaId()) {
5050
MetaInfo.emplace_back(TMetaScan(meta));
5151
return MetaInfo.back();
5252
} else {
@@ -85,12 +85,12 @@ class TComputeStagesWithScan {
8585
}
8686
}
8787

88-
TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const bool forceOneToMany) {
88+
TMetaScan& UpsertTaskWithScan(const NYql::NDqProto::TDqTask& dqTask, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) {
8989
auto it = Stages.find(dqTask.GetStageId());
9090
if (it == Stages.end()) {
9191
it = Stages.emplace(dqTask.GetStageId(), TComputeStageInfo()).first;
9292
}
93-
return it->second.MergeMetaReads(dqTask, meta, forceOneToMany);
93+
return it->second.MergeMetaReads(dqTask, meta);
9494
}
9595
};
9696

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,7 +1733,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
17331733
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
17341734

17351735
auto& columnShardHashV1Params = stageInfo.Meta.ColumnShardHashV1Params;
1736-
if (enableShuffleElimination && stage.GetIsShuffleEliminated() && stageInfo.Meta.ColumnTableInfoPtr) {
1736+
bool shuffleEliminated = enableShuffleElimination && stage.GetIsShuffleEliminated();
1737+
if (shuffleEliminated && stageInfo.Meta.ColumnTableInfoPtr) {
17371738
const auto& tableDesc = stageInfo.Meta.ColumnTableInfoPtr->Description;
17381739
columnShardHashV1Params.SourceShardCount = tableDesc.GetColumnShardCount();
17391740
columnShardHashV1Params.SourceTableKeyColumnTypes = std::make_shared<TVector<NScheme::TTypeInfo>>();
@@ -1779,7 +1780,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17791780
}
17801781
}
17811782

1782-
if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() || (!isOlapScan && readSettings.IsSorted())) {
1783+
if (!AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead() && !shuffleEliminated || (!isOlapScan && readSettings.IsSorted())) {
17831784
for (auto&& pair : nodeShards) {
17841785
auto& shardsInfo = pair.second;
17851786
for (auto&& shardInfo : shardsInfo) {
@@ -1798,7 +1799,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
17981799
}
17991800
}
18001801

1801-
} else if (enableShuffleElimination && stage.GetIsShuffleEliminated() /* save partitioning for shuffle elimination */) {
1802+
} else if (shuffleEliminated /* save partitioning for shuffle elimination */) {
18021803
std::size_t stageInternalTaskId = 0;
18031804
columnShardHashV1Params.TaskIndexByHash = std::make_shared<TVector<ui64>>();
18041805
columnShardHashV1Params.TaskIndexByHash->resize(columnShardHashV1Params.SourceShardCount);

ydb/core/kqp/ut/join/kqp_join_order_ut.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,17 @@ static void CreateSampleTable(NYdb::NQuery::TSession session, bool useColumnStor
9292
CreateView(session, "view/tpch_random_join_view.sql");
9393
}
9494

95-
static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false, TString stats = "", bool useCBO = true){
95+
struct TExecuteParams {
96+
bool RemoveLimitOperator = false;
97+
bool EnableSeparationComputeActorsFromRead = true;
98+
};
99+
100+
static TKikimrRunner GetKikimrWithJoinSettings(
101+
bool useStreamLookupJoin = false,
102+
TString stats = "",
103+
bool useCBO = true,
104+
const TExecuteParams& params = {}
105+
){
96106
TVector<NKikimrKqp::TKqpSetting> settings;
97107

98108
NKikimrKqp::TKqpSetting setting;
@@ -116,6 +126,7 @@ static TKikimrRunner GetKikimrWithJoinSettings(bool useStreamLookupJoin = false,
116126
appConfig.MutableTableServiceConfig()->SetDefaultEnableShuffleElimination(true);
117127

118128
auto serverSettings = TKikimrSettings().SetAppConfig(appConfig);
129+
serverSettings.FeatureFlags.SetEnableSeparationComputeActorsFromRead(params.EnableSeparationComputeActorsFromRead);
119130
serverSettings.SetKqpSettings(settings);
120131
return TKikimrRunner(serverSettings);
121132
}
@@ -365,8 +376,20 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
365376
TChainTester(65).Test();
366377
}
367378

379+
<<<<<<< HEAD
368380
TString ExecuteJoinOrderTestGenericQueryWithStats(const TString& queryPath, const TString& statsPath, bool useStreamLookupJoin, bool useColumnStore, bool useCBO = true) {
369381
auto kikimr = GetKikimrWithJoinSettings(useStreamLookupJoin, GetStatic(statsPath), useCBO);
382+
=======
383+
std::pair<TString, std::vector<NYdb::TResultSet>> ExecuteJoinOrderTestGenericQueryWithStats(
384+
const TString& queryPath,
385+
const TString& statsPath,
386+
bool useStreamLookupJoin,
387+
bool useColumnStore,
388+
bool useCBO = true,
389+
const TExecuteParams& params = {}
390+
) {
391+
auto kikimr = GetKikimrWithJoinSettings(useStreamLookupJoin, GetStatic(statsPath), useCBO, params);
392+
>>>>>>> 62f01135b35 ([KQP] Add ShuffleEliminated flag (#20175))
370393
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableViews(true);
371394
auto db = kikimr.GetQueryClient();
372395
auto session = db.GetSession().GetValueSync().GetSession();
@@ -692,17 +715,27 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
692715
TVector<TString> RequestedLabels;
693716
};
694717

718+
<<<<<<< HEAD
695719
Y_UNIT_TEST(ShuffleEliminationOneJoin) {
696720
auto plan = ExecuteJoinOrderTestGenericQueryWithStats("queries/shuffle_elimination_one_join.sql", "stats/tpch1000s.json", false, true, true);
721+
=======
722+
Y_UNIT_TEST_TWIN(ShuffleEliminationOneJoin, EnableSeparationComputeActorsFromRead) {
723+
auto [plan, _] = ExecuteJoinOrderTestGenericQueryWithStats("queries/shuffle_elimination_one_join.sql", "stats/tpch1000s.json", false, true, true, {.EnableSeparationComputeActorsFromRead = EnableSeparationComputeActorsFromRead});
724+
>>>>>>> 62f01135b35 ([KQP] Add ShuffleEliminated flag (#20175))
697725
auto joinFinder = TFindJoinWithLabels(plan);
698726
auto join = joinFinder.Find({"customer", "orders"});
699727
UNIT_ASSERT_C(join.Join == "InnerJoin (Grace)", join.Join);
700728
UNIT_ASSERT(!join.LhsShuffled);
701729
UNIT_ASSERT(join.RhsShuffled);
702730
}
703731

732+
<<<<<<< HEAD
704733
Y_UNIT_TEST(ShuffleEliminationReuseShuffleTwoJoins) {
705734
auto plan = ExecuteJoinOrderTestGenericQueryWithStats("queries/shuffle_elimination_reuse_shuffle_two_joins.sql", "stats/tpch1000s.json", false, true, true);
735+
=======
736+
Y_UNIT_TEST_TWIN(ShuffleEliminationReuseShuffleTwoJoins, EnableSeparationComputeActorsFromRead) {
737+
auto [plan, _] = ExecuteJoinOrderTestGenericQueryWithStats("queries/shuffle_elimination_reuse_shuffle_two_joins.sql", "stats/tpch1000s.json", false, true, true, {.EnableSeparationComputeActorsFromRead = EnableSeparationComputeActorsFromRead});
738+
>>>>>>> 62f01135b35 ([KQP] Add ShuffleEliminated flag (#20175))
706739
auto joinFinder = TFindJoinWithLabels(plan);
707740

708741
{

0 commit comments

Comments
 (0)