Skip to content

Commit ac826cc

Browse files
authored
[YQ-3761] Fix RewriteAsHoppingWindow optimization (#10455) (#10547)
1 parent 1ecf0f5 commit ac826cc

File tree

28 files changed

+114
-132
lines changed

28 files changed

+114
-132
lines changed

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -183,32 +183,18 @@ static void CollectSinkStages(const NNodes::TDqQuery& dqQuery, THashSet<TExprNod
183183
}
184184

185185
NNodes::TExprBase DqMergeQueriesWithSinks(NNodes::TExprBase dqQueryNode, TExprContext& ctx) {
186-
NNodes::TDqQuery dqQuery = dqQueryNode.Cast<NNodes::TDqQuery>();
187-
188-
THashSet<TExprNode::TPtr, TExprNode::TPtrHash> sinkStages;
189-
CollectSinkStages(dqQuery, sinkStages);
190-
TOptimizeExprSettings settings{nullptr};
191-
settings.VisitLambdas = false;
192-
bool deletedDqQueryChild = false;
193-
TExprNode::TPtr newDqQueryNode;
194-
auto status = OptimizeExpr(dqQueryNode.Ptr(), newDqQueryNode, [&sinkStages, &deletedDqQueryChild](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
195-
for (ui32 childIndex = 0; childIndex < node->ChildrenSize(); ++childIndex) {
196-
TExprNode* child = node->Child(childIndex);
197-
if (child->IsCallable(NNodes::TDqQuery::CallableName())) {
198-
NNodes::TDqQuery dqQueryChild(child);
199-
CollectSinkStages(dqQueryChild, sinkStages);
200-
deletedDqQueryChild = true;
201-
return ctx.ChangeChild(*node, childIndex, dqQueryChild.World().Ptr());
202-
}
203-
}
204-
return node;
205-
}, ctx, settings);
206-
YQL_ENSURE(status != IGraphTransformer::TStatus::Error, "Failed to merge DqQuery nodes: " << status);
207-
208-
if (deletedDqQueryChild) {
209-
auto dqQueryBuilder = Build<TDqQuery>(ctx, dqQuery.Pos());
210-
dqQueryBuilder.World(newDqQueryNode->ChildPtr(TDqQuery::idx_World));
211-
186+
auto maybeDqQuery = dqQueryNode.Maybe<NNodes::TDqQuery>();
187+
YQL_ENSURE(maybeDqQuery, "Expected DqQuery!");
188+
auto dqQuery = maybeDqQuery.Cast();
189+
190+
if (auto maybeDqQueryChild = dqQuery.World().Maybe<NNodes::TDqQuery>()) {
191+
auto dqQueryChild = maybeDqQueryChild.Cast();
192+
auto dqQueryBuilder = Build<TDqQuery>(ctx, dqQuery.Pos())
193+
.World(dqQueryChild.World());
194+
195+
THashSet<TExprNode::TPtr, TExprNode::TPtrHash> sinkStages;
196+
CollectSinkStages(dqQuery, sinkStages);
197+
CollectSinkStages(maybeDqQueryChild.Cast(), sinkStages);
212198
auto sinkStagesBuilder = dqQueryBuilder.SinkStages();
213199
for (const TExprNode::TPtr& stage : sinkStages) {
214200
sinkStagesBuilder.Add(stage);

ydb/library/yql/providers/clickhouse/expr_nodes/yql_clickhouse_expr_nodes.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@
4141
"Base": "TCallable",
4242
"Match": {"Type": "Callable", "Name": "ClSourceSettings"},
4343
"Children": [
44-
{"Index": 0, "Name": "Table", "Type": "TCoAtom"},
45-
{"Index": 1, "Name": "Token", "Type": "TCoSecureParam"},
46-
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
44+
{"Index": 0, "Name": "World", "Type": "TExprBase"},
45+
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
46+
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
47+
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"}
4748
]
4849
}
4950
]

ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource_type_ann.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ class TClickHouseDataSourceTypeAnnotationTransformer : public TVisitorTransforme
2525
}
2626

2727
TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
28-
if (!EnsureArgsCount(*input, 3U, ctx)) {
28+
if (!EnsureArgsCount(*input, 4, ctx)) {
29+
return TStatus::Error;
30+
}
31+
32+
if (!EnsureWorldType(*input->Child(TClSourceSettings::idx_World), ctx)) {
2933
return TStatus::Error;
3034
}
3135

ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
5252

5353
return Build<TDqSourceWrap>(ctx, read->Pos())
5454
.Input<TClSourceSettings>()
55+
.World(clReadTable.World())
5556
.Table(clReadTable.Table())
5657
.Token<TCoSecureParam>()
5758
.Name().Build(token)

ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@
4141
"Base": "TCallable",
4242
"Match": {"Type": "Callable", "Name": "GenSourceSettings"},
4343
"Children": [
44-
{"Index": 0, "Name": "Cluster", "Type": "TCoAtom"},
45-
{"Index": 1, "Name": "Table", "Type": "TCoAtom"},
46-
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
47-
{"Index": 3, "Name": "Columns", "Type": "TCoAtomList"},
48-
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
44+
{"Index": 0, "Name": "World", "Type": "TExprBase"},
45+
{"Index": 1, "Name": "Cluster", "Type": "TCoAtom"},
46+
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
47+
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"},
48+
{"Index": 4, "Name": "Columns", "Type": "TCoAtomList"},
49+
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
4950
]
5051
}
5152
]

ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,11 @@ namespace NYql {
7676
}
7777

7878
TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
79-
if (!EnsureArgsCount(*input, 5, ctx)) {
79+
if (!EnsureArgsCount(*input, 6, ctx)) {
80+
return TStatus::Error;
81+
}
82+
83+
if (!EnsureWorldType(*input->Child(TGenSourceSettings::idx_World), ctx)) {
8084
return TStatus::Error;
8185
}
8286

ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ namespace NYql {
8585
// clang-format off
8686
return Build<TDqSourceWrap>(ctx, read->Pos())
8787
.Input<TGenSourceSettings>()
88+
.World(genReadTable.World())
8889
.Cluster(genReadTable.DataSource().Cluster())
8990
.Table(genReadTable.Table())
9091
.Token<TCoSecureParam>()

ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.json

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,12 @@
6767
"Base": "TCallable",
6868
"Match": {"Type": "Callable", "Name": "DqPqTopicSource"},
6969
"Children": [
70-
{"Index": 0, "Name": "Topic", "Type": "TPqTopic"},
71-
{"Index": 1, "Name": "Columns", "Type": "TExprBase"},
72-
{"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"},
73-
{"Index": 3, "Name": "Token", "Type": "TCoSecureParam"}
70+
{"Index": 0, "Name": "World", "Type": "TExprBase"},
71+
{"Index": 1, "Name": "Topic", "Type": "TPqTopic"},
72+
{"Index": 2, "Name": "Columns", "Type": "TExprBase"},
73+
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"},
74+
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam"},
75+
{"Index": 5, "Name": "FilterPredicate", "Type": "TCoLambda"}
7476
]
7577
},
7678
{

ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,16 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
131131
}
132132

133133
TStatus HandleDqTopicSource(TExprBase input, TExprContext& ctx) {
134-
if (!EnsureArgsCount(input.Ref(), 4, ctx)) {
134+
if (!EnsureArgsCount(input.Ref(), 6, ctx)) {
135135
return TStatus::Error;
136136
}
137137

138138
TDqPqTopicSource topicSource = input.Cast<TDqPqTopicSource>();
139+
140+
if (!EnsureWorldType(topicSource.World().Ref(), ctx)) {
141+
return TStatus::Error;
142+
}
143+
139144
TPqTopic topic = topicSource.Topic();
140145

141146
if (!EnsureCallable(topic.Ref(), ctx)) {

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
131131

132132
return Build<TDqSourceWrap>(ctx, read->Pos())
133133
.Input<TDqPqTopicSource>()
134+
.World(pqReadTopic.World())
134135
.Topic(pqReadTopic.Topic())
135136
.Columns(std::move(columns))
136137
.Settings(BuildTopicReadSettings(clusterName, dqSettings, read->Pos(), ctx))

0 commit comments

Comments
 (0)