Skip to content

Commit b7a0c59

Browse files
committed
update
1 parent 35e8a0e commit b7a0c59

File tree

11 files changed

+238
-197
lines changed

11 files changed

+238
-197
lines changed

src/query/service/src/pipelines/builders/builder_union_all.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use async_channel::Receiver;
1616
use databend_common_exception::Result;
1717
use databend_common_expression::DataBlock;
18+
use databend_common_expression::DataSchemaRef;
1819
use databend_common_pipeline_core::processors::ProcessorPtr;
1920
use databend_common_pipeline_sinks::UnionReceiveSink;
2021
use databend_common_sql::executor::physical_plans::UnionAll;
@@ -26,17 +27,24 @@ use crate::sessions::QueryContext;
2627

2728
impl PipelineBuilder {
2829
pub fn build_union_all(&mut self, union_all: &UnionAll) -> Result<()> {
29-
self.build_pipeline(&union_all.left)?;
30-
let union_all_receiver = self.expand_union_all(&union_all.right)?;
30+
self.build_pipeline(&union_all.children[0])?;
31+
let mut remain_children_receivers = vec![];
32+
for (idx, remaining_child) in union_all.children.iter().skip(1).enumerate() {
33+
remain_children_receivers.push((idx + 1, self.expand_union_all(remaining_child)?));
34+
}
35+
let schemas: Vec<DataSchemaRef> = union_all
36+
.children
37+
.iter()
38+
.map(|plan| plan.output_schema())
39+
.collect::<Result<_>>()?;
3140
self.main_pipeline
3241
.add_transform(|transform_input_port, transform_output_port| {
3342
Ok(ProcessorPtr::create(TransformMergeBlock::try_create(
3443
transform_input_port,
3544
transform_output_port,
36-
union_all.left.output_schema()?,
37-
union_all.right.output_schema()?,
38-
union_all.pairs.clone(),
39-
union_all_receiver.clone(),
45+
schemas.clone(),
46+
union_all.output_cols.clone(),
47+
remain_children_receivers.clone(),
4048
)?))
4149
})?;
4250
Ok(())

src/query/service/src/pipelines/processors/transforms/transform_merge_block.rs

Lines changed: 61 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_pipeline_core::processors::Event;
2727
use databend_common_pipeline_core::processors::InputPort;
2828
use databend_common_pipeline_core::processors::OutputPort;
2929
use databend_common_pipeline_core::processors::Processor;
30+
use databend_common_sql::IndexType;
3031

3132
pub struct TransformMergeBlock {
3233
finished: bool,
@@ -35,90 +36,82 @@ pub struct TransformMergeBlock {
3536

3637
input_data: Option<DataBlock>,
3738
output_data: Option<DataBlock>,
38-
left_schema: DataSchemaRef,
39-
right_schema: DataSchemaRef,
40-
pairs: Vec<(String, String)>,
39+
schemas: Vec<DataSchemaRef>,
40+
output_cols: Vec<Vec<IndexType>>,
4141

42-
receiver: Receiver<DataBlock>,
43-
receiver_result: Option<DataBlock>,
42+
receivers: Vec<(usize, Receiver<DataBlock>)>,
43+
receiver_results: Vec<(usize, DataBlock)>,
4444
}
4545

4646
impl TransformMergeBlock {
4747
pub fn try_create(
4848
input: Arc<InputPort>,
4949
output: Arc<OutputPort>,
50-
left_schema: DataSchemaRef,
51-
right_schema: DataSchemaRef,
52-
pairs: Vec<(String, String)>,
53-
receiver: Receiver<DataBlock>,
50+
schemas: Vec<DataSchemaRef>,
51+
output_cols: Vec<Vec<IndexType>>,
52+
receivers: Vec<(usize, Receiver<DataBlock>)>,
5453
) -> Result<Box<dyn Processor>> {
5554
Ok(Box::new(TransformMergeBlock {
5655
finished: false,
5756
input,
5857
output,
5958
input_data: None,
6059
output_data: None,
61-
left_schema,
62-
right_schema,
63-
pairs,
64-
receiver,
65-
receiver_result: None,
60+
schemas,
61+
output_cols,
62+
receivers,
63+
receiver_results: vec![],
6664
}))
6765
}
6866

69-
fn project_block(&self, block: DataBlock, is_left: bool) -> Result<DataBlock> {
67+
fn project_block(&self, block: DataBlock, idx: Option<usize>) -> Result<DataBlock> {
7068
let num_rows = block.num_rows();
71-
let columns = self
72-
.pairs
73-
.iter()
74-
.map(|(left, right)| {
75-
if is_left {
69+
let columns = if let Some(idx) = idx {
70+
self.check_type(idx, &block)?
71+
} else {
72+
self.output_cols[0]
73+
.iter()
74+
.map(|idx| {
7675
Ok(block
77-
.get_by_offset(self.left_schema.index_of(left)?)
76+
.get_by_offset(self.schemas[0].index_of(&idx.to_string())?)
7877
.clone())
79-
} else {
80-
// If the block comes from the right side, check whether its schema matches our own schema (the left schema).
81-
// If they do not match, convert the block's column types or report an error.
82-
self.check_type(left, right, &block)
83-
}
84-
})
85-
.collect::<Result<Vec<_>>>()?;
78+
})
79+
.collect::<Result<Vec<_>>>()?
80+
};
8681
Ok(DataBlock::new(columns, num_rows))
8782
}
8883

89-
fn check_type(
90-
&self,
91-
left_name: &str,
92-
right_name: &str,
93-
block: &DataBlock,
94-
) -> Result<BlockEntry> {
95-
let left_field = self.left_schema.field_with_name(left_name)?;
96-
let left_data_type = left_field.data_type();
84+
fn check_type(&self, idx: usize, block: &DataBlock) -> Result<Vec<BlockEntry>> {
85+
let mut columns = vec![];
86+
for (left_idx, right_idx) in self.output_cols[0].iter().zip(self.output_cols[idx].iter()) {
87+
let left_field = self.schemas[0].field_with_name(&left_idx.to_string())?;
88+
let left_data_type = left_field.data_type();
9789

98-
let right_field = self.right_schema.field_with_name(right_name)?;
99-
let right_data_type = right_field.data_type();
90+
let right_field = self.schemas[idx].field_with_name(&right_idx.to_string())?;
91+
let right_data_type = right_field.data_type();
10092

101-
let index = self.right_schema.index_of(right_name)?;
102-
103-
if left_data_type == right_data_type {
104-
return Ok(block.get_by_offset(index).clone());
105-
}
93+
let offset = self.schemas[idx].index_of(&right_idx.to_string())?;
94+
if left_data_type == right_data_type {
95+
columns.push(block.get_by_offset(offset).clone());
96+
}
10697

107-
if left_data_type.remove_nullable() == right_data_type.remove_nullable() {
108-
let origin_column = block.get_by_offset(index).clone();
109-
let mut builder = ColumnBuilder::with_capacity(left_data_type, block.num_rows());
110-
let value = origin_column.value.as_ref();
111-
for idx in 0..block.num_rows() {
112-
let scalar = value.index(idx).unwrap();
113-
builder.push(scalar);
98+
if left_data_type.remove_nullable() == right_data_type.remove_nullable() {
99+
let origin_column = block.get_by_offset(offset).clone();
100+
let mut builder = ColumnBuilder::with_capacity(left_data_type, block.num_rows());
101+
let value = origin_column.value.as_ref();
102+
for idx in 0..block.num_rows() {
103+
let scalar = value.index(idx).unwrap();
104+
builder.push(scalar);
105+
}
106+
let col = builder.build();
107+
columns.push(BlockEntry::new(left_data_type.clone(), Value::Column(col)));
108+
} else {
109+
return Err(ErrorCode::IllegalDataType(
110+
"The data type on both sides of the union does not match",
111+
));
114112
}
115-
let col = builder.build();
116-
Ok(BlockEntry::new(left_data_type.clone(), Value::Column(col)))
117-
} else {
118-
Err(ErrorCode::IllegalDataType(
119-
"The data type on both sides of the union does not match",
120-
))
121113
}
114+
Ok(columns)
122115
}
123116
}
124117

@@ -148,12 +141,7 @@ impl Processor for TransformMergeBlock {
148141
return Ok(Event::NeedConsume);
149142
}
150143

151-
if self.input_data.is_some() || self.receiver_result.is_some() {
152-
return Ok(Event::Sync);
153-
}
154-
155-
if let Ok(result) = self.receiver.try_recv() {
156-
self.receiver_result = Some(result);
144+
if self.input_data.is_some() || !self.receiver_results.is_empty() {
157145
return Ok(Event::Sync);
158146
}
159147

@@ -175,28 +163,25 @@ impl Processor for TransformMergeBlock {
175163
}
176164

177165
fn process(&mut self) -> Result<()> {
166+
let mut blocks = vec![];
167+
for (idx, receive_result) in self.receiver_results.iter() {
168+
blocks.push(self.project_block(receive_result.clone(), Some(*idx))?);
169+
}
170+
self.receiver_results.clear();
178171
if let Some(input_data) = self.input_data.take() {
179-
if let Some(receiver_result) = self.receiver_result.take() {
180-
self.output_data = Some(DataBlock::concat(&[
181-
self.project_block(input_data, true)?,
182-
self.project_block(receiver_result, false)?,
183-
])?);
184-
} else {
185-
self.output_data = Some(self.project_block(input_data, true)?);
186-
}
187-
} else if let Some(receiver_result) = self.receiver_result.take() {
188-
self.output_data = Some(self.project_block(receiver_result, false)?);
172+
blocks.push(self.project_block(input_data, None)?);
189173
}
190-
174+
self.output_data = Some(DataBlock::concat(&blocks)?);
191175
Ok(())
192176
}
193177

194178
#[async_backtrace::framed]
195179
async fn async_process(&mut self) -> Result<()> {
196180
if !self.finished {
197-
if let Ok(result) = self.receiver.recv().await {
198-
self.receiver_result = Some(result);
199-
return Ok(());
181+
for (idx, receiver) in self.receivers.iter() {
182+
if let Ok(result) = receiver.recv().await {
183+
self.receiver_results.push((*idx, result));
184+
}
200185
}
201186
self.finished = true;
202187
}

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -255,27 +255,24 @@ impl PhysicalPlanReplacer for Fragmenter {
255255

256256
fn replace_union(&mut self, plan: &UnionAll) -> Result<PhysicalPlan> {
257257
let mut fragments = vec![];
258-
let left_input = self.replace(plan.left.as_ref())?;
259-
let left_state = self.state.clone();
260-
261-
// Consume current fragments to prevent them being consumed by `right_input`.
262-
fragments.append(&mut self.fragments);
263-
let right_input = self.replace(plan.right.as_ref())?;
264-
let right_state = self.state.clone();
265-
266-
fragments.append(&mut self.fragments);
258+
let mut children = vec![];
259+
let mut states = vec![];
260+
for child in plan.children.iter() {
261+
children.push(Box::new(self.replace(child)?));
262+
states.push(self.state.clone());
263+
fragments.append(&mut self.fragments);
264+
}
267265
self.fragments = fragments;
268266

269267
// If any of the inputs is a source fragment, the union all is a source fragment.
270-
if left_state == State::SelectLeaf || right_state == State::SelectLeaf {
268+
if states.iter().any(|state| state == &State::SelectLeaf) {
271269
self.state = State::SelectLeaf;
272270
} else {
273271
self.state = State::Other;
274272
}
275273

276274
Ok(PhysicalPlan::UnionAll(UnionAll {
277-
left: Box::new(left_input),
278-
right: Box::new(right_input),
275+
children,
279276
..plan.clone()
280277
}))
281278
}

src/query/sql/src/executor/format.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -159,14 +159,14 @@ impl PhysicalPlan {
159159
))
160160
}
161161
PhysicalPlan::UnionAll(union_all) => {
162-
let left_child = union_all.left.format_join(metadata)?;
163-
let right_child = union_all.right.format_join(metadata)?;
164-
165-
let children = vec![
166-
FormatTreeNode::with_children("Left".to_string(), vec![left_child]),
167-
FormatTreeNode::with_children("Right".to_string(), vec![right_child]),
168-
];
169-
162+
let mut children = Vec::with_capacity(union_all.children.len());
163+
for (idx, child) in union_all.children.iter().enumerate() {
164+
let child = child.format_join(metadata)?;
165+
children.push(FormatTreeNode::with_children(
166+
format!("#{:?}", idx + 1),
167+
vec![child],
168+
))
169+
}
170170
Ok(FormatTreeNode::with_children(
171171
"UnionAll".to_string(),
172172
children,
@@ -1014,10 +1014,9 @@ fn union_all_to_format_tree(
10141014

10151015
append_profile_info(&mut children, profs, plan.plan_id);
10161016

1017-
children.extend(vec![
1018-
to_format_tree(&plan.left, metadata, profs)?,
1019-
to_format_tree(&plan.right, metadata, profs)?,
1020-
]);
1017+
for child in plan.children.iter() {
1018+
children.push(to_format_tree(child, metadata, profs)?);
1019+
}
10211020

10221021
Ok(FormatTreeNode::with_children(
10231022
"UnionAll".to_string(),

src/query/sql/src/executor/physical_plan.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,9 @@ impl PhysicalPlan {
230230
PhysicalPlan::UnionAll(plan) => {
231231
plan.plan_id = *next_id;
232232
*next_id += 1;
233-
plan.left.adjust_plan_id(next_id);
234-
plan.right.adjust_plan_id(next_id);
233+
for child in plan.children.iter_mut() {
234+
child.adjust_plan_id(next_id);
235+
}
235236
}
236237
PhysicalPlan::CteScan(plan) => {
237238
plan.plan_id = *next_id;
@@ -573,9 +574,7 @@ impl PhysicalPlan {
573574
),
574575
PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())),
575576
PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
576-
PhysicalPlan::UnionAll(plan) => Box::new(
577-
std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
578-
),
577+
PhysicalPlan::UnionAll(plan) => Box::new(plan.children.iter().map(|child| &(**child))),
579578
PhysicalPlan::DistributedInsertSelect(plan) => {
580579
Box::new(std::iter::once(plan.input.as_ref()))
581580
}
@@ -823,11 +822,6 @@ impl PhysicalPlan {
823822
PhysicalPlan::CteScan(v) => {
824823
format!("CTE index: {}, sub index: {}", v.cte_idx.0, v.cte_idx.1)
825824
}
826-
PhysicalPlan::UnionAll(v) => v
827-
.pairs
828-
.iter()
829-
.map(|(l, r)| format!("#{} <- #{}", l, r))
830-
.join(", "),
831825
_ => String::new(),
832826
})
833827
}

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,14 +370,15 @@ pub trait PhysicalPlanReplacer {
370370
}
371371

372372
fn replace_union(&mut self, plan: &UnionAll) -> Result<PhysicalPlan> {
373-
let left = self.replace(&plan.left)?;
374-
let right = self.replace(&plan.right)?;
373+
let mut children = Vec::with_capacity(plan.children.len());
374+
for child in plan.children.iter() {
375+
children.push(Box::new(self.replace(child)?));
376+
}
375377
Ok(PhysicalPlan::UnionAll(UnionAll {
376378
plan_id: plan.plan_id,
377-
left: Box::new(left),
378-
right: Box::new(right),
379+
children,
379380
schema: plan.schema.clone(),
380-
pairs: plan.pairs.clone(),
381+
output_cols: plan.output_cols.clone(),
381382
stat_info: plan.stat_info.clone(),
382383
}))
383384
}
@@ -671,8 +672,9 @@ impl PhysicalPlan {
671672
Self::traverse(&plan.input, pre_visit, visit, post_visit);
672673
}
673674
PhysicalPlan::UnionAll(plan) => {
674-
Self::traverse(&plan.left, pre_visit, visit, post_visit);
675-
Self::traverse(&plan.right, pre_visit, visit, post_visit);
675+
for child in plan.children.iter() {
676+
Self::traverse(child, pre_visit, visit, post_visit);
677+
}
676678
}
677679
PhysicalPlan::DistributedInsertSelect(plan) => {
678680
Self::traverse(&plan.input, pre_visit, visit, post_visit);

0 commit comments

Comments
 (0)