Skip to content

Commit 206b1d0

Browse files
committed
Split TKqpOlapApply into 2 parts to support arbitrary subgraph pushdown (#18524)
1 parent b1c3672 commit 206b1d0

File tree

4 files changed: 83 additions and 72 deletions.

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -682,15 +682,22 @@
682682
{"Index": 2, "Name": "ReturningType", "Type": "TExprBase"}
683683
]
684684
},
685+
{
686+
"Name": "TKqpOlapApplyColumnArg",
687+
"Base": "TExprBase",
688+
"Match": {"Type": "Callable", "Name": "KqpOlapApplyColumnArg"},
689+
"Children": [
690+
{"Index": 0, "Name": "TableRowType", "Type": "TExprBase"},
691+
{"Index": 1, "Name": "ColumnName", "Type": "TCoAtom"}
692+
]
693+
},
685694
{
686695
"Name": "TKqpOlapApply",
687696
"Base": "TExprBase",
688697
"Match": {"Type": "Callable", "Name": "KqpOlapApply"},
689698
"Children": [
690-
{"Index": 0, "Name": "Type", "Type": "TExprBase"},
691-
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
692-
{"Index": 2, "Name": "Parameters", "Type": "TExprList"},
693-
{"Index": 3, "Name": "Lambda", "Type": "TCoLambda"}
699+
{"Index": 0, "Name": "Lambda", "Type": "TCoLambda"},
700+
{"Index": 1, "Name": "Args", "Type": "TExprList"}
694701
]
695702
},
696703
{

ydb/core/kqp/host/kqp_type_ann.cpp

Lines changed: 36 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -1121,69 +1121,63 @@ TStatus AnnotateOlapFilter(const TExprNode::TPtr& node, TExprContext& ctx) {
11211121
return TStatus::Ok;
11221122
}
11231123

1124-
TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
1125-
if (!EnsureArgsCount(*node, 4U, ctx)) {
1124+
TStatus AnnotateOlapApplyColumnArg(const TExprNode::TPtr& node, TExprContext& ctx) {
1125+
if (!EnsureArgsCount(*node, 2U, ctx)) {
11261126
return TStatus::Error;
11271127
}
11281128

1129-
const auto type = node->Child(TKqpOlapApply::idx_Type);
1130-
if (!EnsureType(*type, ctx)) {
1129+
const auto& row = node->Head();
1130+
if (!EnsureType(row, ctx)) {
11311131
return TStatus::Error;
11321132
}
1133-
1134-
const auto argsType = type->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
1135-
if (!EnsureStructType(type->Pos(), *argsType, ctx)) {
1133+
const auto& rowType = row.GetTypeAnn()->Cast<TTypeExprType>()->GetType();
1134+
if (!EnsureStructType(row.Pos(), *rowType, ctx)) {
11361135
return TStatus::Error;
11371136
}
1137+
const auto& rowStructType = rowType->Cast<TStructExprType>();
11381138

1139-
const auto columns = node->Child(TKqpOlapApply::idx_Columns);
1140-
if (!EnsureTupleOfAtoms(*columns, ctx)) {
1139+
1140+
if (!EnsureAtom(node->Tail(), ctx)) {
11411141
return TStatus::Error;
11421142
}
1143-
1144-
const auto structType = argsType->Cast<TStructExprType>();
1145-
std::vector<const NYql::TTypeAnnotationNode*> argsTypes(columns->ChildrenSize());
1146-
1147-
for (auto i = 0U; i < argsTypes.size(); ++i) {
1148-
if (const auto argType = structType->FindItemType(columns->Child(i)->Content()))
1149-
argsTypes[i] = argType;
1150-
else {
1151-
ctx.AddError(TIssue(ctx.GetPosition(columns->Child(i)->Pos()),
1152-
TStringBuilder() << "Missed column: " << columns->Child(i)->Content()
1153-
));
1154-
return TStatus::Error;
1155-
}
1143+
const auto& columnName = node->Tail().Content();
1144+
if (const auto& columnType = rowStructType->FindItemType(columnName)) {
1145+
node->SetTypeAnn(columnType);
1146+
return TStatus::Ok;
1147+
} else {
1148+
ctx.AddError(TIssue(ctx.GetPosition(node->Tail().Pos()),
1149+
TStringBuilder() << "Missed column: " << columnName
1150+
));
1151+
return TStatus::Error;
11561152
}
1153+
}
11571154

1158-
TExprList parameters = TExprList(node->Child(TKqpOlapApply::idx_Parameters));
1159-
1160-
for(auto expr: parameters) {
1161-
if (!EnsureArgsCount(*expr.Ptr(), 2U, ctx)) {
1162-
return TStatus::Error;
1163-
}
1155+
TStatus AnnotateOlapApply(const TExprNode::TPtr& node, TExprContext& ctx) {
1156+
if (!EnsureArgsCount(*node, 2U, ctx)) {
1157+
return TStatus::Error;
1158+
}
11641159

1165-
TCoParameter param = TMaybeNode<TCoParameter>(expr.Ptr()).Cast();
1166-
const auto& paramType = expr.Ptr()->Child(TCoParameter::idx_Type);
1167-
if (!EnsureType(*paramType, ctx)) {
1168-
return TStatus::Error;
1169-
}
1160+
TExprList args = TExprList(node->Child(TKqpOlapApply::idx_Args));
1161+
std::vector<const NYql::TTypeAnnotationNode*> argTypes;
11701162

1171-
argsTypes.push_back(paramType->GetTypeAnn()->Cast<TTypeExprType>()->GetType());
1163+
for(const auto& arg: args) {
1164+
argTypes.push_back(arg.Ref().GetTypeAnn());
11721165
}
11731166

1174-
if (!EnsureLambda(node->Tail(), ctx)) {
1167+
auto& lambda = node->ChildRef(TKqpOlapApply::idx_Lambda);
1168+
if (!EnsureLambda(*lambda, ctx)) {
11751169
return TStatus::Error;
11761170
}
11771171

1178-
if (!UpdateLambdaAllArgumentsTypes(node->TailRef(), argsTypes, ctx)) {
1172+
if (!UpdateLambdaAllArgumentsTypes(lambda, argTypes, ctx)) {
11791173
return TStatus::Error;
11801174
}
11811175

1182-
if (!node->Tail().GetTypeAnn()) {
1176+
if (!lambda->GetTypeAnn()) {
11831177
return TStatus::Repeat;
11841178
}
11851179

1186-
node->SetTypeAnn(ctx.MakeType<TUnitExprType>());
1180+
node->SetTypeAnn(lambda->GetTypeAnn());
11871181
return TStatus::Ok;
11881182
}
11891183

@@ -1994,6 +1988,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
19941988
return AnnotateOlapFilter(input, ctx);
19951989
}
19961990

1991+
if (TKqpOlapApplyColumnArg::Match(input.Get())) {
1992+
return AnnotateOlapApplyColumnArg(input, ctx);
1993+
}
1994+
19971995
if (TKqpOlapApply::Match(input.Get())) {
19981996
return AnnotateOlapApply(input, ctx);
19991997
}

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

Lines changed: 24 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -219,34 +219,40 @@ TMaybeNode<TExprBase> YqlApplyPushdown(const TExprBase& apply, const TExprNode&
219219
return false;
220220
});
221221

222+
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
223+
if (!members.size()) {
224+
return nullptr;
225+
}
226+
222227
TNodeOnNodeOwnedMap replacements(members.size());
223-
TExprNode::TListType columns, arguments;
224-
columns.reserve(members.size());
225-
arguments.reserve(members.size());
228+
TExprNode::TListType realArgs;
229+
TExprNode::TListType lambdaArgs;
230+
226231
for (const auto& member : members) {
227-
columns.emplace_back(member->TailPtr());
228-
TString argumentName = "members_" + TString(columns.back()->Content());
229-
arguments.emplace_back(ctx.NewArgument(member->Pos(), TStringBuf(argumentName)));
230-
replacements.emplace(member.Get(), arguments.back());
232+
const auto& columnName = member->TailPtr();
233+
auto columnArg = Build<TKqpOlapApplyColumnArg>(ctx, member->Pos())
234+
.TableRowType(ExpandType(argument.Pos(), *argument.GetTypeAnn(), ctx))
235+
.ColumnName(columnName)
236+
.Done();
237+
238+
realArgs.push_back(columnArg.Ptr());
239+
TString argumentName = "members_" + TString(columnName->Content());
240+
lambdaArgs.emplace_back(ctx.NewArgument(member->Pos(), TStringBuf(argumentName)));
241+
replacements.emplace(member.Get(), lambdaArgs.back());
231242
}
232243

233244
for(const auto& pptr : parameters) {
234-
TCoParameter parameter = TMaybeNode<TCoParameter>(pptr).Cast();
245+
realArgs.push_back(pptr);
246+
const auto& parameter = TMaybeNode<TCoParameter>(pptr).Cast();
235247
TString argumentName = "parameter_" + TString(parameter.Name().StringValue());
236-
arguments.emplace_back(ctx.NewArgument(pptr->Pos(), TStringBuf(argumentName)));
237-
replacements.emplace(pptr.Get(), arguments.back());
248+
lambdaArgs.emplace_back(ctx.NewArgument(pptr->Pos(), TStringBuf(argumentName)));
249+
replacements.emplace(pptr.Get(), lambdaArgs.back());
238250
}
239251

240-
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
241-
if (!columns.size()) {
242-
return nullptr;
243-
}
244252

245253
return Build<TKqpOlapApply>(ctx, apply.Pos())
246-
.Type(ExpandType(argument.Pos(), *argument.GetTypeAnn(), ctx))
247-
.Columns().Add(std::move(columns)).Build()
248-
.Parameters().Add(std::move(parameters)).Build()
249-
.Lambda(ctx.NewLambda(apply.Pos(), ctx.NewArguments(argument.Pos(), std::move(arguments)), ctx.ReplaceNodes(apply.Ptr(), replacements)))
254+
.Lambda(ctx.NewLambda(apply.Pos(), ctx.NewArguments(argument.Pos(), std::move(lambdaArgs)), ctx.ReplaceNodes(apply.Ptr(), replacements)))
255+
.Args().Add(std::move(realArgs)).Build()
250256
.Done();
251257
}
252258

ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -589,18 +589,18 @@ const TTypedColumn CompileExists(const TExprBase& arg, TKqpOlapCompileContext& c
589589
TTypedColumn CompileYqlKernelScalarApply(const TKqpOlapApply& apply, TKqpOlapCompileContext& ctx) {
590590
std::vector<ui64> ids;
591591
TTypeAnnotationNode::TListType argTypes;
592-
ids.reserve(apply.Columns().Size());
593-
argTypes.reserve(apply.Columns().Size());
594-
for (const auto& member : apply.Columns()) {
595-
const auto arg = GetOrCreateColumnIdAndType(member, ctx);
596-
ids.emplace_back(arg.Id);
597-
argTypes.emplace_back(arg.Type);
598-
}
599-
600-
for(const auto& param: apply.Parameters()) {
601-
const auto& arg = GetOrCreateColumnIdAndType(param, ctx);
602-
ids.emplace_back(arg.Id);
603-
argTypes.emplace_back(arg.Type);
592+
ids.reserve(apply.Args().Size());
593+
argTypes.reserve(apply.Args().Size());
594+
for (const auto& arg : apply.Args()) {
595+
if (const auto& column = arg.Maybe<TKqpOlapApplyColumnArg>()) {
596+
const auto ssaCol = GetOrCreateColumnIdAndType(column.Cast().ColumnName(), ctx);
597+
ids.emplace_back(ssaCol.Id);
598+
argTypes.emplace_back(ssaCol.Type);
599+
} else {
600+
const auto& ssaCol = GetOrCreateColumnIdAndType(arg, ctx);
601+
ids.emplace_back(ssaCol.Id);
602+
argTypes.emplace_back(ssaCol.Type);
603+
}
604604
}
605605

606606
auto *const command = ctx.CreateAssignCmd();

0 commit comments

Comments
 (0)