Skip to content

Commit a4fc585

Browse files
committed
route
1 parent 43a11e9 commit a4fc585

File tree

6 files changed

+107
-69
lines changed

6 files changed

+107
-69
lines changed

src/query/expression/src/block.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
341341
}
342342
}
343343

344-
fn downcast_mut_from(boxed: &mut BlockMetaInfoPtr) -> Option<&mut Self> {
344+
fn downcast_mut(boxed: &mut BlockMetaInfoPtr) -> Option<&mut Self> {
345345
let boxed = boxed.as_mut() as &mut dyn Any;
346346
boxed.downcast_mut()
347347
}

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,16 @@ impl BlockMetaInfo for SortExchangeMeta {
104104
}
105105
}
106106

107-
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
107+
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
108108
pub struct SortBound {
109-
bound_index: u32,
110-
more: bool,
109+
index: u32,
110+
next: Option<u32>,
111111
}
112112

113113
impl SortBound {
114-
fn create(bound_index: u32, more: bool) -> Box<dyn BlockMetaInfo> {
115-
debug_assert!(bound_index != u32::MAX);
116-
SortBound { bound_index, more }.boxed()
114+
fn create(index: u32, next: Option<u32>) -> Box<dyn BlockMetaInfo> {
115+
debug_assert!(index != u32::MAX);
116+
SortBound { index, next }.boxed()
117117
}
118118
}
119119

@@ -124,6 +124,6 @@ impl BlockMetaInfo for SortBound {
124124
}
125125

126126
fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
127-
Box::new(self.clone())
127+
Box::new(*self)
128128
}
129129
}

src/query/service/src/pipelines/processors/transforms/sort/sort_exchange_injector.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl ExchangeInjector for SortInjector {
4242
DataExchange::Merge(_) | DataExchange::Broadcast(_) => unreachable!(),
4343
DataExchange::ShuffleDataExchange(exchange) => {
4444
Ok(Arc::new(Box::new(SortBoundScatter {
45-
partitions: exchange.destination_ids.len() as _,
45+
partitions: exchange.destination_ids.len(),
4646
})))
4747
}
4848
}
@@ -84,7 +84,7 @@ impl ExchangeInjector for SortInjector {
8484
}
8585

8686
pub struct SortBoundScatter {
87-
partitions: u32,
87+
partitions: usize,
8888
}
8989

9090
impl FlightScatter for SortBoundScatter {
@@ -97,17 +97,17 @@ impl FlightScatter for SortBoundScatter {
9797
}
9898
}
9999

100-
pub(super) fn bound_scatter(data_block: DataBlock, n: u32) -> Result<Vec<DataBlock>> {
101-
let meta = data_block
100+
pub(super) fn bound_scatter(data_block: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
101+
let meta = *data_block
102102
.get_meta()
103103
.and_then(SortBound::downcast_ref_from)
104104
.unwrap();
105105

106-
let index = meta.bound_index % n;
106+
let empty = data_block.slice(0..0);
107+
let mut result = vec![empty; n];
108+
result[meta.index as usize % n] = data_block;
107109

108-
Ok(std::iter::repeat_n(DataBlock::empty(), index as _)
109-
.chain(Some(data_block))
110-
.collect())
110+
Ok(result)
111111
}
112112

113113
// pub struct TransformExchangeSortSerializer {

src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ where A: SortAlgorithm
6565
data: None,
6666
input: input.clone(),
6767
remove_order_col,
68-
bound_index: None,
68+
bound: None,
6969
sort_row_offset: schema.fields().len() - 1,
7070
_r: PhantomData,
7171
})
@@ -78,7 +78,7 @@ where A: SortAlgorithm
7878
block_size,
7979
limit,
8080
output_data: None,
81-
bound_index: u32::MAX,
81+
bound_index: 0,
8282
inner: Err(streams),
8383
})
8484
}
@@ -116,9 +116,12 @@ where A: SortAlgorithm + 'static
116116
}
117117

118118
fn process(&mut self) -> Result<()> {
119-
if let Some(block) = self.inner.as_mut().ok().unwrap().next_block()? {
120-
self.output_data =
121-
Some(block.add_meta(Some(SortBound::create(self.bound_index, todo!())))?);
119+
let merger = self.inner.as_mut().ok().unwrap();
120+
if let Some(block) = merger.next_block()? {
121+
self.output_data = Some(block.add_meta(Some(SortBound::create(
122+
self.bound_index,
123+
(!merger.is_finished()).then_some(self.bound_index),
124+
)))?);
122125
};
123126
Ok(())
124127
}
@@ -134,42 +137,52 @@ where A: SortAlgorithm + 'static
134137
if !merger.is_finished() {
135138
return Ok(Event::Sync);
136139
}
140+
self.bound_index += 1;
137141
let merger = std::mem::replace(inner, Err(vec![])).ok().unwrap();
138142
self.inner = Err(merger.streams());
139143
self.inner.as_mut().err().unwrap()
140144
}
141145
Err(streams) => streams,
142146
};
143147

144-
let mut bounds = Vec::with_capacity(streams.len());
145-
for stream in streams.iter_mut() {
146-
if stream.pull()? {
147-
return Ok(Event::NeedData);
148-
}
149-
let Some(data) = &stream.data else {
150-
continue;
151-
};
152-
let meta = data
153-
.get_meta()
154-
.and_then(SortBound::downcast_ref_from)
155-
.expect("require a SortBound");
156-
bounds.push(meta.bound_index)
148+
if streams.iter().all(|stream| stream.input.is_finished()) {
149+
return Ok(Event::Finished);
157150
}
158151

159-
let bound_index = match bounds.iter().min() {
160-
Some(index) => *index,
161-
None => return Ok(Event::Finished),
162-
};
163-
assert!(self.bound_index != u32::MAX || bound_index > self.bound_index);
164-
self.bound_index = bound_index;
152+
// {
153+
// for stream in streams.iter_mut() {
154+
// stream.pull()?;
155+
// }
156+
157+
// if streams
158+
// .iter_mut()
159+
// .map(|stream| stream.pull())
160+
// .try_fold(false, |acc, pending| acc || pending?)?
161+
// {
162+
// return Ok(Event::NeedData);
163+
// }
164+
165+
// if bounds.is_empty() {
166+
// return Ok(Event::Finished);
167+
// }
168+
169+
// if bounds.iter().all(|meta| meta.index != self.bound_index) {
170+
// let meta = match bounds.iter().min_by_key(|meta| meta.index) {
171+
// Some(index) => *index,
172+
// None => return Ok(Event::Finished),
173+
// };
174+
// assert!(meta.index > self.bound_index);
175+
// self.bound_index = meta.index;
176+
// }
177+
// }
178+
165179
for stream in streams.iter_mut() {
166-
stream.bound_index = Some(self.bound_index);
180+
stream.update_bound_index(self.bound_index);
167181
}
168182

169-
let streams = std::mem::take(streams);
170183
self.inner = Ok(Merger::create(
171184
self.schema.clone(),
172-
streams,
185+
std::mem::take(streams),
173186
self.block_size,
174187
self.limit,
175188
));
@@ -181,20 +194,29 @@ struct BoundedInputStream<R: Rows> {
181194
data: Option<DataBlock>,
182195
input: Arc<InputPort>,
183196
remove_order_col: bool,
184-
bound_index: Option<u32>,
185197
sort_row_offset: usize,
198+
bound: Option<Bound>,
186199
_r: PhantomData<R>,
187200
}
188201

202+
#[derive(Debug, Clone, Copy)]
203+
struct Bound {
204+
bound_index: u32,
205+
more: bool,
206+
}
207+
189208
impl<R: Rows> SortedStream for BoundedInputStream<R> {
190209
fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> {
191-
if self.pull()? {
210+
if self.bound.unwrap().more && self.pull()? {
192211
return Ok((None, true));
193212
}
194213

195214
match self.take_next_bounded_block() {
196215
None => Ok((None, false)),
197216
Some(mut block) => {
217+
if block.is_empty() {
218+
return Ok((None, true));
219+
}
198220
let col = sort_column(&block, self.sort_row_offset).clone();
199221
if self.remove_order_col {
200222
block.remove_column(self.sort_row_offset);
@@ -236,13 +258,23 @@ impl<R: Rows> BoundedInputStream<R> {
236258
.and_then(SortBound::downcast_ref_from)
237259
.expect("require a SortBound");
238260

239-
if meta.bound_index == self.bound_index.unwrap() {
261+
let bound = self.bound.as_mut().unwrap();
262+
if meta.index == bound.bound_index {
263+
bound.more = meta.next.is_some_and(|next| next == meta.index);
240264
self.data.take().map(|mut data| {
241265
data.take_meta().unwrap();
242266
data
243267
})
244268
} else {
269+
assert!(meta.index > bound.bound_index);
245270
None
246271
}
247272
}
273+
274+
fn update_bound_index(&mut self, bound_index: u32) {
275+
self.bound = Some(Bound {
276+
bound_index,
277+
more: true,
278+
});
279+
}
248280
}

src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ where
121121
finish,
122122
} = spill_sort.on_restore().await?;
123123
if let Some(block) = block {
124-
let mut block = block.add_meta(Some(SortBound::create(bound_index, true)))?;
124+
let mut block = block.add_meta(Some(SortBound::create(bound_index, None)))?;
125125
if self.remove_order_col {
126126
block.pop_columns(1);
127127
}
@@ -188,7 +188,7 @@ impl Processor for SortBoundEdge {
188188
.take_meta()
189189
.and_then(SortBound::downcast_from)
190190
.expect("require a SortBound");
191-
meta.more = false;
191+
meta.next = None;
192192
self.output
193193
.push_data(Ok(block.add_meta(Some(meta.boxed()))?));
194194
self.output.finish();
@@ -204,17 +204,15 @@ impl Processor for SortBoundEdge {
204204
.get_meta()
205205
.and_then(SortBound::downcast_ref_from)
206206
.expect("require a SortBound")
207-
.bound_index;
207+
.index;
208208

209209
let mut output = self.data.replace(incoming).unwrap();
210-
let mut output_meta = output
210+
let output_meta = output
211211
.mut_meta()
212-
.and_then(SortBound::downcast_mut_from)
212+
.and_then(SortBound::downcast_mut)
213213
.expect("require a SortBound");
214214

215-
if output_meta.bound_index != incoming_index {
216-
output_meta.more = false;
217-
}
215+
output_meta.next = Some(incoming_index);
218216

219217
self.output.push_data(Ok(output));
220218
Ok(Event::NeedConsume)

src/query/service/src/pipelines/processors/transforms/sort/sort_route.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct TransformSortRoute {
3333
inputs: Vec<Arc<InputPort>>,
3434
output: Arc<OutputPort>,
3535

36-
input_data: Vec<Option<(DataBlock, u32)>>,
36+
input_data: Vec<Option<(DataBlock, SortBound)>>,
3737
cur_index: u32,
3838
}
3939

@@ -53,22 +53,30 @@ impl TransformSortRoute {
5353
}
5454

5555
for (input, data) in self.inputs.iter().zip(self.input_data.iter_mut()) {
56-
if data.is_none() {
57-
let Some(mut block) = input.pull_data().transpose()? else {
58-
continue;
59-
};
60-
61-
let bound_index = block
62-
.take_meta()
63-
.and_then(SortBound::downcast_from)
64-
.expect("require a SortBound")
65-
.bound_index;
66-
if bound_index == self.cur_index {
67-
self.output.push_data(Ok(block));
68-
return Ok(Event::NeedConsume);
56+
let meta = match data {
57+
Some((_, meta)) => *meta,
58+
None => {
59+
let Some(mut block) = input.pull_data().transpose()? else {
60+
continue;
61+
};
62+
63+
let meta = block
64+
.take_meta()
65+
.and_then(SortBound::downcast_from)
66+
.expect("require a SortBound");
67+
68+
data.insert((block, meta)).1
6969
}
70-
*data = Some((block, bound_index));
7170
};
71+
72+
if meta.index == self.cur_index {
73+
let (block, meta) = data.take().unwrap();
74+
self.output.push_data(Ok(block));
75+
if meta.next.is_none() {
76+
self.cur_index += 1;
77+
}
78+
return Ok(Event::NeedConsume);
79+
}
7280
}
7381

7482
Ok(Event::NeedData)

0 commit comments

Comments
 (0)