Skip to content

Commit c672b69

Browse files
committed
InputBoundStream
1 parent 56ae36e commit c672b69

File tree

3 files changed

+101
-1
lines changed

3 files changed

+101
-1
lines changed

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mod sort_collect;
3333
mod sort_combine;
3434
mod sort_exchange;
3535
mod sort_exchange_injector;
36+
mod sort_merge_stream;
3637
mod sort_restore;
3738
mod sort_route;
3839
mod sort_shuffle;
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::marker::PhantomData;
16+
use std::sync::Arc;
17+
18+
use databend_common_exception::Result;
19+
use databend_common_expression::BlockMetaInfoDowncast;
20+
use databend_common_expression::Column;
21+
use databend_common_expression::DataBlock;
22+
use databend_common_expression::Scalar;
23+
use databend_common_pipeline_core::processors::InputPort;
24+
use databend_common_pipeline_transforms::sort::Rows;
25+
use databend_common_pipeline_transforms::sort::SortedStream;
26+
27+
use crate::pipelines::processors::transforms::SortBound;
28+
29+
/// InputBoundStream is a stream of blocks that are cutoff less or equal than bound.
30+
struct InputBoundStream<R: Rows> {
31+
data: Option<DataBlock>,
32+
input: Arc<InputPort>,
33+
remove_order_col: bool,
34+
bound: Option<Scalar>,
35+
sort_row_offset: usize,
36+
_r: PhantomData<R>,
37+
}
38+
39+
impl<R: Rows> SortedStream for InputBoundStream<R> {
40+
fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> {
41+
if self.pull()? {
42+
return Ok((None, true));
43+
}
44+
45+
match self.take_next_bounded_block() {
46+
None => Ok((None, false)),
47+
Some(mut block) => {
48+
let col = sort_column(&block, self.sort_row_offset).clone();
49+
if self.remove_order_col {
50+
block.remove_column(self.sort_row_offset);
51+
}
52+
Ok((Some((block, col)), false))
53+
}
54+
}
55+
}
56+
}
57+
58+
fn sort_column(data: &DataBlock, sort_row_offset: usize) -> &Column {
59+
data.get_by_offset(sort_row_offset).as_column().unwrap()
60+
}
61+
62+
impl<R: Rows> InputBoundStream<R> {
63+
fn pull(&mut self) -> Result<bool> {
64+
if self.data.is_some() {
65+
return Ok(false);
66+
}
67+
68+
if self.input.has_data() {
69+
let block = self.input.pull_data().unwrap()?;
70+
self.input.set_need_data();
71+
self.data = Some(block);
72+
Ok(false)
73+
} else if self.input.is_finished() {
74+
Ok(false)
75+
} else {
76+
self.input.set_need_data();
77+
Ok(true)
78+
}
79+
}
80+
81+
fn take_next_bounded_block(&mut self) -> Option<DataBlock> {
82+
let Some(bound) = &self.bound else {
83+
return self.data.take();
84+
};
85+
86+
let meta = self
87+
.data
88+
.as_ref()?
89+
.get_meta()
90+
.and_then(SortBound::downcast_ref_from)
91+
.expect("require a SortBound");
92+
93+
if meta.bound.as_ref() == Some(bound) {
94+
self.data.take()
95+
} else {
96+
None
97+
}
98+
}
99+
}

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -865,7 +865,7 @@ impl<R: Rows, S: Spill> BoundBlockStream<R, S> {
865865
}
866866
}
867867

868-
fn block_split_off_position<R: Rows>(
868+
pub fn block_split_off_position<R: Rows>(
869869
data: &DataBlock,
870870
bound: &Scalar,
871871
sort_row_offset: usize,

0 commit comments

Comments
 (0)