Skip to content

Commit e437bd0

Browse files
authored
Allow multiple join-broadcasts in single stage (#7556)
1 parent 07b9b4e commit e437bd0

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1319,13 +1319,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
13191319
const auto& input = stage.GetInputs(inputIndex);
13201320

13211321
// Current assumptions:
1322-
// 1. `Broadcast` can not be the 1st stage input unless it's a single input
1323-
// 2. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
1324-
if (inputIndex == 0) {
1325-
if (stage.InputsSize() > 1) {
1326-
YQL_ENSURE(input.GetTypeCase() != NKqpProto::TKqpPhyConnection::kBroadcast);
1327-
}
1328-
} else {
1322+
// 1. All stage's inputs, except 1st one, must be a `Broadcast` or `UnionAll`
1323+
// 2. Stages where 1st input is `Broadcast` are not partitioned.
1324+
if (inputIndex > 0) {
13291325
switch (input.GetTypeCase()) {
13301326
case NKqpProto::TKqpPhyConnection::kBroadcast:
13311327
case NKqpProto::TKqpPhyConnection::kHashShuffle:

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3870,6 +3870,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
38703870
AssertTableReads(result, "/Root/SecondaryKeys/Index/indexImplTable", 1);
38713871
}
38723872

3873+
Y_UNIT_TEST(MultipleBroadcastJoin) {
3874+
TKikimrSettings kisettings;
3875+
NKikimrConfig::TAppConfig appConfig;
3876+
appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX);
3877+
kisettings.SetAppConfig(appConfig);
3878+
3879+
TKikimrRunner kikimr(kisettings);
3880+
3881+
auto db = kikimr.GetTableClient();
3882+
auto client = kikimr.GetQueryClient();
3883+
auto session = db.CreateSession().GetValueSync().GetSession();
3884+
3885+
{
3886+
auto session = db.CreateSession().GetValueSync().GetSession();
3887+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
3888+
--!syntax_v1
3889+
3890+
create table demo_ba(id text, some text, ref1 text, ref2 text, primary key(id));
3891+
create table demo_ref1(id text, code text, some text, primary key(id), index ix_code global on (code));
3892+
create table demo_ref2(id text, code text, some text, primary key(id), index ix_code global on (code));
3893+
)").GetValueSync());
3894+
}
3895+
3896+
auto query = R"(
3897+
select ba_0.id, ba_0.some,
3898+
r_1.id, r_1.some, r_1.code,
3899+
r_2.id, r_2.some, r_2.code
3900+
from demo_ba ba_0
3901+
left join demo_ref1 r_1 on r_1.id=ba_0.ref1
3902+
left join demo_ref2 r_2 on r_2.code=ba_0.ref2
3903+
where ba_0.id in ("ba#10"u,"ba#20"u,"ba#30"u,"ba#40"u,"ba#50"u,"ba#60"u,"ba#70"u,"ba#80"u,"ba#90"u,"ba#100"u);
3904+
)";
3905+
3906+
auto settings = NYdb::NQuery::TExecuteQuerySettings()
3907+
.Syntax(NYdb::NQuery::ESyntax::YqlV1)
3908+
.ConcurrentResultSets(false);
3909+
{
3910+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
3911+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3912+
//CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0)));
3913+
//CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
3914+
}
3915+
{
3916+
auto it = client.StreamExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
3917+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
3918+
Cerr << StreamResultToYson(it);
3919+
}
3920+
3921+
}
38733922

38743923
Y_UNIT_TEST(ComplexLookupLimit) {
38753924
TKikimrSettings settings;

0 commit comments

Comments
 (0)