Skip to content

Commit 43a11e9

Browse files
committed
SortBoundEdge
1 parent 0944c51 commit 43a11e9

File tree

6 files changed

+153
-42
lines changed

6 files changed

+153
-42
lines changed

src/query/expression/src/block.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,11 @@ pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
340340
Err(boxed)
341341
}
342342
}
343+
344+
/// Mutably borrows the value behind `boxed` as `Self`.
///
/// Returns `None` when the boxed meta's concrete type is not `Self`.
fn downcast_mut_from(boxed: &mut BlockMetaInfoPtr) -> Option<&mut Self> {
    (boxed.as_mut() as &mut dyn Any).downcast_mut::<Self>()
}
343348
}
344349

345350
impl<T: BlockMetaInfo> BlockMetaInfoDowncast for T {}
@@ -450,11 +455,6 @@ impl DataBlock {
450455
DataBlock::new_with_meta(vec![], 0, Some(meta))
451456
}
452457

453-
#[inline]
454-
pub fn take_meta(&mut self) -> Option<BlockMetaInfoPtr> {
455-
self.meta.take()
456-
}
457-
458458
#[inline]
459459
pub fn columns(&self) -> &[BlockEntry] {
460460
&self.entries
@@ -721,6 +721,16 @@ impl DataBlock {
721721
})
722722
}
723723

724+
#[inline]
725+
pub fn take_meta(&mut self) -> Option<BlockMetaInfoPtr> {
726+
self.meta.take()
727+
}
728+
729+
#[inline]
730+
pub fn mut_meta(&mut self) -> Option<&mut BlockMetaInfoPtr> {
731+
self.meta.as_mut()
732+
}
733+
724734
#[inline]
725735
pub fn replace_meta(&mut self, meta: BlockMetaInfoPtr) {
726736
self.meta.replace(meta);

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ impl BlockMetaInfo for SortExchangeMeta {
107107
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
108108
pub struct SortBound {
109109
bound_index: u32,
110+
more: bool,
110111
}
111112

112113
impl SortBound {
113-
fn create(bound_index: u32) -> Box<dyn BlockMetaInfo> {
114+
fn create(bound_index: u32, more: bool) -> Box<dyn BlockMetaInfo> {
114115
debug_assert!(bound_index != u32::MAX);
115-
SortBound { bound_index }.boxed()
116+
SortBound { bound_index, more }.boxed()
116117
}
117118
}
118119

src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ where A: SortAlgorithm + 'static
117117

118118
fn process(&mut self) -> Result<()> {
119119
if let Some(block) = self.inner.as_mut().ok().unwrap().next_block()? {
120-
self.output_data = Some(block.add_meta(Some(SortBound::create(self.bound_index)))?);
120+
self.output_data =
121+
Some(block.add_meta(Some(SortBound::create(self.bound_index, todo!())))?);
121122
};
122123
Ok(())
123124
}

src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::any::Any;
1516
use std::sync::Arc;
1617

1718
use databend_common_exception::Result;
@@ -20,6 +21,7 @@ use databend_common_expression::DataBlock;
2021
use databend_common_pipeline_core::processors::Event;
2122
use databend_common_pipeline_core::processors::InputPort;
2223
use databend_common_pipeline_core::processors::OutputPort;
24+
use databend_common_pipeline_core::processors::Processor;
2325
use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm;
2426
use databend_common_pipeline_transforms::HookTransform;
2527
use databend_common_pipeline_transforms::HookTransformer;
@@ -119,7 +121,7 @@ where
119121
finish,
120122
} = spill_sort.on_restore().await?;
121123
if let Some(block) = block {
122-
let mut block = block.add_meta(Some(SortBound::create(bound_index)))?;
124+
let mut block = block.add_meta(Some(SortBound::create(bound_index, true)))?;
123125
if self.remove_order_col {
124126
block.pop_columns(1);
125127
}
@@ -131,3 +133,90 @@ where
131133
Ok(())
132134
}
133135
}
136+
137+
struct SortBoundEdge {
138+
input: Arc<InputPort>,
139+
output: Arc<OutputPort>,
140+
data: Option<DataBlock>,
141+
}
142+
143+
impl SortBoundEdge {
144+
pub fn new(input: Arc<InputPort>, output: Arc<OutputPort>) -> Self {
145+
Self {
146+
input,
147+
output,
148+
data: None,
149+
}
150+
}
151+
}
152+
153+
impl Processor for SortBoundEdge {
154+
fn name(&self) -> String {
155+
String::from("SortBoundEdgeTransform")
156+
}
157+
158+
fn as_any(&mut self) -> &mut dyn Any {
159+
self
160+
}
161+
162+
fn event(&mut self) -> Result<Event> {
163+
if self.output.is_finished() {
164+
self.input.finish();
165+
return Ok(Event::Finished);
166+
}
167+
168+
if !self.output.can_push() {
169+
self.input.set_not_need_data();
170+
return Ok(Event::NeedConsume);
171+
}
172+
173+
if self.data.is_none() {
174+
if self.input.is_finished() {
175+
self.output.finish();
176+
return Ok(Event::Finished);
177+
}
178+
let Some(block) = self.input.pull_data().transpose()? else {
179+
self.input.set_need_data();
180+
return Ok(Event::NeedData);
181+
};
182+
self.data = Some(block);
183+
}
184+
185+
if self.input.is_finished() {
186+
let mut block = self.data.take().unwrap();
187+
let mut meta = block
188+
.take_meta()
189+
.and_then(SortBound::downcast_from)
190+
.expect("require a SortBound");
191+
meta.more = false;
192+
self.output
193+
.push_data(Ok(block.add_meta(Some(meta.boxed()))?));
194+
self.output.finish();
195+
return Ok(Event::Finished);
196+
}
197+
198+
let Some(incoming) = self.input.pull_data().transpose()? else {
199+
self.input.set_need_data();
200+
return Ok(Event::NeedData);
201+
};
202+
203+
let incoming_index = incoming
204+
.get_meta()
205+
.and_then(SortBound::downcast_ref_from)
206+
.expect("require a SortBound")
207+
.bound_index;
208+
209+
let mut output = self.data.replace(incoming).unwrap();
210+
let mut output_meta = output
211+
.mut_meta()
212+
.and_then(SortBound::downcast_mut_from)
213+
.expect("require a SortBound");
214+
215+
if output_meta.bound_index != incoming_index {
216+
output_meta.more = false;
217+
}
218+
219+
self.output.push_data(Ok(output));
220+
Ok(Event::NeedConsume)
221+
}
222+
}

src/query/service/src/pipelines/processors/transforms/sort/sort_route.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use std::any::Any;
1616
use std::sync::Arc;
1717

1818
use databend_common_exception::Result;
19+
use databend_common_expression::BlockMetaInfoDowncast;
20+
use databend_common_expression::DataBlock;
1921
use databend_common_pipeline_core::processors::Event;
2022
use databend_common_pipeline_core::processors::InputPort;
2123
use databend_common_pipeline_core::processors::OutputPort;
@@ -25,42 +27,51 @@ use databend_common_pipeline_core::Pipe;
2527
use databend_common_pipeline_core::PipeItem;
2628
use databend_common_pipeline_core::Pipeline;
2729

30+
use crate::pipelines::processors::transforms::SortBound;
31+
2832
struct TransformSortRoute {
2933
inputs: Vec<Arc<InputPort>>,
3034
output: Arc<OutputPort>,
31-
cur_input: usize,
35+
36+
input_data: Vec<Option<(DataBlock, u32)>>,
37+
cur_index: u32,
3238
}
3339

3440
impl TransformSortRoute {
3541
fn new(inputs: Vec<Arc<InputPort>>, output: Arc<OutputPort>) -> Self {
3642
Self {
43+
input_data: vec![None; inputs.len()],
44+
cur_index: 0,
3745
inputs,
3846
output,
39-
cur_input: 0,
4047
}
4148
}
4249

43-
fn process(&mut self) -> Result<()> {
44-
for (i, input) in self.inputs.iter().enumerate() {
45-
if i != self.cur_input {
46-
if !input.is_finished() && !input.has_data() {
47-
input.set_need_data();
48-
}
49-
continue;
50-
}
51-
52-
if input.is_finished() {
53-
self.cur_input = i + 1;
54-
continue;
55-
}
50+
fn process(&mut self) -> Result<Event> {
51+
for input in &self.inputs {
52+
input.set_need_data();
53+
}
5654

57-
match input.pull_data() {
58-
Some(data) => self.output.push_data(data),
59-
None => input.set_need_data(),
60-
}
55+
for (input, data) in self.inputs.iter().zip(self.input_data.iter_mut()) {
56+
if data.is_none() {
57+
let Some(mut block) = input.pull_data().transpose()? else {
58+
continue;
59+
};
60+
61+
let bound_index = block
62+
.take_meta()
63+
.and_then(SortBound::downcast_from)
64+
.expect("require a SortBound")
65+
.bound_index;
66+
if bound_index == self.cur_index {
67+
self.output.push_data(Ok(block));
68+
return Ok(Event::NeedConsume);
69+
}
70+
*data = Some((block, bound_index));
71+
};
6172
}
6273

63-
Ok(())
74+
Ok(Event::NeedData)
6475
}
6576
}
6677

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -170,16 +170,16 @@ where A: SortAlgorithm
170170
sort.choice_streams_by_bound();
171171
}
172172

173-
if sort.current.len() > sort.params.num_merge {
174-
sort.merge_current(&self.base).await?;
175-
Ok(OutputData {
176-
block: None,
177-
bound: (u32::MAX, None),
178-
finish: false,
179-
})
180-
} else {
181-
sort.restore_and_output(&self.base).await
173+
if sort.current.len() <= sort.params.num_merge {
174+
return sort.restore_and_output(&self.base).await;
182175
}
176+
177+
sort.merge_current(&self.base).await?;
178+
Ok(OutputData {
179+
block: None,
180+
bound: (u32::MAX, None),
181+
finish: false,
182+
})
183183
}
184184

185185
pub fn max_rows(&self) -> usize {
@@ -380,10 +380,9 @@ impl<A: SortAlgorithm> StepSort<A> {
380380
let bound = (self.bound_index as _, s.bound.clone());
381381

382382
if !s.is_empty() {
383-
if s.should_include_first() {
384-
self.current.push(s);
385-
} else {
386-
self.subsequent.push(s);
383+
match s.should_include_first() {
384+
true => self.current.push(s),
385+
false => self.subsequent.push(s),
387386
}
388387
return Ok(OutputData {
389388
block,

0 commit comments

Comments
 (0)