Skip to content

Commit 056c3d9

Browse files
committed
update
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent b7b15af commit 056c3d9

File tree

5 files changed

+105
-160
lines changed

5 files changed

+105
-160
lines changed

src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs

Lines changed: 69 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use databend_common_exception::ErrorCode;
1516
use databend_common_exception::Result;
1617
use databend_common_expression::row::RowConverter as CommonConverter;
1718
use databend_common_expression::types::DataType;
@@ -21,7 +22,6 @@ use databend_common_expression::types::NumberType;
2122
use databend_common_expression::types::StringType;
2223
use databend_common_expression::types::TimestampType;
2324
use databend_common_expression::with_number_mapped_type;
24-
use databend_common_expression::BlockEntry;
2525
use databend_common_expression::Column;
2626
use databend_common_expression::DataBlock;
2727
use databend_common_expression::DataSchema;
@@ -41,55 +41,50 @@ pub fn convert_rows(
4141
sort_desc: &[SortColumnDescription],
4242
data: DataBlock,
4343
) -> Result<Column> {
44-
let num_rows = data.num_rows();
44+
struct ConvertRowsVisitor<'a> {
45+
schema: DataSchemaRef,
46+
sort_desc: &'a [SortColumnDescription],
47+
data: DataBlock,
48+
result: Result<Column>,
49+
}
4550

46-
if sort_desc.len() == 1 {
47-
let sort_type = schema.field(sort_desc[0].offset).data_type();
48-
let asc = sort_desc[0].asc;
51+
impl RowsTypeVisitor for ConvertRowsVisitor<'_> {
52+
fn schema(&self) -> DataSchemaRef {
53+
self.schema.clone()
54+
}
4955

50-
let offset = sort_desc[0].offset;
51-
let columns = &data.columns()[offset..offset + 1];
56+
fn sort_desc(&self) -> &[SortColumnDescription] {
57+
self.sort_desc
58+
}
5259

53-
match_template! {
54-
T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
55-
match sort_type {
56-
DataType::T => {
57-
if asc {
58-
convert_columns::<SimpleRowsAsc<T>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows)
59-
} else {
60-
convert_columns::<SimpleRowsDesc<T>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows)
61-
}
62-
},
63-
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
64-
NumberDataType::NUM_TYPE => {
65-
if asc {
66-
convert_columns::<SimpleRowsAsc<NumberType<NUM_TYPE>>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows)
67-
} else {
68-
convert_columns::<SimpleRowsDesc<NumberType<NUM_TYPE>>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows)
69-
}
70-
}
71-
}),
72-
_ => convert_columns::<CommonRows, CommonConverter>(schema, sort_desc, columns, num_rows),
60+
fn visit_type<R, C>(&mut self)
61+
where
62+
R: Rows + 'static,
63+
C: RowConverter<R> + Send + 'static,
64+
{
65+
let columns = self
66+
.sort_desc
67+
.iter()
68+
.map(|desc| self.data.get_by_offset(desc.offset).to_owned())
69+
.collect::<Vec<_>>();
70+
71+
self.result = try {
72+
let converter = C::create(self.sort_desc, self.schema.clone())?;
73+
let rows = C::convert(&converter, &columns, self.data.num_rows())?;
74+
rows.to_column()
7375
}
7476
}
75-
} else {
76-
let columns = sort_desc
77-
.iter()
78-
.map(|desc| data.get_by_offset(desc.offset).to_owned())
79-
.collect::<Vec<_>>();
80-
convert_columns::<CommonRows, CommonConverter>(schema, sort_desc, &columns, num_rows)
8177
}
82-
}
8378

84-
fn convert_columns<R: Rows, C: RowConverter<R>>(
85-
schema: DataSchemaRef,
86-
sort_desc: &[SortColumnDescription],
87-
columns: &[BlockEntry],
88-
num_rows: usize,
89-
) -> Result<Column> {
90-
let converter = C::create(sort_desc, schema)?;
91-
let rows = C::convert(&converter, columns, num_rows)?;
92-
Ok(rows.to_column())
79+
let mut visitor = ConvertRowsVisitor {
80+
schema: schema.clone(),
81+
sort_desc,
82+
data,
83+
result: Err(ErrorCode::Internal("unreachable")),
84+
};
85+
86+
select_row_type(&mut visitor);
87+
visitor.result
9388
}
9489

9590
pub fn select_row_type(visitor: &mut impl RowsTypeVisitor) {
@@ -138,19 +133,37 @@ pub trait RowsTypeVisitor {
138133
}
139134

140135
pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) -> DataType {
141-
debug_assert!(!desc.is_empty());
142-
if desc.len() == 1 {
143-
let order_by_field = schema.field(desc[0].offset);
144-
if matches!(
145-
order_by_field.data_type(),
146-
DataType::Number(_)
147-
| DataType::Date
148-
| DataType::Timestamp
149-
| DataType::Binary
150-
| DataType::String
151-
) {
152-
return order_by_field.data_type().clone();
136+
struct OrderFieldTypeVisitor<'a> {
137+
schema: DataSchemaRef,
138+
sort_desc: &'a [SortColumnDescription],
139+
result: Option<DataType>,
140+
}
141+
142+
impl RowsTypeVisitor for OrderFieldTypeVisitor<'_> {
143+
fn schema(&self) -> DataSchemaRef {
144+
self.schema.clone()
145+
}
146+
147+
fn sort_desc(&self) -> &[SortColumnDescription] {
148+
self.sort_desc
149+
}
150+
151+
fn visit_type<R, C>(&mut self)
152+
where
153+
R: Rows + 'static,
154+
C: RowConverter<R> + Send + 'static,
155+
{
156+
self.result = Some(R::data_type());
153157
}
154158
}
155-
DataType::Binary
159+
160+
assert!(!desc.is_empty());
161+
let mut visitor = OrderFieldTypeVisitor {
162+
schema: schema.clone().into(),
163+
sort_desc: desc,
164+
result: None,
165+
};
166+
167+
select_row_type(&mut visitor);
168+
visitor.result.unwrap()
156169
}

src/query/service/src/pipelines/processors/transforms/sort/builder.rs

Lines changed: 25 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use super::execute::TransformSortExecute;
3636
use super::merge_sort::TransformSort;
3737
use super::shuffle::SortSampleState;
3838
use super::shuffle::TransformSortShuffle;
39+
use super::Base;
3940
use crate::spillers::Spiller;
4041

4142
enum SortType {
@@ -199,6 +200,17 @@ impl TransformSortBuilder {
199200
!self.schema.has_field(ORDER_COL_NAME)
200201
});
201202
}
203+
204+
fn new_base(&self) -> Base {
205+
let schema = add_order_field(self.schema.clone(), &self.sort_desc);
206+
let sort_row_offset = schema.fields().len() - 1;
207+
Base {
208+
sort_row_offset,
209+
schema,
210+
spiller: self.spiller.clone(),
211+
limit: self.limit,
212+
}
213+
}
202214
}
203215

204216
pub struct Build<'a> {
@@ -212,27 +224,7 @@ pub struct Build<'a> {
212224
}
213225

214226
impl Build<'_> {
215-
fn build_sort<A, C>(&mut self) -> Result<Box<dyn Processor>>
216-
where
217-
A: SortAlgorithm + 'static,
218-
C: RowConverter<A::Rows> + Send + 'static,
219-
{
220-
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
221-
Ok(Box::new(TransformSort::<A, C>::new(
222-
self.input.clone(),
223-
self.output.clone(),
224-
schema,
225-
self.params.sort_desc.clone(),
226-
self.params.block_size,
227-
self.params.limit.map(|limit| (limit, false)),
228-
self.params.spiller.clone(),
229-
self.params.output_order_col,
230-
self.params.order_col_generated,
231-
self.params.memory_settings.clone(),
232-
)?))
233-
}
234-
235-
fn build_sort_limit<A, C>(&mut self) -> Result<Box<dyn Processor>>
227+
fn build_sort<A, C>(&mut self, limit_sort: bool) -> Result<Box<dyn Processor>>
236228
where
237229
A: SortAlgorithm + 'static,
238230
C: RowConverter<A::Rows> + Send + 'static,
@@ -244,63 +236,37 @@ impl Build<'_> {
244236
schema,
245237
self.params.sort_desc.clone(),
246238
self.params.block_size,
247-
Some((self.params.limit.unwrap(), true)),
239+
self.params.limit.map(|limit| (limit, limit_sort)),
248240
self.params.spiller.clone(),
249241
self.params.output_order_col,
250242
self.params.order_col_generated,
251243
self.params.memory_settings.clone(),
252244
)?))
253245
}
254246

255-
fn build_sort_collect<A, C>(&mut self) -> Result<Box<dyn Processor>>
247+
fn build_sort_collect<A, C>(&mut self, limit_sort: bool) -> Result<Box<dyn Processor>>
256248
where
257249
A: SortAlgorithm + 'static,
258250
C: RowConverter<A::Rows> + Send + 'static,
259251
{
260-
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
261-
262252
Ok(Box::new(TransformSortCollect::<A, C>::new(
263253
self.input.clone(),
264254
self.output.clone(),
265-
schema,
255+
self.params.new_base(),
266256
self.params.sort_desc.clone(),
267257
self.params.block_size,
268-
self.params.limit.map(|limit| (limit, false)),
269-
self.params.spiller.clone(),
270-
self.params.order_col_generated,
271-
self.params.memory_settings.clone(),
272-
)?))
273-
}
274-
275-
fn build_sort_limit_collect<A, C>(&mut self) -> Result<Box<dyn Processor>>
276-
where
277-
A: SortAlgorithm + 'static,
278-
C: RowConverter<A::Rows> + Send + 'static,
279-
{
280-
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
281-
Ok(Box::new(TransformSortCollect::<A, C>::new(
282-
self.input.clone(),
283-
self.output.clone(),
284-
schema,
285-
self.params.sort_desc.clone(),
286-
self.params.block_size,
287-
Some((self.params.limit.unwrap(), true)),
288-
self.params.spiller.clone(),
258+
limit_sort,
289259
self.params.order_col_generated,
290260
self.params.memory_settings.clone(),
291261
)?))
292262
}
293263

294264
fn build_sort_exec<A>(&mut self) -> Result<Box<dyn Processor>>
295265
where A: SortAlgorithm + 'static {
296-
let schema = add_order_field(self.params.schema.clone(), &self.params.sort_desc);
297-
298266
Ok(Box::new(TransformSortExecute::<A>::new(
299267
self.input.clone(),
300268
self.output.clone(),
301-
schema,
302-
self.params.limit,
303-
self.params.spiller.clone(),
269+
self.params.new_base(),
304270
self.params.output_order_col,
305271
)?))
306272
}
@@ -331,24 +297,15 @@ impl RowsTypeVisitor for Build<'_> {
331297
R: Rows + 'static,
332298
C: RowConverter<R> + Send + 'static,
333299
{
300+
let limit_sort = self.params.should_use_sort_limit();
334301
let processor = match self.typ {
335-
SortType::Sort => match (
336-
self.params.should_use_sort_limit(),
337-
self.params.enable_loser_tree,
338-
) {
339-
(true, true) => self.build_sort_limit::<LoserTreeSort<R>, C>(),
340-
(true, false) => self.build_sort_limit::<HeapSort<R>, C>(),
341-
(false, true) => self.build_sort::<LoserTreeSort<R>, C>(),
342-
(false, false) => self.build_sort::<HeapSort<R>, C>(),
302+
SortType::Sort => match self.params.enable_loser_tree {
303+
true => self.build_sort::<LoserTreeSort<R>, C>(limit_sort),
304+
false => self.build_sort::<HeapSort<R>, C>(limit_sort),
343305
},
344-
SortType::Collect => match (
345-
self.params.should_use_sort_limit(),
346-
self.params.enable_loser_tree,
347-
) {
348-
(true, true) => self.build_sort_limit_collect::<LoserTreeSort<R>, C>(),
349-
(true, false) => self.build_sort_limit_collect::<HeapSort<R>, C>(),
350-
(false, true) => self.build_sort_collect::<LoserTreeSort<R>, C>(),
351-
(false, false) => self.build_sort_collect::<HeapSort<R>, C>(),
306+
SortType::Collect => match self.params.enable_loser_tree {
307+
true => self.build_sort_collect::<LoserTreeSort<R>, C>(limit_sort),
308+
false => self.build_sort_collect::<HeapSort<R>, C>(limit_sort),
352309
},
353310
SortType::Execute => match self.params.enable_loser_tree {
354311
true => self.build_sort_exec::<LoserTreeSort<R>>(),

src/query/service/src/pipelines/processors/transforms/sort/collect.rs

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::sync::Arc;
2020
use databend_common_exception::Result;
2121
use databend_common_expression::BlockEntry;
2222
use databend_common_expression::DataBlock;
23-
use databend_common_expression::DataSchemaRef;
2423
use databend_common_expression::SortColumnDescription;
2524
use databend_common_expression::Value;
2625
use databend_common_pipeline_core::processors::Event;
@@ -38,7 +37,6 @@ use databend_common_pipeline_transforms::TransformSortMergeLimit;
3837
use super::sort_spill::SortSpill;
3938
use super::Base;
4039
use super::MemoryRows;
41-
use crate::spillers::Spiller;
4240

4341
enum Inner<A: SortAlgorithm> {
4442
Collect(Vec<DataBlock>),
@@ -77,28 +75,20 @@ where
7775
pub(super) fn new(
7876
input: Arc<InputPort>,
7977
output: Arc<OutputPort>,
80-
schema: DataSchemaRef,
78+
base: Base,
8179
sort_desc: Arc<[SortColumnDescription]>,
8280
max_block_size: usize,
83-
limit: Option<(usize, bool)>,
84-
spiller: Arc<Spiller>,
81+
sort_limit: bool,
8582
order_col_generated: bool,
8683
memory_settings: MemorySettings,
8784
) -> Result<Self> {
88-
let sort_row_offset = schema.fields().len() - 1;
89-
let row_converter = C::create(&sort_desc, schema.clone())?;
90-
let (name, inner, limit) = match limit {
91-
Some((limit, true)) => (
85+
let row_converter = C::create(&sort_desc, base.schema.clone())?;
86+
let (name, inner) = match base.limit {
87+
Some(limit) if sort_limit => (
9288
"TransformSortMergeCollectLimit",
9389
Inner::Limit(TransformSortMergeLimit::create(max_block_size, limit)),
94-
Some(limit),
9590
),
96-
Some((limit, false)) => (
97-
"TransformSortMergeCollect",
98-
Inner::Collect(vec![]),
99-
Some(limit),
100-
),
101-
None => ("TransformSortMergeCollect", Inner::Collect(vec![]), None),
91+
_ => ("TransformSortMergeCollect", Inner::Collect(vec![])),
10292
};
10393
Ok(Self {
10494
input,
@@ -108,12 +98,7 @@ where
10898
output_data: None,
10999
sort_desc,
110100
order_col_generated,
111-
base: Base {
112-
schema,
113-
spiller,
114-
sort_row_offset,
115-
limit,
116-
},
101+
base,
117102
inner,
118103
aborting: AtomicBool::new(false),
119104
memory_settings,

0 commit comments

Comments
 (0)