Skip to content

Commit c5db85b

Browse files
committed
BoundedMultiSortMergeProcessor
1 parent c672b69 commit c5db85b

File tree

1 file changed

+167
-10
lines changed

1 file changed

+167
-10
lines changed

src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs

Lines changed: 167 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,180 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::any::Any;
16+
use std::cmp::Ordering;
1517
use std::marker::PhantomData;
1618
use std::sync::Arc;
1719

1820
use databend_common_exception::Result;
1921
use databend_common_expression::BlockMetaInfoDowncast;
2022
use databend_common_expression::Column;
2123
use databend_common_expression::DataBlock;
24+
use databend_common_expression::DataSchemaRef;
2225
use databend_common_expression::Scalar;
26+
use databend_common_pipeline_core::processors::Event;
2327
use databend_common_pipeline_core::processors::InputPort;
28+
use databend_common_pipeline_core::processors::OutputPort;
29+
use databend_common_pipeline_core::processors::Processor;
30+
use databend_common_pipeline_transforms::sort::algorithm::SortAlgorithm;
31+
use databend_common_pipeline_transforms::sort::Merger;
2432
use databend_common_pipeline_transforms::sort::Rows;
2533
use databend_common_pipeline_transforms::sort::SortedStream;
2634

2735
use crate::pipelines::processors::transforms::SortBound;
2836

29-
/// InputBoundStream is a stream of blocks that are cutoff less or equal than bound.
30-
struct InputBoundStream<R: Rows> {
37+
/// Shorthand for the input stream type of a sort algorithm `A`: a
/// `BoundedInputStream` over that algorithm's `Rows` representation.
type Stream<A> = BoundedInputStream<<A as SortAlgorithm>::Rows>;
38+
39+
/// Processor that merges several input ports into a single output port,
/// one bound at a time.
///
/// For each round the smallest bound among the buffered input blocks is
/// selected, a `Merger` is built over all input streams restricted to that
/// bound, and every block the merger yields is tagged with a `SortBound`
/// meta carrying the bound before it is pushed to `output`.
pub struct BoundedMultiSortMergeProcessor<A>
40+
where A: SortAlgorithm
41+
{
42+
/// Upstream ports; all of them are finished once `output` is finished.
inputs: Vec<Arc<InputPort>>,
43+
/// Downstream port that receives the merged blocks.
output: Arc<OutputPort>,
44+
/// Schema handed to every `Merger` that is created.
schema: DataSchemaRef,
45+
/// Block size handed to every `Merger` that is created.
block_size: usize,
46+
/// Optional limit handed to every `Merger` that is created.
limit: Option<usize>,
47+
48+
/// Block produced by `process` that is waiting to be pushed by `event`.
output_data: Option<DataBlock>,
49+
/// Bound of the round currently being merged; attached to output blocks.
bound: Option<Scalar>,
50+
/// `Ok` holds the merger of the round in progress; `Err` holds the idle
/// input streams between two rounds (it is not an error state).
inner: std::result::Result<Merger<A, Stream<A>>, Vec<Stream<A>>>,
51+
}
52+
53+
impl<A> BoundedMultiSortMergeProcessor<A>
54+
where A: SortAlgorithm
55+
{
56+
pub fn create(
57+
inputs: Vec<Arc<InputPort>>,
58+
output: Arc<OutputPort>,
59+
schema: DataSchemaRef,
60+
block_size: usize,
61+
limit: Option<usize>,
62+
remove_order_col: bool,
63+
) -> Result<Self> {
64+
let streams = inputs
65+
.iter()
66+
.map(|input| BoundedInputStream {
67+
data: None,
68+
input: input.clone(),
69+
remove_order_col,
70+
bound: None,
71+
sort_row_offset: schema.fields().len() - 1,
72+
_r: PhantomData,
73+
})
74+
.collect();
75+
76+
Ok(Self {
77+
inputs,
78+
output,
79+
schema,
80+
block_size,
81+
limit,
82+
output_data: None,
83+
bound: None,
84+
inner: Err(streams),
85+
})
86+
}
87+
}
88+
89+
impl<A> Processor for BoundedMultiSortMergeProcessor<A>
90+
where A: SortAlgorithm + 'static
91+
{
92+
fn name(&self) -> String {
93+
"BoundedMultiSortMerge".to_string()
94+
}
95+
96+
fn as_any(&mut self) -> &mut dyn Any {
97+
self
98+
}
99+
100+
fn event(&mut self) -> Result<Event> {
101+
if self.output.is_finished() {
102+
for input in self.inputs.iter() {
103+
input.finish();
104+
}
105+
return Ok(Event::Finished);
106+
}
107+
108+
if !self.output.can_push() {
109+
return Ok(Event::NeedConsume);
110+
}
111+
112+
if let Some(block) = self.output_data.take() {
113+
self.output.push_data(Ok(block));
114+
return Ok(Event::NeedConsume);
115+
}
116+
117+
self.next_event()
118+
}
119+
120+
fn process(&mut self) -> Result<()> {
121+
if let Some(block) = self.inner.as_mut().ok().unwrap().next_block()? {
122+
let meta = SortBound {
123+
bound: self.bound.clone(),
124+
}
125+
.boxed();
126+
self.output_data = Some(block.add_meta(Some(meta))?);
127+
};
128+
Ok(())
129+
}
130+
}
131+
132+
impl<A> BoundedMultiSortMergeProcessor<A>
133+
where A: SortAlgorithm + 'static
134+
{
135+
fn next_event(&mut self) -> Result<Event> {
136+
let streams = match &mut self.inner {
137+
inner @ Ok(_) => {
138+
let merger = inner.as_ref().ok().unwrap();
139+
if !merger.is_finished() {
140+
return Ok(Event::Sync);
141+
}
142+
let merger = std::mem::replace(inner, Err(vec![])).ok().unwrap();
143+
self.inner = Err(merger.streams());
144+
self.inner.as_mut().err().unwrap()
145+
}
146+
Err(streams) => streams,
147+
};
148+
149+
let mut bounds = Vec::with_capacity(streams.len());
150+
for stream in streams.iter_mut() {
151+
if stream.pull()? {
152+
return Ok(Event::NeedData);
153+
}
154+
let Some(data) = &stream.data else {
155+
continue;
156+
};
157+
let meta = data
158+
.get_meta()
159+
.and_then(SortBound::downcast_ref_from)
160+
.expect("require a SortBound");
161+
bounds.push(&meta.bound)
162+
}
163+
164+
self.bound = match bounds.iter().min_by(|a, b| match (a, b) {
165+
(None, None) => Ordering::Equal,
166+
(None, Some(_)) => Ordering::Greater,
167+
(Some(_), None) => Ordering::Less,
168+
(Some(a), Some(b)) => A::Rows::scalar_as_item(a).cmp(&A::Rows::scalar_as_item(b)),
169+
}) {
170+
Some(bound) => (*bound).clone(),
171+
None => return Ok(Event::Finished),
172+
};
173+
for stream in streams.iter_mut() {
174+
stream.bound = self.bound.clone();
175+
}
176+
177+
let streams = std::mem::take(streams);
178+
self.inner = Ok(Merger::create(
179+
self.schema.clone(),
180+
streams,
181+
self.block_size,
182+
self.limit,
183+
));
184+
Ok(Event::Sync)
185+
}
186+
}
187+
188+
struct BoundedInputStream<R: Rows> {
31189
data: Option<DataBlock>,
32190
input: Arc<InputPort>,
33191
remove_order_col: bool,
@@ -36,7 +194,7 @@ struct InputBoundStream<R: Rows> {
36194
_r: PhantomData<R>,
37195
}
38196

39-
impl<R: Rows> SortedStream for InputBoundStream<R> {
197+
impl<R: Rows> SortedStream for BoundedInputStream<R> {
40198
fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> {
41199
if self.pull()? {
42200
return Ok((None, true));
@@ -59,7 +217,7 @@ fn sort_column(data: &DataBlock, sort_row_offset: usize) -> &Column {
59217
data.get_by_offset(sort_row_offset).as_column().unwrap()
60218
}
61219

62-
impl<R: Rows> InputBoundStream<R> {
220+
impl<R: Rows> BoundedInputStream<R> {
63221
fn pull(&mut self) -> Result<bool> {
64222
if self.data.is_some() {
65223
return Ok(false);
@@ -79,19 +237,18 @@ impl<R: Rows> InputBoundStream<R> {
79237
}
80238

81239
fn take_next_bounded_block(&mut self) -> Option<DataBlock> {
82-
let Some(bound) = &self.bound else {
83-
return self.data.take();
84-
};
85-
86240
let meta = self
87241
.data
88242
.as_ref()?
89243
.get_meta()
90244
.and_then(SortBound::downcast_ref_from)
91245
.expect("require a SortBound");
92246

93-
if meta.bound.as_ref() == Some(bound) {
94-
self.data.take()
247+
if meta.bound == self.bound {
248+
self.data.take().map(|mut data| {
249+
let _ = data.take_meta();
250+
data
251+
})
95252
} else {
96253
None
97254
}

0 commit comments

Comments
 (0)