Skip to content

Commit 9d849d2

Browse files
authored
remove stream lookup feature flag usage (#17369)
1 parent c6a5181 commit 9d849d2

File tree

11 files changed

+48
-93
lines changed

11 files changed

+48
-93
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,6 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
631631
}
632632

633633
kqpConfig.EnableKqpScanQuerySourceRead = serviceConfig.GetEnableKqpScanQuerySourceRead();
634-
kqpConfig.EnableKqpDataQueryStreamLookup = serviceConfig.GetEnableKqpDataQueryStreamLookup();
635634
kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
636635
kqpConfig.EnableKqpDataQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
637636
kqpConfig.BindingsMode = RemapBindingsMode(serviceConfig.GetBindingsMode());

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
280280
auto &event = ev->Get()->Record;
281281

282282
bool allowMultiBroadcasts = TableServiceConfig.GetAllowMultiBroadcasts();
283-
bool enableKqpDataQueryStreamLookup = TableServiceConfig.GetEnableKqpDataQueryStreamLookup();
284283
bool enableKqpDataQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
285284
bool enableKqpScanQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
286285

@@ -325,7 +324,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
325324
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
326325

327326
if (TableServiceConfig.GetSqlVersion() != defaultSyntaxVersion ||
328-
TableServiceConfig.GetEnableKqpDataQueryStreamLookup() != enableKqpDataQueryStreamLookup ||
329327
TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin ||
330328
TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin() != enableKqpDataQueryStreamIdxLookupJoin ||
331329
TableServiceConfig.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead ||

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
123123
|| Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW);
124124
}
125125

126-
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
127-
|| tableServiceConfig.GetEnableKqpDataQueryStreamLookup());
128-
129126
ReadOnlyTx = IsReadOnlyTx();
130127
}
131128

ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ auto NewLambdaFrom(TExprContext& ctx, TPositionHandle pos, TNodeOnNodeOwnedMap&
416416
}
417417

418418
auto LevelLambdaFrom(
419-
const TIndexDescription& indexDesc, TExprContext& ctx, TPositionHandle pos, TNodeOnNodeOwnedMap& replaces,
419+
const TIndexDescription& indexDesc, TExprContext& ctx, TPositionHandle pos, TNodeOnNodeOwnedMap& replaces,
420420
const TExprNode& fromArgs, const TExprBase& fromBody)
421421
{
422422
auto newLambda = NewLambdaFrom(ctx, pos, replaces, fromArgs, fromBody);
@@ -478,13 +478,13 @@ void RemapIdToParent(TExprContext& ctx, TPositionHandle pos, TExprNodePtr& read)
478478
.Args({mapArg})
479479
.template Body<TCoAsStruct>().Add(mapMembers).Build()
480480
.Build()
481-
.Done().Ptr();
481+
.Done().Ptr();
482482
}
483483

484484
void VectorReadLevel(
485-
const TIndexDescription& indexDesc, TExprContext& ctx, TPositionHandle pos, const TKqpOptimizeContext& kqpCtx,
485+
const TIndexDescription& indexDesc, TExprContext& ctx, TPositionHandle pos, const TKqpOptimizeContext& kqpCtx,
486486
const TExprNodePtr& lambda, const TCoTopBase& top,
487-
const TKqpTable& levelTable, const TCoAtomList& levelColumns,
487+
const TKqpTable& levelTable, const TCoAtomList& levelColumns,
488488
TExprNodePtr& read)
489489
{
490490
const auto& settings = std::get<NKikimrKqp::TVectorIndexKmeansTreeDescription>(indexDesc.SpecializedIndexDescription)
@@ -736,54 +736,32 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
736736
const bool needDataRead = CheckIndexCovering(lookupIndex, implTable);
737737

738738
if (!needDataRead) {
739-
if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
740-
TKqpStreamLookupSettings settings;
741-
settings.Strategy = EStreamLookupStrategyType::LookupRows;
742-
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
743-
.Table(BuildTableMeta(*implTable, node.Pos(), ctx))
744-
.LookupKeys(lookupIndex.LookupKeys())
745-
.Columns(lookupIndex.Columns())
746-
.Settings(settings.BuildNode(ctx, node.Pos()))
747-
.Done();
748-
}
749-
750-
return Build<TKqlLookupTable>(ctx, node.Pos())
751-
.Table(BuildTableMeta(*implTable, node.Pos(), ctx))
752-
.LookupKeys(lookupIndex.LookupKeys())
753-
.Columns(lookupIndex.Columns())
754-
.Done();
755-
}
756-
757-
auto keyColumnsList = BuildKeyColumnsList(tableDesc, node.Pos(), ctx);
758-
759-
if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
760739
TKqpStreamLookupSettings settings;
761740
settings.Strategy = EStreamLookupStrategyType::LookupRows;
762-
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
741+
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
763742
.Table(BuildTableMeta(*implTable, node.Pos(), ctx))
764743
.LookupKeys(lookupIndex.LookupKeys())
765-
.Columns(keyColumnsList)
766-
.Settings(settings.BuildNode(ctx, node.Pos()))
767-
.Done();
768-
769-
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
770-
.Table(lookupIndex.Table())
771-
.LookupKeys(lookupIndexTable.Ptr())
772744
.Columns(lookupIndex.Columns())
773745
.Settings(settings.BuildNode(ctx, node.Pos()))
774746
.Done();
775747
}
776748

777-
TExprBase lookupIndexTable = Build<TKqlLookupTable>(ctx, node.Pos())
749+
auto keyColumnsList = BuildKeyColumnsList(tableDesc, node.Pos(), ctx);
750+
751+
TKqpStreamLookupSettings settings;
752+
settings.Strategy = EStreamLookupStrategyType::LookupRows;
753+
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
778754
.Table(BuildTableMeta(*implTable, node.Pos(), ctx))
779755
.LookupKeys(lookupIndex.LookupKeys())
780756
.Columns(keyColumnsList)
757+
.Settings(settings.BuildNode(ctx, node.Pos()))
781758
.Done();
782759

783-
return Build<TKqlLookupTable>(ctx, node.Pos())
760+
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
784761
.Table(lookupIndex.Table())
785762
.LookupKeys(lookupIndexTable.Ptr())
786763
.Columns(lookupIndex.Columns())
764+
.Settings(settings.BuildNode(ctx, node.Pos()))
787765
.Done();
788766
}
789767

@@ -1037,7 +1015,7 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct
10371015
auto [implTable, indexDesc] = tableDesc.Metadata->GetIndex(indexName);
10381016
if (indexDesc->Type == TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
10391017
auto reject = [&] (std::string_view because) {
1040-
auto message = TStringBuilder{} << "Given predicate is not suitable for used index: "
1018+
auto message = TStringBuilder{} << "Given predicate is not suitable for used index: "
10411019
<< indexName << ", because " << because << ", node dump:\n" << node.Ref().Dump();
10421020
TIssue issue{ctx.GetPosition(readTableIndex.Pos()), message};
10431021
SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_KIKIMR_WRONG_INDEX_USAGE, issue);

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -239,23 +239,9 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos,
239239
.Done();
240240
}
241241

242-
if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
243-
TKqpStreamLookupSettings settings;
244-
settings.Strategy = EStreamLookupStrategyType::LookupRows;
245-
return Build<TKqlStreamLookupTable>(ctx, pos)
246-
.Table(table)
247-
.LookupKeys<TCoSkipNullMembers>()
248-
.Input(keysToLookup)
249-
.Members()
250-
.Add(skipNullColumns)
251-
.Build()
252-
.Build()
253-
.Columns(columns)
254-
.Settings(settings.BuildNode(ctx, pos))
255-
.Done();
256-
}
257-
258-
return Build<TKqlLookupTable>(ctx, pos)
242+
TKqpStreamLookupSettings settings;
243+
settings.Strategy = EStreamLookupStrategyType::LookupRows;
244+
return Build<TKqlStreamLookupTable>(ctx, pos)
259245
.Table(table)
260246
.LookupKeys<TCoSkipNullMembers>()
261247
.Input(keysToLookup)
@@ -264,6 +250,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos,
264250
.Build()
265251
.Build()
266252
.Columns(columns)
253+
.Settings(settings.BuildNode(ctx, pos))
267254
.Done();
268255
}
269256

@@ -446,9 +433,9 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
446433
.Index().Value("1").Build()
447434
.Build()
448435
.Lambda(ctx.DeepCopyLambda(extraRightFilter.Cast().Ref()))
449-
.Build()
450-
.Build()
451-
.Build()
436+
.Build()
437+
.Build()
438+
.Build()
452439
.Done();
453440
}
454441

@@ -468,11 +455,11 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
468455
.Index().Value("1").Build()
469456
.Build()
470457
.Members(rightReadMatch.ExtractMembers.Cast().Members())
471-
.Build()
458+
.Build()
472459
.Build()
473460
.Build()
474461
.Done();
475-
}
462+
}
476463

477464
if (rightReadMatch.FilterNullMembers) {
478465
lookupJoin = Build<TCoMap>(ctx, join.Pos())
@@ -490,12 +477,12 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
490477
.Index().Value("1").Build()
491478
.Build()
492479
.Members(rightReadMatch.FilterNullMembers.Cast().Members())
493-
.Build()
480+
.Build()
494481
.Build()
495482
.Build()
496483
.Done();
497484
}
498-
485+
499486
if (rightReadMatch.SkipNullMembers) {
500487
lookupJoin = Build<TCoMap>(ctx, join.Pos())
501488
.Input(lookupJoin.Cast())
@@ -512,7 +499,7 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
512499
.Index().Value("1").Build()
513500
.Build()
514501
.Members(rightReadMatch.SkipNullMembers.Cast().Members())
515-
.Build()
502+
.Build()
516503
.Build()
517504
.Build()
518505
.Done();
@@ -534,9 +521,9 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
534521
.Index().Value("1").Build()
535522
.Build()
536523
.Lambda(rightReadMatch.FlatMap.Cast().Lambda())
537-
.Build()
538-
.Build()
539-
.Build()
524+
.Build()
525+
.Build()
526+
.Build()
540527
.Done();
541528
}
542529

@@ -860,8 +847,8 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
860847
.Predicate(joinKeyPredicate.Cast())
861848
.Value<TCoAsStruct>()
862849
.Add(lookupMembers)
863-
.Build()
864-
.Build()
850+
.Build()
851+
.Build()
865852
.Add(leftRowArg)
866853
.Done();
867854

@@ -907,7 +894,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
907894
.Lambda(ctx.DeepCopyLambda(prefixLookup->Filter.Cast().Ref()))
908895
.Done();
909896
}
910-
897+
911898
if (prefixLookup->LookupColumns.Raw() != prefixLookup->ResultColumns.Raw()) {
912899
lookup = Build<TCoExtractMembers>(ctx, join.Pos())
913900
.Input(lookup)
@@ -967,7 +954,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
967954

968955
TVector<TString> CollectLabels(const TExprBase& node) {
969956
TVector<TString> rels;
970-
957+
971958
if (node.Maybe<TDqPrecompute>()) {
972959
auto precompute = node.Cast<TDqPrecompute>();
973960
return CollectLabels(precompute.Input());
@@ -978,14 +965,14 @@ TVector<TString> CollectLabels(const TExprBase& node) {
978965

979966
if (join.LeftLabel().Maybe<TCoAtom>()) {
980967
rels.push_back(join.LeftLabel().Cast<TCoAtom>().StringValue());
981-
} else {
968+
} else {
982969
auto lhs = CollectLabels(join.LeftInput());
983970
rels.insert(rels.end(), std::make_move_iterator(lhs.begin()), std::make_move_iterator(lhs.end()));
984971
}
985972

986973
if (join.RightLabel().Maybe<TCoAtom>()) {
987974
rels.push_back(join.RightLabel().Cast<TCoAtom>().StringValue());
988-
} else {
975+
} else {
989976
auto rhs = CollectLabels(join.RightInput());
990977
rels.insert(rels.end(), std::make_move_iterator(rhs.begin()), std::make_move_iterator(rhs.end()));
991978
}
@@ -1017,14 +1004,14 @@ TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const T
10171004
return node;
10181005
}
10191006

1020-
/*
1007+
/*
10211008
* this cycle looks for applied hints for these join labels. if we've found one then we will leave the function.
10221009
* But if it is a LookupJoin we will rewrite it with KqpJoinToIndexLookupImpl because lookup join needs to be rewritten
10231010
*/
10241011
auto joinLabels = CollectLabels(node);
10251012
for (const auto& hint: hints.JoinAlgoHints->Hints) {
10261013
if (
1027-
std::unordered_set<TString>(hint.JoinLabels.begin(), hint.JoinLabels.end()) ==
1014+
std::unordered_set<TString>(hint.JoinLabels.begin(), hint.JoinLabels.end()) ==
10281015
std::unordered_set<TString>(joinLabels.begin(), joinLabels.end()) && hint.Applied
10291016
) {
10301017
if (hint.Algo == EJoinAlgoType::LookupJoin || hint.Algo == EJoinAlgoType::LookupJoinReverse) {

ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ TMaybeNode<TExprBase> KqpRewriteLiteralLookup(const TExprBase& node, TExprContex
4444
auto flatMapRangeInput = lookupKeysFlatMap.Cast().Input().Maybe<TCoRangeFinalize>();
4545

4646
// This rule should depend on feature flag for safety
47-
if (!flatMapRangeInput || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
47+
if (!flatMapRangeInput) {
4848
return {};
4949
}
5050

@@ -151,10 +151,6 @@ TExprBase KqpRewriteLookupTable(const TExprBase& node, TExprContext& ctx, const
151151

152152
const TKqlLookupTable& lookup = node.Cast<TKqlLookupTable>();
153153

154-
if (!kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
155-
return node;
156-
}
157-
158154
TKqpStreamLookupSettings settings;
159155
settings.Strategy = EStreamLookupStrategyType::LookupRows;
160156
return Build<TKqlStreamLookupTable>(ctx, lookup.Pos())

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
127127
AddHandler(1, &TKqpWriteConstraint::Match, HNDL(BuildWriteConstraint<true>));
128128
AddHandler(1, &TKqpReadOlapTableRanges::Match, HNDL(AddColumnForEmptyColumnsOlapRead));
129129

130-
130+
131131
AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable));
132132
AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpLookupTable));
133133
AddHandler(2, &TKqlUpsertRows::Match, HNDL(RewriteReturningUpsert));
@@ -451,7 +451,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
451451
{
452452
// TODO: Allow push to left stage for data queries.
453453
// It is now possible as we don't use datashard transactions for reads in data queries.
454-
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
454+
bool pushLeftStage = AllowFuseJoinInputs(node);
455455
bool shuffleEliminationWithMap = KqpCtx.Config->OptShuffleEliminationWithMap.Get().GetOrElse(true);
456456
bool rightCollectStage = !KqpCtx.Config->AllowMultiBroadcasts;
457457
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,

ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,9 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY
581581
NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
582582
const TKqpOptimizeContext& kqpCtx) {
583583

584-
if (!node.Maybe<TDqStage>() || !kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
584+
Y_UNUSED(kqpCtx);
585+
586+
if (!node.Maybe<TDqStage>()) {
585587
return node;
586588
}
587589

@@ -608,7 +610,7 @@ NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, N
608610
<< KqpExprToPrettyString(lookupKeys, ctx));
609611

610612
TKqpStreamLookupSettings settings;
611-
settings.Strategy = EStreamLookupStrategyType::LookupRows;
613+
settings.Strategy = EStreamLookupStrategyType::LookupRows;
612614
TNodeOnNodeOwnedMap replaceMap;
613615
TVector<TExprBase> newInputs;
614616
TVector<TCoArgument> newArgs;
@@ -768,12 +770,12 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
768770
}
769771

770772
NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStagesKeepSorted(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
771-
TTypeAnnotationContext& typeCtx, bool ruleEnabled)
773+
TTypeAnnotationContext& typeCtx, bool ruleEnabled)
772774
{
773775
if (!ruleEnabled) {
774776
return node;
775777
}
776-
778+
777779
if (!node.Maybe<TKqlIndexLookupJoin>()) {
778780
return node;
779781
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
158158
NKikimrConfig::TFeatureFlags FeatureFlags;
159159

160160
bool EnableKqpScanQuerySourceRead = false;
161-
bool EnableKqpDataQueryStreamLookup = false;
162161
bool EnableKqpScanQueryStreamIdxLookupJoin = false;
163162
bool EnableKqpDataQueryStreamIdxLookupJoin = false;
164163
NSQLTranslation::EBindingsMode BindingsMode = NSQLTranslation::EBindingsMode::ENABLED;

ydb/core/protos/table_service_config.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,8 @@ message TTableServiceConfig {
241241
reserved 27; // EnableKqpDataQuerySourceRead
242242
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
243243
optional TAggregationConfig AggregationConfig = 29;
244-
optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
245-
optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
244+
reserved 30; // optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
245+
reserved 31; // optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
246246
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
247247
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
248248
optional bool EnablePublishKqpProxyByRM = 34 [default = true];

0 commit comments

Comments
 (0)