Skip to content

Commit 46a3e42

Browse files
committed
fix
1 parent 476ffde commit 46a3e42

File tree

14 files changed

+217
-81
lines changed

14 files changed

+217
-81
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,7 @@ pub fn try_add_multi_sort_merge(
5757
remove_order_col: bool,
5858
enable_loser_tree: bool,
5959
) -> Result<()> {
60-
debug_assert!(if !remove_order_col {
61-
schema.has_field(ORDER_COL_NAME)
62-
} else {
63-
!schema.has_field(ORDER_COL_NAME)
64-
});
60+
debug_assert!(remove_order_col == schema.has_field(ORDER_COL_NAME));
6561

6662
if pipeline.is_empty() {
6763
return Err(ErrorCode::Internal("Cannot resize empty pipe."));

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ databend-storages-common-session = { workspace = true }
116116
databend-storages-common-stage = { workspace = true }
117117
databend-storages-common-table-meta = { workspace = true }
118118
derive-visitor = { workspace = true }
119+
enum-as-inner = { workspace = true }
119120
ethnum = { workspace = true }
120121
fastrace = { workspace = true }
121122
flatbuffers = { workspace = true }

src/query/service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#![feature(get_mut_unchecked)]
4040
#![allow(clippy::diverging_sub_expression)]
4141
#![allow(clippy::arc_with_non_send_sync)]
42+
#![feature(debug_closure_helpers)]
4243

4344
extern crate core;
4445

src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ use databend_common_pipeline_transforms::TransformSortMergeLimit;
3737

3838
use super::sort_spill::create_memory_merger;
3939
use super::sort_spill::MemoryMerger;
40+
use super::sort_spill::OutputData;
4041
use super::sort_spill::SortSpill;
4142
use super::Base;
4243
use super::MemoryRows;
43-
use crate::pipelines::processors::transforms::sort::sort_spill::OutputData;
4444
use crate::spillers::Spiller;
4545

4646
#[derive(Debug)]

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_expression::BlockMetaInfoDowncast;
2121
use databend_common_expression::DataBlock;
2222
use databend_common_expression::DataSchemaRef;
2323
use databend_common_pipeline_transforms::SortSpillParams;
24+
use enum_as_inner::EnumAsInner;
2425
use sort_spill::SpillableBlock;
2526

2627
use crate::spillers::Spiller;
@@ -95,11 +96,29 @@ impl BlockMetaInfo for SortExchangeMeta {
9596
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
9697
pub struct SortBound {
9798
index: u32,
98-
next: Option<u32>,
99+
next: SortBoundNext,
100+
}
101+
102+
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, EnumAsInner)]
103+
pub enum SortBoundNext {
104+
Uinit,
105+
Next(u32),
106+
More,
107+
Skip,
108+
Last,
109+
}
110+
111+
impl SortBound {
112+
pub fn has_more(&self) -> bool {
113+
match self.next {
114+
SortBoundNext::Next(next) => self.index == next,
115+
_ => unreachable!(),
116+
}
117+
}
99118
}
100119

101120
impl SortBound {
102-
fn create(index: u32, next: Option<u32>) -> Box<dyn BlockMetaInfo> {
121+
fn create(index: u32, next: SortBoundNext) -> Box<dyn BlockMetaInfo> {
103122
debug_assert!(index != u32::MAX);
104123
SortBound { index, next }.boxed()
105124
}

src/query/service/src/pipelines/processors/transforms/sort/sort_broadcast.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@ use databend_common_exception::Result;
2020
use databend_common_expression::BlockMetaInfoDowncast;
2121
use databend_common_expression::DataBlock;
2222
use databend_common_pipeline_core::processors::Event;
23+
use databend_common_pipeline_core::processors::InputPort;
24+
use databend_common_pipeline_core::processors::OutputPort;
2325
use databend_common_pipeline_core::processors::Processor;
2426
use databend_common_pipeline_transforms::processors::sort::Rows;
2527
use databend_common_pipeline_transforms::HookTransform;
2628
use databend_common_pipeline_transforms::HookTransformer;
2729

2830
use super::bounds::Bounds;
2931
use super::SortCollectedMeta;
30-
use crate::pipelines::processors::transforms::sort::SortExchangeMeta;
31-
use crate::pipelines::processors::InputPort;
32-
use crate::pipelines::processors::OutputPort;
32+
use super::SortExchangeMeta;
3333
use crate::sessions::QueryContext;
3434

3535
pub struct TransformSortBoundBroadcast<R: Rows> {

src/query/service/src/pipelines/processors/transforms/sort/sort_exchange_injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_common_expression::DataBlock;
2020
use databend_common_pipeline_core::Pipeline;
2121
use databend_common_settings::FlightCompression;
2222

23-
use crate::pipelines::processors::transforms::SortBound;
23+
use super::SortBound;
2424
use crate::servers::flight::v1::exchange::DataExchange;
2525
use crate::servers::flight::v1::exchange::DefaultExchangeInjector;
2626
use crate::servers::flight::v1::exchange::ExchangeInjector;

src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16+
use std::collections::VecDeque;
1617
use std::marker::PhantomData;
1718
use std::sync::Arc;
1819

@@ -31,6 +32,7 @@ use databend_common_pipeline_transforms::sort::Rows;
3132
use databend_common_pipeline_transforms::sort::SortedStream;
3233

3334
use super::SortBound;
35+
use super::SortBoundNext;
3436

3537
type Stream<A> = BoundedInputStream<<A as SortAlgorithm>::Rows>;
3638

@@ -42,8 +44,9 @@ where A: SortAlgorithm
4244
schema: DataSchemaRef,
4345
block_size: usize,
4446

45-
output_data: Option<DataBlock>,
47+
output_data: VecDeque<DataBlock>,
4648
cur_index: u32,
49+
cur_output: bool,
4750
inner: std::result::Result<Merger<A, Stream<A>>, Vec<Stream<A>>>,
4851
}
4952

@@ -56,6 +59,8 @@ where A: SortAlgorithm
5659
schema: DataSchemaRef,
5760
block_size: usize,
5861
) -> Result<Self> {
62+
assert!(inputs.len() != 1);
63+
5964
let streams = inputs
6065
.iter()
6166
.map(|input| BoundedInputStream {
@@ -73,8 +78,9 @@ where A: SortAlgorithm
7378
output,
7479
schema,
7580
block_size,
76-
output_data: None,
81+
output_data: VecDeque::new(),
7782
cur_index: 0,
83+
cur_output: false,
7884
inner: Err(streams),
7985
})
8086
}
@@ -103,7 +109,7 @@ where A: SortAlgorithm + 'static
103109
return Ok(Event::NeedConsume);
104110
}
105111

106-
if let Some(block) = self.output_data.take() {
112+
if let Some(block) = self.output_data.pop_front() {
107113
self.output.push_data(Ok(block));
108114
return Ok(Event::NeedConsume);
109115
}
@@ -114,10 +120,16 @@ where A: SortAlgorithm + 'static
114120
fn process(&mut self) -> Result<()> {
115121
let merger = self.inner.as_mut().ok().unwrap();
116122
if let Some(block) = merger.next_block()? {
117-
self.output_data = Some(block.add_meta(Some(SortBound::create(
118-
self.cur_index,
119-
(!merger.is_finished()).then_some(self.cur_index),
120-
)))?);
123+
self.cur_output = true;
124+
self.output_data
125+
.push_back(block.add_meta(Some(SortBound::create(
126+
self.cur_index,
127+
if merger.is_finished() {
128+
SortBoundNext::Last
129+
} else {
130+
SortBoundNext::More
131+
},
132+
)))?);
121133
};
122134
Ok(())
123135
}
@@ -133,6 +145,19 @@ where A: SortAlgorithm + 'static
133145
if !merger.is_finished() {
134146
return Ok(Event::Sync);
135147
}
148+
149+
if !self.cur_output {
150+
let empty = DataBlock::empty_with_schema(self.schema.clone());
151+
self.output_data.push_back(
152+
empty.add_meta(Some(SortBound::create(
153+
self.cur_index,
154+
SortBoundNext::Skip,
155+
)))?,
156+
);
157+
} else {
158+
self.cur_output = false;
159+
}
160+
136161
self.cur_index += 1;
137162
let merger = std::mem::replace(inner, Err(vec![])).ok().unwrap();
138163
self.inner = Err(merger.streams());
@@ -235,7 +260,7 @@ impl<R: Rows> BoundedInputStream<R> {
235260
"meta: {meta:?}, bound: {bound:?}",
236261
);
237262
if meta.index == bound.bound_index {
238-
bound.more = meta.next.is_some_and(|next| next == meta.index);
263+
bound.more = meta.has_more();
239264
self.data.take().map(|mut data| {
240265
data.take_meta().unwrap();
241266
data
@@ -268,11 +293,11 @@ mod tests {
268293

269294
use super::*;
270295

271-
fn create_block(empty: bool, index: u32, next: Option<u32>) -> DataBlock {
296+
fn create_block(empty: bool, index: u32, next: u32) -> DataBlock {
272297
let block = DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1, 2, 3])]);
273298
let block = if empty { block.slice(0..0) } else { block };
274299
block
275-
.add_meta(Some(SortBound::create(index, next)))
300+
.add_meta(Some(SortBound::create(index, SortBoundNext::Next(next))))
276301
.unwrap()
277302
}
278303

@@ -311,7 +336,7 @@ mod tests {
311336
}
312337

313338
{
314-
let block = create_block(true, 0, Some(0));
339+
let block = create_block(true, 0, 0);
315340
output.push_data(Ok(block));
316341

317342
let (_, pending) = stream.next().unwrap();
@@ -321,7 +346,7 @@ mod tests {
321346
}
322347

323348
{
324-
let block = create_block(false, 0, Some(0));
349+
let block = create_block(false, 0, 0);
325350
output.push_data(Ok(block));
326351

327352
let (data, pending) = stream.next().unwrap();
@@ -332,7 +357,7 @@ mod tests {
332357
}
333358

334359
{
335-
let block = create_block(true, 0, Some(1));
360+
let block = create_block(true, 0, 1);
336361
output.push_data(Ok(block));
337362

338363
let (data, pending) = stream.next().unwrap();
@@ -341,7 +366,7 @@ mod tests {
341366
assert!(!stream.bound.unwrap().more);
342367
assert!(pending);
343368

344-
let block = create_block(false, 1, Some(1));
369+
let block = create_block(false, 1, 1);
345370
output.push_data(Ok(block));
346371

347372
let (data, pending) = stream.next().unwrap();

src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgori
2626
use databend_common_pipeline_transforms::HookTransform;
2727
use databend_common_pipeline_transforms::HookTransformer;
2828

29+
use super::sort_spill::OutputData;
2930
use super::sort_spill::SortSpill;
3031
use super::Base;
3132
use super::SortBound;
33+
use super::SortBoundNext;
3234
use super::SortCollectedMeta;
33-
use crate::pipelines::processors::transforms::sort::sort_spill::OutputData;
3435

3536
pub struct TransformSortRestore<A: SortAlgorithm> {
3637
input: Vec<SortCollectedMeta>,
@@ -121,7 +122,8 @@ where
121122
finish,
122123
} = spill_sort.on_restore().await?;
123124
if let Some(block) = block {
124-
let mut block = block.add_meta(Some(SortBound::create(bound_index, None)))?;
125+
let mut block =
126+
block.add_meta(Some(SortBound::create(bound_index, SortBoundNext::Uinit)))?;
125127
if self.remove_order_col {
126128
block.pop_columns(1);
127129
}
@@ -188,7 +190,7 @@ impl Processor for SortBoundEdge {
188190
.take_meta()
189191
.and_then(SortBound::downcast_from)
190192
.expect("require a SortBound");
191-
meta.next = None;
193+
meta.next = SortBoundNext::Uinit;
192194
self.output
193195
.push_data(Ok(block.add_meta(Some(meta.boxed()))?));
194196
self.output.finish();
@@ -212,7 +214,7 @@ impl Processor for SortBoundEdge {
212214
.and_then(SortBound::downcast_mut)
213215
.expect("require a SortBound");
214216

215-
output_meta.next = Some(incoming_index);
217+
output_meta.next = SortBoundNext::Next(incoming_index);
216218

217219
log::debug!("output_meta: {:?}", output_meta);
218220

0 commit comments

Comments
 (0)