Skip to content

Commit a59ab09

Browse files
authored
[OLAP] Push OLAP projections. (#19996)
1 parent 85ef01a commit a59ab09

File tree

9 files changed

+390
-5
lines changed

9 files changed

+390
-5
lines changed

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,23 @@
568568
{"Index": 0, "Name": "Input", "Type": "TExprBase"}
569569
]
570570
},
571+
{
572+
"Name": "TKqpOlapProjection",
573+
"Base": "TCallable",
574+
"Match": {"Type": "Callable", "Name": "KqpOlapProjection"},
575+
"Children": [
576+
{"Index": 0, "Name": "OlapOperation", "Type": "TExprBase"},
577+
{"Index": 1, "Name": "ColumnName", "Type": "TCoAtom"}
578+
]
579+
},
580+
{
581+
"Name": "TKqpOlapProjections",
582+
"Base": "TKqpOlapOperationBase",
583+
"Match": {"Type": "Callable", "Name": "KqpOlapProjections"},
584+
"Children": [
585+
{"Index": 1, "Name": "Projections", "Type": "TExprList"}
586+
]
587+
},
571588
{
572589
"Name": "TKqpOlapFilter",
573590
"Base": "TKqpOlapOperationBase",

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,67 @@ bool ValidateOlapFilterConditions(const TExprNode* node, const TStructExprType*
10971097
return false;
10981098
}
10991099

1100+
TStatus AnnotateOlapProjection(const TExprNode::TPtr& node, TExprContext& ctx) {
1101+
if (!EnsureArgsCount(*node, 2, ctx)) {
1102+
return TStatus::Error;
1103+
}
1104+
1105+
auto* olapOperation = node->Child(TKqpOlapProjection::idx_OlapOperation);
1106+
// Exptecting that type annotation is supported for olap operation.
1107+
if (!olapOperation->GetTypeAnn()) {
1108+
return TStatus::Repeat;
1109+
}
1110+
1111+
node->SetTypeAnn(olapOperation->GetTypeAnn());
1112+
return TStatus::Ok;
1113+
}
1114+
1115+
TStatus AnnotateOlapProjections(const TExprNode::TPtr& node, TExprContext& ctx) {
1116+
if (!EnsureArgsCount(*node, 2, ctx)) {
1117+
return TStatus::Error;
1118+
}
1119+
1120+
auto* input = node->Child(TKqpOlapProjections::idx_Input);
1121+
const TTypeAnnotationNode* inputType;
1122+
if (!EnsureNewSeqType<false, false, true>(*input, ctx, &inputType)) {
1123+
return TStatus::Error;
1124+
}
1125+
1126+
if (!EnsureStructType(input->Pos(), *inputType, ctx)) {
1127+
return TStatus::Error;
1128+
}
1129+
1130+
// For each `Projection` we want to replace a type annotation for column
1131+
// which associated with a `Projection`.
1132+
// For example: JsonDocumnet -> JsonValue.
1133+
THashMap<TString, const TTypeAnnotationNode*> projectionsTypes;
1134+
const auto* projections = node->Child(TKqpOlapProjections::idx_Projections);
1135+
for (const auto& expr : TExprBase(projections).Cast<TExprList>()) {
1136+
auto projection = TExprBase(expr).Cast<TKqpOlapProjection>();
1137+
const auto* projectionTypeAnn = projection.Ptr()->GetTypeAnn();
1138+
// Expecting annotation for projection.
1139+
if (!projectionTypeAnn) {
1140+
return TStatus::Repeat;
1141+
}
1142+
projectionsTypes.emplace(TString(projection.ColumnName()), projectionTypeAnn);
1143+
}
1144+
1145+
TVector<const TItemExprType*> newItemTypes;
1146+
const auto* originalStructType = inputType->Cast<TStructExprType>();
1147+
for (const auto* originalItemType : originalStructType->GetItems()) {
1148+
const auto& itemName = originalItemType->GetName();
1149+
if (projectionsTypes.contains(itemName)) {
1150+
newItemTypes.push_back(ctx.MakeType<TItemExprType>(itemName, projectionsTypes[itemName]));
1151+
} else {
1152+
newItemTypes.push_back(originalItemType);
1153+
}
1154+
}
1155+
1156+
// Create a final type (Flow(Struct{items}))
1157+
node->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TStructExprType>(newItemTypes)));
1158+
return TStatus::Ok;
1159+
}
1160+
11001161
TStatus AnnotateOlapFilter(const TExprNode::TPtr& node, TExprContext& ctx) {
11011162
if (!EnsureArgsCount(*node, 2, ctx)) {
11021163
return TStatus::Error;
@@ -2160,6 +2221,14 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
21602221
return AnnotateOlapUnaryLogicOperator(input, ctx);
21612222
}
21622223

2224+
if (TKqpOlapProjection::Match(input.Get())) {
2225+
return AnnotateOlapProjection(input, ctx);
2226+
}
2227+
2228+
if (TKqpOlapProjections::Match(input.Get())) {
2229+
return AnnotateOlapProjections(input, ctx);
2230+
}
2231+
21632232
if (TKqpOlapFilter::Match(input.Get())) {
21642233
return AnnotateOlapFilter(input, ctx);
21652234
}

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
4545
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
4646
AddHandler(0, &TCoTopSort::Match, HNDL(ApplyLimitToOlapReadTable));
4747
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter));
48+
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapProjections));
4849
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage));
4950
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushOlapAggregate));
5051
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushdownOlapGroupByKeys));
@@ -239,6 +240,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
239240
return output;
240241
}
241242

243+
TMaybeNode<TExprBase> PushOlapProjections(TExprBase node, TExprContext& ctx) {
244+
TExprBase output = KqpPushOlapProjections(node, ctx, KqpCtx, TypesCtx);
245+
DumpAppliedRule("PushOlapProjections", node.Ptr(), output.Ptr(), ctx);
246+
return output;
247+
}
248+
242249
TMaybeNode<TExprBase> PushAggregateCombineToStage(TExprBase node, TExprContext& ctx,
243250
IOptimizationContext& optCtx, const TGetParents& getParents)
244251
{

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -782,7 +782,146 @@ std::pair<TVector<TOLAPPredicateNode>, TVector<TOLAPPredicateNode>> SplitForPart
782782
return {pushable, remaining};
783783
}
784784

785-
} // anonymous namespace end
785+
bool IsSuitableToCollectProjection(TExprBase node) {
786+
// Currently support only `JsonDocument`.
787+
if (auto maybeJsonValue = node.Maybe<TCoJsonValue>()) {
788+
auto jsonMember = maybeJsonValue.Cast().Json().Maybe<TCoMember>();
789+
auto jsonPath = maybeJsonValue.Cast().JsonPath().Maybe<TCoUtf8>();
790+
return jsonMember && jsonPath;
791+
}
792+
return false;
793+
}
794+
795+
// Collects all operations for projections and returns a vector of pair - [columName, olap operation].
796+
TVector<std::pair<TString, TExprNode::TPtr>> CollectOlapOperationsForProjections(const TExprNode::TPtr& node, const TExprNode& arg,
797+
TNodeOnNodeOwnedMap& replaces,
798+
const THashSet<TString>& predicateMembers, TExprContext& ctx) {
799+
auto asStructPred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode<TCoAsStruct>(node); };
800+
auto memberPred = [](const TExprNode::TPtr& node) { return !!TMaybeNode<TCoMember>(node); };
801+
802+
TVector<std::pair<TString, TExprNode::TPtr>> olapOperationsForProjections;
803+
// Expressions for projections are placed in `AsStruct` callable.
804+
if (auto asStruct = FindNode(node, asStructPred)) {
805+
// Process each child for `AsStruct` callable.
806+
for (auto child : TExprBase(asStruct).Cast<TCoAsStruct>()) {
807+
if (IsSuitableToCollectProjection(child.Item(1))) {
808+
// Search for the `TCoMember` in expression, we need expression with only one `TCoMember`.
809+
if (auto originalMembers = FindNodes(child.Item(1).Ptr(), memberPred); originalMembers.size() == 1) {
810+
// Convert YQL op to OLAP op.
811+
if (auto olapOperations = ConvertComparisonNode(TExprBase(child.Item(1)), arg, ctx, node->Pos(), false);
812+
olapOperations.size() == 1) {
813+
auto originalMember = TExprBase(originalMembers.front()).Cast<TCoMember>();
814+
815+
auto originalMemberName = TString(originalMember.Name());
816+
// We cannot push projection if some predicate for the same column still not pushed.
817+
if (!predicateMembers.contains(originalMemberName)) {
818+
auto newMember = Build<TCoMember>(ctx, node->Pos())
819+
.Struct(originalMember.Struct())
820+
.Name(originalMember.Name())
821+
.Done();
822+
823+
auto olapOperation = olapOperations.front();
824+
// Replace full expression with only member.
825+
replaces[child.Item(1).Raw()] = newMember.Ptr();
826+
olapOperationsForProjections.emplace_back(TString(newMember.Name()), olapOperation.Ptr());
827+
828+
YQL_CLOG(TRACE, ProviderKqp)
829+
<< "[OLAP PROJECTION] Operation in olap dialect: " << KqpExprToPrettyString(olapOperation, ctx);
830+
}
831+
}
832+
}
833+
}
834+
}
835+
}
836+
837+
return olapOperationsForProjections;
838+
}
839+
840+
void CollectPredicateMembers(TExprNode::TPtr predicate, THashSet<TString>& predicateMembers) {
841+
auto memberPred = [](const TExprNode::TPtr& node) { return !!TMaybeNode<TCoMember>(node); };
842+
auto members = FindNodes(predicate, memberPred);
843+
for (const auto& member : members) {
844+
predicateMembers.insert(TString(TExprBase(member).Cast<TCoMember>().Name()));
845+
}
846+
}
847+
848+
} // anonymous namespace end
849+
850+
TExprBase KqpPushOlapProjections(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
851+
TTypeAnnotationContext& typesCtx)
852+
{
853+
Y_UNUSED(typesCtx);
854+
if (!(kqpCtx.Config->HasOptEnableOlapPushdown() && kqpCtx.Config->HasOptEnableOlapPushdownProjections())) {
855+
return node;
856+
}
857+
858+
if (!node.Maybe<TCoFlatMap>().Input().Maybe<TKqpReadOlapTableRanges>()) {
859+
return node;
860+
}
861+
862+
auto flatmap = node.Cast<TCoFlatMap>();
863+
const auto& lambda = flatmap.Lambda();
864+
865+
// Collect `TCoMembers` from predicate, we cannot push projection if some predicate for the same column still not pushed.
866+
THashSet<TString> predicateMembers;
867+
if (auto maybeOptionalIf = lambda.Body().Maybe<TCoOptionalIf>()) {
868+
CollectPredicateMembers(maybeOptionalIf.Cast().Predicate().Ptr(), predicateMembers);
869+
}
870+
871+
// Combinations of `OlapAgg` and `OlapProjections` are not supported yet.
872+
auto olapAggPred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode<TKqpOlapAgg>(node); };
873+
if (auto maybeOlapAgg = FindNode(lambda.Body().Ptr(), olapAggPred)) {
874+
return node;
875+
}
876+
877+
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
878+
auto read = flatmap.Input().Cast<TKqpReadOlapTableRanges>();
879+
880+
TNodeOnNodeOwnedMap replaces;
881+
auto olapOperationsForProjections = CollectOlapOperationsForProjections(flatmap.Ptr(), lambdaArg, replaces, predicateMembers, ctx);
882+
if (olapOperationsForProjections.empty()) {
883+
return node;
884+
}
885+
886+
TVector<TExprBase> projections;
887+
for (const auto& [columnName, olapOperation] : olapOperationsForProjections) {
888+
auto olapProjection = Build<TKqpOlapProjection>(ctx, node.Pos())
889+
.OlapOperation(olapOperation)
890+
.ColumnName().Build(columnName)
891+
.Done();
892+
projections.push_back(olapProjection);
893+
}
894+
895+
auto olapProjections = Build<TKqpOlapProjections>(ctx, node.Pos())
896+
.Input(read.Process().Body())
897+
.Projections()
898+
.Add(projections)
899+
.Build()
900+
.Done();
901+
902+
auto newLambda = Build<TCoLambda>(ctx, node.Pos())
903+
.Args({"arg"})
904+
.Body<TExprApplier>()
905+
.Apply(olapProjections)
906+
.With(read.Process().Args().Arg(0), "arg")
907+
.Build()
908+
.Done();
909+
910+
auto newRead = Build<TKqpReadOlapTableRanges>(ctx, node.Pos())
911+
.Table(read.Table())
912+
.Ranges(read.Ranges())
913+
.Columns(read.Columns())
914+
.Settings(read.Settings())
915+
.ExplainPrompt(read.ExplainPrompt())
916+
.Process(newLambda)
917+
.Done();
918+
919+
replaces[read.Raw()] = newRead.Ptr();
920+
auto newFlatmap = TExprBase(TExprBase(ctx.ReplaceNodes(flatmap.Ptr(), replaces)).Cast<TCoFlatMap>());
921+
922+
YQL_CLOG(TRACE, ProviderKqp) << "[OLAP PROJECTION] After rewrite: " << KqpExprToPrettyString(newFlatmap, ctx);
923+
return newFlatmap;
924+
}
786925

787926
TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
788927
TTypeAnnotationContext& typesCtx)
@@ -808,7 +947,6 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
808947

809948
const auto& lambda = flatmap.Lambda();
810949
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
811-
812950
YQL_CLOG(TRACE, ProviderKqp) << "Initial OLAP lambda: " << KqpExprToPrettyString(lambda, ctx);
813951

814952
const auto maybeOptionalIf = lambda.Body().Maybe<TCoOptionalIf>();
@@ -819,6 +957,8 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
819957
const auto& optionaIf = maybeOptionalIf.Cast();
820958
auto predicate = optionaIf.Predicate();
821959
auto value = optionaIf.Value();
960+
// Use original value in final flatmap, because we need an original ast for the given value in `KqpPushOlapProjection`.
961+
auto originalValue = value;
822962

823963
TOLAPPredicateNode predicateTree;
824964
predicateTree.ExprNode = predicate.Ptr();
@@ -938,7 +1078,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
9381078
.With(lambda.Args().Arg(0), "new_arg")
9391079
.Build()
9401080
.Value<TExprApplier>()
941-
.Apply(value)
1081+
.Apply(originalValue)
9421082
.With(lambda.Args().Arg(0), "new_arg")
9431083
.Build()
9441084
.Build()

ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ NYql::NNodes::TExprBase KqpApplyLimitToOlapReadTable(NYql::NNodes::TExprBase nod
5252
NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
5353
const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx);
5454

55+
NYql::NNodes::TExprBase KqpPushOlapProjections(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
56+
const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx);
57+
5558
NYql::NNodes::TExprBase KqpPushOlapAggregate(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
5659
const TKqpOptimizeContext& kqpCtx);
5760

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
8282
REGISTER_SETTING(*this, OptEnableInplaceUpdate);
8383
REGISTER_SETTING(*this, OptEnablePredicateExtract);
8484
REGISTER_SETTING(*this, OptEnableOlapPushdown);
85+
REGISTER_SETTING(*this, OptEnableOlapPushdownProjections);
8586
REGISTER_SETTING(*this, OptEnableOlapProvideComputeSharding);
8687
REGISTER_SETTING(*this, OptOverrideStatistics);
8788
REGISTER_SETTING(*this, OptimizerHints).Parser([](const TString& v) { return NYql::TOptimizerHints::Parse(v); });
@@ -152,6 +153,10 @@ bool TKikimrSettings::HasOptEnableOlapPushdown() const {
152153
return GetOptionalFlagValue(OptEnableOlapPushdown.Get()) != EOptionalFlag::Disabled;
153154
}
154155

156+
bool TKikimrSettings::HasOptEnableOlapPushdownProjections() const {
157+
return GetOptionalFlagValue(OptEnableOlapPushdownProjections.Get()) == EOptionalFlag::Enabled;
158+
}
159+
155160
bool TKikimrSettings::HasOptEnableOlapProvideComputeSharding() const {
156161
return GetOptionalFlagValue(OptEnableOlapProvideComputeSharding.Get()) == EOptionalFlag::Enabled;
157162
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct TKikimrSettings {
6969
NCommon::TConfSetting<bool, Static> OptEnableInplaceUpdate;
7070
NCommon::TConfSetting<bool, Static> OptEnablePredicateExtract;
7171
NCommon::TConfSetting<bool, Static> OptEnableOlapPushdown;
72+
NCommon::TConfSetting<bool, Static> OptEnableOlapPushdownProjections;
7273
NCommon::TConfSetting<bool, Static> OptEnableOlapProvideComputeSharding;
7374
NCommon::TConfSetting<bool, Static> OptUseFinalizeByKey;
7475
NCommon::TConfSetting<bool, Static> OptShuffleElimination;
@@ -100,6 +101,7 @@ struct TKikimrSettings {
100101
bool HasOptDisableTopSort() const;
101102
bool HasOptDisableSqlInToJoin() const;
102103
bool HasOptEnableOlapPushdown() const;
104+
bool HasOptEnableOlapPushdownProjections() const;
103105
bool HasOptEnableOlapProvideComputeSharding() const;
104106
bool HasOptUseFinalizeByKey() const;
105107
bool HasMaxSequentialReadsInFlight() const;

0 commit comments

Comments
 (0)