Skip to content

Commit 34a69bf

Browse files
authored
Merge branch 'main' into dev-unescape-string
2 parents c32ce98 + 7e66eca commit 34a69bf

File tree

4 files changed

+109
-3
lines changed

4 files changed

+109
-3
lines changed

src/query/sql/src/executor/physical_plan.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,25 @@ impl PhysicalPlan {
430430
}
431431
}
432432

433+
pub fn name(&self) -> String {
434+
match self {
435+
PhysicalPlan::TableScan(_) => "TableScan".to_string(),
436+
PhysicalPlan::Filter(_) => "Filter".to_string(),
437+
PhysicalPlan::Project(_) => "Project".to_string(),
438+
PhysicalPlan::EvalScalar(_) => "EvalScalar".to_string(),
439+
PhysicalPlan::AggregatePartial(_) => "AggregatePartial".to_string(),
440+
PhysicalPlan::AggregateFinal(_) => "AggregateFinal".to_string(),
441+
PhysicalPlan::Sort(_) => "Sort".to_string(),
442+
PhysicalPlan::Limit(_) => "Limit".to_string(),
443+
PhysicalPlan::HashJoin(_) => "HashJoin".to_string(),
444+
PhysicalPlan::Exchange(_) => "Exchange".to_string(),
445+
PhysicalPlan::UnionAll(_) => "UnionAll".to_string(),
446+
PhysicalPlan::DistributedInsertSelect(_) => "DistributedInsertSelect".to_string(),
447+
PhysicalPlan::ExchangeSource(_) => "Exchange Source".to_string(),
448+
PhysicalPlan::ExchangeSink(_) => "Exchange Sink".to_string(),
449+
}
450+
}
451+
433452
pub fn children<'a>(&'a self) -> Box<dyn Iterator<Item = &'a PhysicalPlan> + 'a> {
434453
match self {
435454
PhysicalPlan::TableScan(_) => Box::new(std::iter::empty()),

src/query/sql/src/executor/physical_plan_builder.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,12 @@ impl PhysicalPlanBuilder {
356356
..
357357
}) => agg.input.output_schema()?,
358358

359-
_ => unreachable!(),
359+
_ => {
360+
return Err(ErrorCode::Internal(format!(
361+
"invalid input physical plan: {}",
362+
input.name(),
363+
)));
364+
}
360365
};
361366

362367
let agg_funcs: Vec<AggregateFunctionDesc> = agg.aggregate_functions.iter().map(|v| {
@@ -419,10 +424,17 @@ impl PhysicalPlanBuilder {
419424
})
420425
}
421426

422-
_ => unreachable!(),
427+
_ => {
428+
return Err(ErrorCode::Internal(format!(
429+
"invalid input physical plan: {}",
430+
input.name(),
431+
)));
432+
}
423433
}
424434
}
425-
AggregateMode::Initial => unreachable!(),
435+
AggregateMode::Initial => {
436+
return Err(ErrorCode::Internal("Invalid aggregate mode: Initial"));
437+
}
426438
};
427439

428440
Ok(result)

src/query/sql/src/planner/optimizer/distributed/distributed.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ pub fn optimize_distributed_query(ctx: Arc<dyn TableContext>, s_expr: &SExpr) ->
5050
// Traverse the SExpr tree to find top_k, if find, push down it to Exchange::Merge
5151
fn push_down_topk_to_merge(s_expr: &mut SExpr, mut top_k: Option<TopK>) -> Result<()> {
5252
if let RelOperator::Exchange(Exchange::Merge) = s_expr.plan {
53+
// A quick fix for Merge child is aggregate.
54+
// Todo: consider to push down topk to the above of aggregate.
55+
if let RelOperator::Aggregate(_) = s_expr.child(0)?.plan {
56+
return Ok(());
57+
}
5358
if let Some(top_k) = top_k {
5459
let child = &mut s_expr.children[0];
5560
*child = SExpr::create_unary(top_k.sort.into(), child.clone());

tests/logictest/suites/mode/cluster/04_0002_explain_v2

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,76 @@ EvalScalar
131131
├── order by: []
132132
└── limit: NONE
133133

134+
statement query T
135+
explain select count(1) as c, count(b) as d, max(a) as e from t1 order by c, e, d limit 10;
136+
137+
----
138+
Limit
139+
├── limit: 10
140+
├── offset: 0
141+
└── Sort
142+
├── sort keys: [c ASC NULLS LAST, e ASC NULLS LAST, d ASC NULLS LAST]
143+
└── EvalScalar
144+
├── expressions: [count(1) (#8), max(a) (#10), count(b) (#9)]
145+
└── AggregateFinal
146+
├── group by: []
147+
├── aggregate functions: [count(), count(b), max(a)]
148+
└── Exchange
149+
├── exchange type: Merge
150+
└── AggregatePartial
151+
├── group by: []
152+
├── aggregate functions: [count(), count(b), max(a)]
153+
└── TableScan
154+
├── table: default.default.t1
155+
├── read rows: 0
156+
├── read bytes: 0
157+
├── partitions total: 0
158+
├── partitions scanned: 0
159+
└── push downs: [filters: [], limit: NONE]
160+
161+
statement query T
162+
explain select (t1.a + 1) as c,(t1.b+1) as d, (t2.a+1) as e from t1 join t2 on t1.a = t2.a order by c, d, e limit 10;
163+
164+
----
165+
Limit
166+
├── limit: 10
167+
├── offset: 0
168+
└── Sort
169+
├── sort keys: [c ASC NULLS LAST, d ASC NULLS LAST, e ASC NULLS LAST]
170+
└── Exchange
171+
├── exchange type: Merge
172+
└── Limit
173+
├── limit: 10
174+
├── offset: 0
175+
└── Sort
176+
├── sort keys: [c ASC NULLS LAST, d ASC NULLS LAST, e ASC NULLS LAST]
177+
└── EvalScalar
178+
├── expressions: [+(t1.a (#0), 1), +(t1.b (#1), 1), +(t2.a (#2), 1)]
179+
└── HashJoin
180+
├── join type: INNER
181+
├── build keys: [t2.a (#2)]
182+
├── probe keys: [t1.a (#0)]
183+
├── filters: []
184+
├── Exchange(Build)
185+
│ ├── exchange type: Hash(t2.a (#2))
186+
│ └── TableScan
187+
│ ├── table: default.default.t2
188+
│ ├── read rows: 0
189+
│ ├── read bytes: 0
190+
│ ├── partitions total: 0
191+
│ ├── partitions scanned: 0
192+
│ ├── push downs: [filters: [], limit: NONE]
193+
│ └── output columns: [0]
194+
└── Exchange(Probe)
195+
├── exchange type: Hash(t1.a (#0))
196+
└── TableScan
197+
├── table: default.default.t1
198+
├── read rows: 0
199+
├── read bytes: 0
200+
├── partitions total: 0
201+
├── partitions scanned: 0
202+
└── push downs: [filters: [], limit: NONE]
203+
134204
statement ok
135205
set prefer_broadcast_join = 1;
136206

0 commit comments

Comments
 (0)