Skip to content

Commit 5d0978a

Browse files
authored
Allow multiple join-broadcasts in single stage (#7556) (#7714)
1 parent e098cea commit 5d0978a

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,13 +1269,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
12691269
const auto& input = stage.GetInputs(inputIndex);
12701270

12711271
// Current assumptions:
1272-
// 1. `Broadcast` can not be the 1st stage input unless it's a single input
1273-
// 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
1274-
if (inputIndex == 0) {
1275-
if (stage.InputsSize() > 1) {
1276-
YQL_ENSURE(input.GetTypeCase() != NKqpProto::TKqpPhyConnection::kBroadcast);
1277-
}
1278-
} else {
1272+
// 1. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
1273+
// 2. Stages where 1st input is `Broadcast` are not partitioned.
1274+
if (inputIndex > 0) {
12791275
switch (input.GetTypeCase()) {
12801276
case NKqpProto::TKqpPhyConnection::kBroadcast:
12811277
case NKqpProto::TKqpPhyConnection::kHashShuffle:

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3938,6 +3938,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
39383938
AssertTableReads(result, "/Root/SecondaryKeys/Index/indexImplTable", 1);
39393939
}
39403940

3941+
Y_UNIT_TEST(MultipleBroadcastJoin) {
3942+
TKikimrSettings kisettings;
3943+
NKikimrConfig::TAppConfig appConfig;
3944+
appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX);
3945+
kisettings.SetAppConfig(appConfig);
3946+
3947+
TKikimrRunner kikimr(kisettings);
3948+
3949+
auto db = kikimr.GetTableClient();
3950+
auto client = kikimr.GetQueryClient();
3951+
auto session = db.CreateSession().GetValueSync().GetSession();
3952+
3953+
{
3954+
auto session = db.CreateSession().GetValueSync().GetSession();
3955+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
3956+
--!syntax_v1
3957+
3958+
create table demo_ba(id text, some text, ref1 text, ref2 text, primary key(id));
3959+
create table demo_ref1(id text, code text, some text, primary key(id), index ix_code global on (code));
3960+
create table demo_ref2(id text, code text, some text, primary key(id), index ix_code global on (code));
3961+
)").GetValueSync());
3962+
}
3963+
3964+
auto query = R"(
3965+
select ba_0.id, ba_0.some,
3966+
r_1.id, r_1.some, r_1.code,
3967+
r_2.id, r_2.some, r_2.code
3968+
from demo_ba ba_0
3969+
left join demo_ref1 r_1 on r_1.id=ba_0.ref1
3970+
left join demo_ref2 r_2 on r_2.code=ba_0.ref2
3971+
where ba_0.id in ("ba#10"u,"ba#20"u,"ba#30"u,"ba#40"u,"ba#50"u,"ba#60"u,"ba#70"u,"ba#80"u,"ba#90"u,"ba#100"u);
3972+
)";
3973+
3974+
auto settings = NYdb::NQuery::TExecuteQuerySettings()
3975+
.Syntax(NYdb::NQuery::ESyntax::YqlV1)
3976+
.ConcurrentResultSets(false);
3977+
{
3978+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
3979+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3980+
//CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0)));
3981+
//CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
3982+
}
3983+
{
3984+
auto it = client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
3985+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3986+
Cerr << StreamResultToYson(it);
3987+
}
3988+
3989+
}
39413990

39423991
Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) {
39433992
TKikimrSettings settings;

0 commit comments

Comments
 (0)