Skip to content

Commit 8f0e559

Browse files
authored
Merge pull request #9092 from xudong963/topk
feat: optimize topk in cluser mode
2 parents 8011bd0 + 3d43924 commit 8f0e559

File tree

4 files changed

+125
-0
lines changed

4 files changed

+125
-0
lines changed

src/query/sql/src/planner/optimizer/distributed/distributed.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,28 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cmp;
1516
use std::sync::Arc;
1617

1718
use common_catalog::table_context::TableContext;
1819
use common_exception::Result;
1920

21+
use crate::optimizer::distributed::topk::TopK;
2022
use crate::optimizer::property::require_property;
2123
use crate::optimizer::Distribution;
2224
use crate::optimizer::RelExpr;
2325
use crate::optimizer::RequiredProperty;
2426
use crate::optimizer::SExpr;
2527
use crate::plans::Exchange;
28+
use crate::plans::Limit;
29+
use crate::plans::RelOperator;
2630

2731
pub fn optimize_distributed_query(ctx: Arc<dyn TableContext>, s_expr: &SExpr) -> Result<SExpr> {
2832
let required = RequiredProperty {
2933
distribution: Distribution::Any,
3034
};
3135
let mut result = require_property(ctx, &required, s_expr)?;
36+
push_down_topk_to_merge(&mut result, None)?;
3237
let rel_expr = RelExpr::with_s_expr(&result);
3338
let physical_prop = rel_expr.derive_physical_prop()?;
3439
let root_required = RequiredProperty {
@@ -41,3 +46,36 @@ pub fn optimize_distributed_query(ctx: Arc<dyn TableContext>, s_expr: &SExpr) ->
4146

4247
Ok(result)
4348
}
49+
50+
// Traverse the SExpr tree to find top_k, if find, push down it to Exchange::Merge
51+
fn push_down_topk_to_merge(s_expr: &mut SExpr, mut top_k: Option<TopK>) -> Result<()> {
52+
if let RelOperator::Exchange(Exchange::Merge) = s_expr.plan {
53+
if let Some(top_k) = top_k {
54+
let child = &mut s_expr.children[0];
55+
*child = SExpr::create_unary(top_k.sort.into(), child.clone());
56+
*child = SExpr::create_unary(top_k.limit.into(), child.clone());
57+
}
58+
return Ok(());
59+
}
60+
for child in s_expr.children.iter_mut() {
61+
if let RelOperator::Sort(sort) = &child.plan {
62+
if let RelOperator::Limit(limit) = &s_expr.plan {
63+
// If limit.limit is None, no need to push down.
64+
if let Some(mut count) = limit.limit {
65+
count += limit.offset;
66+
top_k = Some(TopK {
67+
sort: sort.clone(),
68+
limit: Limit {
69+
limit: limit
70+
.limit
71+
.map(|origin_limit| cmp::max(origin_limit, count)),
72+
offset: 0,
73+
},
74+
});
75+
}
76+
}
77+
}
78+
push_down_topk_to_merge(child, top_k.clone())?;
79+
}
80+
Ok(())
81+
}

src/query/sql/src/planner/optimizer/distributed/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@
1414

1515
#[allow(clippy::module_inception)]
1616
mod distributed;
17+
mod topk;
1718

1819
pub use distributed::optimize_distributed_query;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::plans::Limit;
16+
use crate::plans::Sort;
17+
18+
#[derive(Clone, Debug)]
19+
pub struct TopK {
20+
pub(crate) sort: Sort,
21+
pub(crate) limit: Limit,
22+
}

tests/logictest/suites/mode/cluster/04_0002_explain_v2

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,70 @@ Exchange
167167
statement ok
168168
set prefer_broadcast_join = 0;
169169

170+
statement query T
171+
explain select * from (SELECT number AS a FROM numbers(10)) x order by x.a limit 3;
172+
173+
----
174+
Limit
175+
├── limit: 3
176+
├── offset: 0
177+
└── Sort
178+
├── sort keys: [number ASC NULLS LAST]
179+
└── Exchange
180+
├── exchange type: Merge
181+
└── Limit
182+
├── limit: 3
183+
├── offset: 0
184+
└── Sort
185+
├── sort keys: [number ASC NULLS LAST]
186+
└── TableScan
187+
├── table: default.system.numbers
188+
├── read rows: 10
189+
├── read bytes: 80
190+
├── partitions total: 1
191+
├── partitions scanned: 1
192+
└── push downs: [filters: [], limit: 3]
193+
194+
statement query T
195+
explain select * from (SELECT number AS a FROM numbers(10)) x right join (SELECT number AS a FROM numbers(5)) y using(a) order by x.a limit 3;
196+
197+
----
198+
Limit
199+
├── limit: 3
200+
├── offset: 0
201+
└── Sort
202+
├── sort keys: [number ASC NULLS LAST]
203+
└── Exchange
204+
├── exchange type: Merge
205+
└── Limit
206+
├── limit: 3
207+
├── offset: 0
208+
└── Sort
209+
├── sort keys: [number ASC NULLS LAST]
210+
└── HashJoin
211+
├── join type: RIGHT OUTER
212+
├── build keys: [y.a (#1)]
213+
├── probe keys: [x.a (#0)]
214+
├── filters: []
215+
├── Exchange(Build)
216+
│ ├── exchange type: Hash(y.a (#1))
217+
│ └── TableScan
218+
│ ├── table: default.system.numbers
219+
│ ├── read rows: 5
220+
│ ├── read bytes: 40
221+
│ ├── partitions total: 1
222+
│ ├── partitions scanned: 1
223+
│ └── push downs: [filters: [], limit: NONE]
224+
└── Exchange(Probe)
225+
├── exchange type: Hash(x.a (#0))
226+
└── TableScan
227+
├── table: default.system.numbers
228+
├── read rows: 10
229+
├── read bytes: 80
230+
├── partitions total: 1
231+
├── partitions scanned: 1
232+
└── push downs: [filters: [], limit: NONE]
233+
170234
statement ok
171235
drop table t1;
172236

0 commit comments

Comments
 (0)