Skip to content

Commit 18cb693

Browse files
committed
update
1 parent 89ca1c5 commit 18cb693

File tree

11 files changed

+238
-197
lines changed

11 files changed

+238
-197
lines changed

src/query/service/src/pipelines/builders/builder_union_all.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use async_channel::Receiver;
1616
use databend_common_exception::Result;
1717
use databend_common_expression::DataBlock;
18+
use databend_common_expression::DataSchemaRef;
1819
use databend_common_pipeline_core::processors::ProcessorPtr;
1920
use databend_common_pipeline_sinks::UnionReceiveSink;
2021
use databend_common_sql::executor::physical_plans::UnionAll;
@@ -26,17 +27,24 @@ use crate::sessions::QueryContext;
2627

2728
impl PipelineBuilder {
2829
pub fn build_union_all(&mut self, union_all: &UnionAll) -> Result<()> {
29-
self.build_pipeline(&union_all.left)?;
30-
let union_all_receiver = self.expand_union_all(&union_all.right)?;
30+
self.build_pipeline(&union_all.children[0])?;
31+
let mut remain_children_receivers = vec![];
32+
for (idx, remaining_child) in union_all.children.iter().skip(1).enumerate() {
33+
remain_children_receivers.push((idx + 1, self.expand_union_all(remaining_child)?));
34+
}
35+
let schemas: Vec<DataSchemaRef> = union_all
36+
.children
37+
.iter()
38+
.map(|plan| plan.output_schema())
39+
.collect::<Result<_>>()?;
3140
self.main_pipeline
3241
.add_transform(|transform_input_port, transform_output_port| {
3342
Ok(ProcessorPtr::create(TransformMergeBlock::try_create(
3443
transform_input_port,
3544
transform_output_port,
36-
union_all.left.output_schema()?,
37-
union_all.right.output_schema()?,
38-
union_all.pairs.clone(),
39-
union_all_receiver.clone(),
45+
schemas.clone(),
46+
union_all.output_cols.clone(),
47+
remain_children_receivers.clone(),
4048
)?))
4149
})?;
4250
Ok(())

src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs

Lines changed: 61 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_pipeline_core::processors::Event;
2727
use databend_common_pipeline_core::processors::InputPort;
2828
use databend_common_pipeline_core::processors::OutputPort;
2929
use databend_common_pipeline_core::processors::Processor;
30+
use databend_common_sql::IndexType;
3031

3132
pub struct TransformMergeBlock {
3233
finished: bool,
@@ -35,90 +36,82 @@ pub struct TransformMergeBlock {
3536

3637
input_data: Option<DataBlock>,
3738
output_data: Option<DataBlock>,
38-
left_schema: DataSchemaRef,
39-
right_schema: DataSchemaRef,
40-
pairs: Vec<(String, String)>,
39+
schemas: Vec<DataSchemaRef>,
40+
output_cols: Vec<Vec<IndexType>>,
4141

42-
receiver: Receiver<DataBlock>,
43-
receiver_result: Option<DataBlock>,
42+
receivers: Vec<(usize, Receiver<DataBlock>)>,
43+
receiver_results: Vec<(usize, DataBlock)>,
4444
}
4545

4646
impl TransformMergeBlock {
4747
pub fn try_create(
4848
input: Arc<InputPort>,
4949
output: Arc<OutputPort>,
50-
left_schema: DataSchemaRef,
51-
right_schema: DataSchemaRef,
52-
pairs: Vec<(String, String)>,
53-
receiver: Receiver<DataBlock>,
50+
schemas: Vec<DataSchemaRef>,
51+
output_cols: Vec<Vec<IndexType>>,
52+
receivers: Vec<(usize, Receiver<DataBlock>)>,
5453
) -> Result<Box<dyn Processor>> {
5554
Ok(Box::new(TransformMergeBlock {
5655
finished: false,
5756
input,
5857
output,
5958
input_data: None,
6059
output_data: None,
61-
left_schema,
62-
right_schema,
63-
pairs,
64-
receiver,
65-
receiver_result: None,
60+
schemas,
61+
output_cols,
62+
receivers,
63+
receiver_results: vec![],
6664
}))
6765
}
6866

69-
fn project_block(&self, block: DataBlock, is_left: bool) -> Result<DataBlock> {
67+
fn project_block(&self, block: DataBlock, idx: Option<usize>) -> Result<DataBlock> {
7068
let num_rows = block.num_rows();
71-
let columns = self
72-
.pairs
73-
.iter()
74-
.map(|(left, right)| {
75-
if is_left {
69+
let columns = if let Some(idx) = idx {
70+
self.check_type(idx, &block)?
71+
} else {
72+
self.output_cols[0]
73+
.iter()
74+
.map(|idx| {
7675
Ok(block
77-
.get_by_offset(self.left_schema.index_of(left)?)
76+
.get_by_offset(self.schemas[0].index_of(&idx.to_string())?)
7877
.clone())
79-
} else {
80-
// If block from right, check if block schema matches self scheme(left schema)
81-
// If unmatched, covert block columns types or report error
82-
self.check_type(left, right, &block)
83-
}
84-
})
85-
.collect::<Result<Vec<_>>>()?;
78+
})
79+
.collect::<Result<Vec<_>>>()?
80+
};
8681
Ok(DataBlock::new(columns, num_rows))
8782
}
8883

89-
fn check_type(
90-
&self,
91-
left_name: &str,
92-
right_name: &str,
93-
block: &DataBlock,
94-
) -> Result<BlockEntry> {
95-
let left_field = self.left_schema.field_with_name(left_name)?;
96-
let left_data_type = left_field.data_type();
84+
fn check_type(&self, idx: usize, block: &DataBlock) -> Result<Vec<BlockEntry>> {
85+
let mut columns = vec![];
86+
for (left_idx, right_idx) in self.output_cols[0].iter().zip(self.output_cols[idx].iter()) {
87+
let left_field = self.schemas[0].field_with_name(&left_idx.to_string())?;
88+
let left_data_type = left_field.data_type();
9789

98-
let right_field = self.right_schema.field_with_name(right_name)?;
99-
let right_data_type = right_field.data_type();
90+
let right_field = self.schemas[idx].field_with_name(&right_idx.to_string())?;
91+
let right_data_type = right_field.data_type();
10092

101-
let index = self.right_schema.index_of(right_name)?;
102-
103-
if left_data_type == right_data_type {
104-
return Ok(block.get_by_offset(index).clone());
105-
}
93+
let offset = self.schemas[idx].index_of(&right_idx.to_string())?;
94+
if left_data_type == right_data_type {
95+
columns.push(block.get_by_offset(offset).clone());
96+
}
10697

107-
if left_data_type.remove_nullable() == right_data_type.remove_nullable() {
108-
let origin_column = block.get_by_offset(index).clone();
109-
let mut builder = ColumnBuilder::with_capacity(left_data_type, block.num_rows());
110-
let value = origin_column.value.as_ref();
111-
for idx in 0..block.num_rows() {
112-
let scalar = value.index(idx).unwrap();
113-
builder.push(scalar);
98+
if left_data_type.remove_nullable() == right_data_type.remove_nullable() {
99+
let origin_column = block.get_by_offset(offset).clone();
100+
let mut builder = ColumnBuilder::with_capacity(left_data_type, block.num_rows());
101+
let value = origin_column.value.as_ref();
102+
for idx in 0..block.num_rows() {
103+
let scalar = value.index(idx).unwrap();
104+
builder.push(scalar);
105+
}
106+
let col = builder.build();
107+
columns.push(BlockEntry::new(left_data_type.clone(), Value::Column(col)));
108+
} else {
109+
return Err(ErrorCode::IllegalDataType(
110+
"The data type on both sides of the union does not match",
111+
));
114112
}
115-
let col = builder.build();
116-
Ok(BlockEntry::new(left_data_type.clone(), Value::Column(col)))
117-
} else {
118-
Err(ErrorCode::IllegalDataType(
119-
"The data type on both sides of the union does not match",
120-
))
121113
}
114+
Ok(columns)
122115
}
123116
}
124117

@@ -148,12 +141,7 @@ impl Processor for TransformMergeBlock {
148141
return Ok(Event::NeedConsume);
149142
}
150143

151-
if self.input_data.is_some() || self.receiver_result.is_some() {
152-
return Ok(Event::Sync);
153-
}
154-
155-
if let Ok(result) = self.receiver.try_recv() {
156-
self.receiver_result = Some(result);
144+
if self.input_data.is_some() || !self.receiver_results.is_empty() {
157145
return Ok(Event::Sync);
158146
}
159147

@@ -175,28 +163,25 @@ impl Processor for TransformMergeBlock {
175163
}
176164

177165
fn process(&mut self) -> Result<()> {
166+
let mut blocks = vec![];
167+
for (idx, receive_result) in self.receiver_results.iter() {
168+
blocks.push(self.project_block(receive_result.clone(), Some(*idx))?);
169+
}
170+
self.receiver_results.clear();
178171
if let Some(input_data) = self.input_data.take() {
179-
if let Some(receiver_result) = self.receiver_result.take() {
180-
self.output_data = Some(DataBlock::concat(&[
181-
self.project_block(input_data, true)?,
182-
self.project_block(receiver_result, false)?,
183-
])?);
184-
} else {
185-
self.output_data = Some(self.project_block(input_data, true)?);
186-
}
187-
} else if let Some(receiver_result) = self.receiver_result.take() {
188-
self.output_data = Some(self.project_block(receiver_result, false)?);
172+
blocks.push(self.project_block(input_data, None)?);
189173
}
190-
174+
self.output_data = Some(DataBlock::concat(&blocks)?);
191175
Ok(())
192176
}
193177

194178
#[async_backtrace::framed]
195179
async fn async_process(&mut self) -> Result<()> {
196180
if !self.finished {
197-
if let Ok(result) = self.receiver.recv().await {
198-
self.receiver_result = Some(result);
199-
return Ok(());
181+
for (idx, receiver) in self.receivers.iter() {
182+
if let Ok(result) = receiver.recv().await {
183+
self.receiver_results.push((*idx, result));
184+
}
200185
}
201186
self.finished = true;
202187
}

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -259,27 +259,24 @@ impl PhysicalPlanReplacer for Fragmenter {
259259

260260
fn replace_union(&mut self, plan: &UnionAll) -> Result<PhysicalPlan> {
261261
let mut fragments = vec![];
262-
let left_input = self.replace(plan.left.as_ref())?;
263-
let left_state = self.state.clone();
264-
265-
// Consume current fragments to prevent them being consumed by `right_input`.
266-
fragments.append(&mut self.fragments);
267-
let right_input = self.replace(plan.right.as_ref())?;
268-
let right_state = self.state.clone();
269-
270-
fragments.append(&mut self.fragments);
262+
let mut children = vec![];
263+
let mut states = vec![];
264+
for child in plan.children.iter() {
265+
children.push(Box::new(self.replace(child)?));
266+
states.push(self.state.clone());
267+
fragments.append(&mut self.fragments);
268+
}
271269
self.fragments = fragments;
272270

273271
// If any of the input is a source fragment, the union all is a source fragment.
274-
if left_state == State::SelectLeaf || right_state == State::SelectLeaf {
272+
if states.iter().any(|state| state == &State::SelectLeaf) {
275273
self.state = State::SelectLeaf;
276274
} else {
277275
self.state = State::Other;
278276
}
279277

280278
Ok(PhysicalPlan::UnionAll(UnionAll {
281-
left: Box::new(left_input),
282-
right: Box::new(right_input),
279+
children,
283280
..plan.clone()
284281
}))
285282
}

src/query/sql/src/executor/format.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,14 @@ impl PhysicalPlan {
144144
))
145145
}
146146
PhysicalPlan::UnionAll(union_all) => {
147-
let left_child = union_all.left.format_join(metadata)?;
148-
let right_child = union_all.right.format_join(metadata)?;
149-
150-
let children = vec![
151-
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
152-
FormatTreeNode::with_children("Right".to_string(), vec![right_child]),
153-
];
154-
147+
let mut children = Vec::with_capacity(union_all.children.len());
148+
for (idx, child) in union_all.children.iter().enumerate() {
149+
let child = child.format_join(metadata)?;
150+
children.push(FormatTreeNode::with_children(
151+
format!("#{:?}", idx + 1),
152+
vec![child],
153+
))
154+
}
155155
Ok(FormatTreeNode::with_children(
156156
"UnionAll".to_string(),
157157
children,
@@ -975,10 +975,9 @@ fn union_all_to_format_tree(
975975

976976
append_profile_info(&mut children, profs, plan.plan_id);
977977

978-
children.extend(vec![
979-
to_format_tree(&plan.left, metadata, profs)?,
980-
to_format_tree(&plan.right, metadata, profs)?,
981-
]);
978+
for child in plan.children.iter() {
979+
children.push(to_format_tree(child, metadata, profs)?);
980+
}
982981

983982
Ok(FormatTreeNode::with_children(
984983
"UnionAll".to_string(),

src/query/sql/src/executor/physical_plan.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,9 @@ impl PhysicalPlan {
206206
PhysicalPlan::UnionAll(plan) => {
207207
plan.plan_id = *next_id;
208208
*next_id += 1;
209-
plan.left.adjust_plan_id(next_id);
210-
plan.right.adjust_plan_id(next_id);
209+
for child in plan.children.iter_mut() {
210+
child.adjust_plan_id(next_id);
211+
}
211212
}
212213
PhysicalPlan::CteScan(plan) => {
213214
plan.plan_id = *next_id;
@@ -460,9 +461,7 @@ impl PhysicalPlan {
460461
),
461462
PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())),
462463
PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
463-
PhysicalPlan::UnionAll(plan) => Box::new(
464-
std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
465-
),
464+
PhysicalPlan::UnionAll(plan) => Box::new(plan.children.iter().map(|child| &(**child))),
466465
PhysicalPlan::DistributedInsertSelect(plan) => {
467466
Box::new(std::iter::once(plan.input.as_ref()))
468467
}
@@ -688,11 +687,6 @@ impl PhysicalPlan {
688687
PhysicalPlan::CteScan(v) => {
689688
format!("CTE index: {}, sub index: {}", v.cte_idx.0, v.cte_idx.1)
690689
}
691-
PhysicalPlan::UnionAll(v) => v
692-
.pairs
693-
.iter()
694-
.map(|(l, r)| format!("#{} <- #{}", l, r))
695-
.join(", "),
696690
_ => String::new(),
697691
})
698692
}

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -349,14 +349,15 @@ pub trait PhysicalPlanReplacer {
349349
}
350350

351351
fn replace_union(&mut self, plan: &UnionAll) -> Result<PhysicalPlan> {
352-
let left = self.replace(&plan.left)?;
353-
let right = self.replace(&plan.right)?;
352+
let mut children = Vec::with_capacity(plan.children.len());
353+
for child in plan.children.iter() {
354+
children.push(Box::new(self.replace(child)?));
355+
}
354356
Ok(PhysicalPlan::UnionAll(UnionAll {
355357
plan_id: plan.plan_id,
356-
left: Box::new(left),
357-
right: Box::new(right),
358+
children,
358359
schema: plan.schema.clone(),
359-
pairs: plan.pairs.clone(),
360+
output_cols: plan.output_cols.clone(),
360361
stat_info: plan.stat_info.clone(),
361362
}))
362363
}
@@ -559,8 +560,9 @@ impl PhysicalPlan {
559560
Self::traverse(&plan.input, pre_visit, visit, post_visit);
560561
}
561562
PhysicalPlan::UnionAll(plan) => {
562-
Self::traverse(&plan.left, pre_visit, visit, post_visit);
563-
Self::traverse(&plan.right, pre_visit, visit, post_visit);
563+
for child in plan.children.iter() {
564+
Self::traverse(child, pre_visit, visit, post_visit);
565+
}
564566
}
565567
PhysicalPlan::DistributedInsertSelect(plan) => {
566568
Self::traverse(&plan.input, pre_visit, visit, post_visit);

0 commit comments

Comments
 (0)