Skip to content

Commit bfa0ed6

Browse files
committed
route
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 370dfc2 commit bfa0ed6

File tree

4 files changed

+122
-45
lines changed

4 files changed

+122
-45
lines changed

src/query/service/src/pipelines/processors/transforms/sort/exchange.rs

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,16 @@
1313
// limitations under the License.
1414

1515
use std::marker::PhantomData;
16-
use std::sync::Arc;
1716

1817
use databend_common_exception::Result;
18+
use databend_common_expression::BlockMetaInfoDowncast;
1919
use databend_common_expression::DataBlock;
2020
use databend_common_pipeline_core::processors::Exchange;
2121
use databend_common_pipeline_transforms::processors::sort::Rows;
2222

23-
use super::wait::SortSampleState;
23+
use super::SortScatteredMeta;
2424

2525
pub struct SortRangeExchange<R: Rows> {
26-
state: Arc<SortSampleState>,
2726
_r: PhantomData<R>,
2827
}
2928

@@ -33,41 +32,22 @@ unsafe impl<R: Rows> Sync for SortRangeExchange<R> {}
3332

3433
impl<R: Rows + 'static> Exchange for SortRangeExchange<R> {
3534
const NAME: &'static str = "SortRange";
36-
fn partition(&self, data: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
37-
if data.is_empty() {
38-
return Ok(vec![]);
39-
}
35+
fn partition(&self, mut data: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
36+
let Some(meta) = data.take_meta() else {
37+
unreachable!();
38+
};
4039

41-
let bounds = self.state.bounds();
42-
// debug_assert_eq!(n, self.state.partitions());
43-
debug_assert!(bounds.len() < n);
40+
let Some(SortScatteredMeta(scattered)) = SortScatteredMeta::downcast_from(meta) else {
41+
unreachable!();
42+
};
4443

45-
if bounds.is_empty() {
46-
return Ok(vec![data]);
47-
}
44+
assert!(scattered.len() <= n);
4845

49-
todo!()
46+
let blocks = scattered
47+
.into_iter()
48+
.map(|meta| DataBlock::empty_with_meta(Box::new(meta)))
49+
.collect();
5050

51-
// let bounds = R::from_column(&bounds.0)?;
52-
// let rows = R::from_column(data.get_last_column())?;
53-
54-
// let mut i = 0;
55-
// let mut j = 0;
56-
// let mut bound = bounds.row(j);
57-
// let mut indices = Vec::new();
58-
// while i < rows.len() {
59-
// match rows.row(i).cmp(&bound) {
60-
// Ordering::Less => indices.push(j as u32),
61-
// Ordering::Greater if j + 1 < bounds.len() => {
62-
// j += 1;
63-
// bound = bounds.row(j);
64-
// continue;
65-
// }
66-
// _ => indices.push(j as u32 + 1),
67-
// }
68-
// i += 1;
69-
// }
70-
71-
// DataBlock::scatter(&data, &indices, n)
51+
Ok(blocks)
7252
}
7353
}

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mod collect;
3636
mod exchange;
3737
mod execute;
3838
mod merge_sort;
39+
mod route;
3940
mod sort_spill;
4041
mod wait;
4142

src/query/service/src/pipelines/processors/transforms/sort/route.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
use std::sync::Arc;
17+
18+
use databend_common_exception::Result;
19+
use databend_common_pipeline_core::processors::Event;
20+
use databend_common_pipeline_core::processors::InputPort;
21+
use databend_common_pipeline_core::processors::OutputPort;
22+
use databend_common_pipeline_core::processors::Processor;
23+
24+
pub struct TransformSortRoute {
25+
inputs: Vec<Arc<InputPort>>,
26+
output: Arc<OutputPort>,
27+
cur_input: usize,
28+
}
29+
30+
impl TransformSortRoute {
31+
pub fn new(inputs: Vec<Arc<InputPort>>, output: Arc<OutputPort>) -> Self {
32+
Self {
33+
inputs,
34+
output,
35+
cur_input: 0,
36+
}
37+
}
38+
39+
fn process_input(&mut self) -> Result<()> {
40+
for (i, input) in self.inputs.iter().enumerate() {
41+
if i != self.cur_input {
42+
if !input.is_finished() && !input.has_data() {
43+
input.set_need_data();
44+
}
45+
continue;
46+
}
47+
48+
if input.is_finished() {
49+
self.cur_input = i + 1;
50+
continue;
51+
}
52+
53+
match input.pull_data() {
54+
Some(data) => self.output.push_data(data),
55+
None => input.set_need_data(),
56+
}
57+
}
58+
59+
Ok(())
60+
}
61+
}
62+
63+
impl Processor for TransformSortRoute {
64+
fn name(&self) -> String {
65+
"SortRoute".to_string()
66+
}
67+
68+
fn as_any(&mut self) -> &mut dyn Any {
69+
self
70+
}
71+
72+
fn event(&mut self) -> Result<Event> {
73+
if self.output.is_finished() {
74+
for input in &self.inputs {
75+
input.finish();
76+
}
77+
return Ok(Event::Finished);
78+
}
79+
80+
if !self.output.can_push() {
81+
for input in &self.inputs {
82+
input.set_not_need_data();
83+
}
84+
return Ok(Event::NeedConsume);
85+
}
86+
87+
self.process_input()?;
88+
89+
if self.inputs.iter().all(|input| input.is_finished()) {
90+
self.output.finish();
91+
return Ok(Event::Finished);
92+
}
93+
94+
Ok(Event::NeedData)
95+
}
96+
}

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -947,13 +947,13 @@ mod tests {
947947

948948
async fn run_bound_block_stream<R: Rows>(
949949
spiller: impl Spill + Clone,
950-
sort_desc: Arc<[SortColumnDescription]>,
950+
sort_desc: &[SortColumnDescription],
951951
bound: Scalar,
952952
block_part: usize,
953953
want: Column,
954954
) -> Result<()> {
955955
let (schema, block) = test_data();
956-
let block = DataBlock::sort(&block, &sort_desc, None)?;
956+
let block = DataBlock::sort(&block, sort_desc, None)?;
957957
let bound = Some(bound);
958958
let sort_row_offset = schema.fields().len();
959959

@@ -963,7 +963,7 @@ mod tests {
963963
]
964964
.into_iter()
965965
.map(|mut data| {
966-
let col = convert_rows(schema.clone(), &sort_desc, data.clone()).unwrap();
966+
let col = convert_rows(schema.clone(), sort_desc, data.clone()).unwrap();
967967
data.add_column(BlockEntry::new(col.data_type(), Value::Column(col)));
968968
SpillableBlock::new(data, sort_row_offset)
969969
})
@@ -991,15 +991,15 @@ mod tests {
991991
};
992992

993993
{
994-
let sort_desc = Arc::new([SortColumnDescription {
994+
let sort_desc = [SortColumnDescription {
995995
offset: 0,
996996
asc: true,
997997
nulls_first: false,
998-
}]);
998+
}];
999999

10001000
run_bound_block_stream::<SimpleRowsAsc<Int32Type>>(
10011001
spiller.clone(),
1002-
sort_desc.clone(),
1002+
&sort_desc,
10031003
Scalar::Number(NumberScalar::Int32(5)),
10041004
4,
10051005
Int32Type::from_data(vec![3, 5]),
@@ -1008,7 +1008,7 @@ mod tests {
10081008

10091009
run_bound_block_stream::<SimpleRowsAsc<Int32Type>>(
10101010
spiller.clone(),
1011-
sort_desc.clone(),
1011+
&sort_desc,
10121012
Scalar::Number(NumberScalar::Int32(8)),
10131013
4,
10141014
Int32Type::from_data(vec![3, 5, 7, 7]),
@@ -1017,15 +1017,15 @@ mod tests {
10171017
}
10181018

10191019
{
1020-
let sort_desc = Arc::new([SortColumnDescription {
1020+
let sort_desc = [SortColumnDescription {
10211021
offset: 1,
10221022
asc: false,
10231023
nulls_first: false,
1024-
}]);
1024+
}];
10251025

10261026
run_bound_block_stream::<SimpleRowsDesc<StringType>>(
10271027
spiller.clone(),
1028-
sort_desc.clone(),
1028+
&sort_desc,
10291029
Scalar::String("f".to_string()),
10301030
4,
10311031
StringType::from_data(vec!["w", "h", "g", "f"]),

0 commit comments

Comments
 (0)