Skip to content

Commit a5af6b7

Browse files
committed
build
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 056c3d9 commit a5af6b7

File tree

10 files changed

+187
-759
lines changed

10 files changed

+187
-759
lines changed

src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_exception::ErrorCode;
1615
use databend_common_exception::Result;
1716
use databend_common_expression::row::RowConverter as CommonConverter;
1817
use databend_common_expression::types::DataType;
@@ -45,10 +44,10 @@ pub fn convert_rows(
4544
schema: DataSchemaRef,
4645
sort_desc: &'a [SortColumnDescription],
4746
data: DataBlock,
48-
result: Result<Column>,
4947
}
5048

5149
impl RowsTypeVisitor for ConvertRowsVisitor<'_> {
50+
type Result = Result<Column>;
5251
fn schema(&self) -> DataSchemaRef {
5352
self.schema.clone()
5453
}
@@ -57,7 +56,7 @@ pub fn convert_rows(
5756
self.sort_desc
5857
}
5958

60-
fn visit_type<R, C>(&mut self)
59+
fn visit_type<R, C>(&mut self) -> Self::Result
6160
where
6261
R: Rows + 'static,
6362
C: RowConverter<R> + Send + 'static,
@@ -68,65 +67,63 @@ pub fn convert_rows(
6867
.map(|desc| self.data.get_by_offset(desc.offset).to_owned())
6968
.collect::<Vec<_>>();
7069

71-
self.result = try {
72-
let converter = C::create(self.sort_desc, self.schema.clone())?;
73-
let rows = C::convert(&converter, &columns, self.data.num_rows())?;
74-
rows.to_column()
75-
}
70+
let converter = C::create(self.sort_desc, self.schema.clone())?;
71+
let rows = C::convert(&converter, &columns, self.data.num_rows())?;
72+
Ok(rows.to_column())
7673
}
7774
}
7875

7976
let mut visitor = ConvertRowsVisitor {
8077
schema: schema.clone(),
8178
sort_desc,
8279
data,
83-
result: Err(ErrorCode::Internal("unreachable")),
8480
};
8581

86-
select_row_type(&mut visitor);
87-
visitor.result
82+
select_row_type(&mut visitor)
8883
}
8984

90-
pub fn select_row_type(visitor: &mut impl RowsTypeVisitor) {
91-
let sort_desc = visitor.sort_desc();
92-
if sort_desc.len() == 1 {
93-
let schema = visitor.schema();
94-
let sort_type = schema.field(sort_desc[0].offset).data_type();
95-
let asc = sort_desc[0].asc;
96-
97-
match_template! {
98-
T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
99-
match sort_type {
100-
DataType::T => {
101-
if asc {
102-
visitor.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>()
103-
} else {
104-
visitor.visit_type::<SimpleRowsDesc<T>, SimpleRowConverter<T>>()
105-
}
106-
},
107-
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
108-
NumberDataType::NUM_TYPE => {
85+
pub fn select_row_type<V>(visitor: &mut V) -> V::Result
86+
where V: RowsTypeVisitor {
87+
match &visitor.sort_desc() {
88+
&[desc] => {
89+
let schema = visitor.schema();
90+
let sort_type = schema.field(desc.offset).data_type();
91+
let asc = desc.asc;
92+
93+
match_template! {
94+
T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
95+
match sort_type {
96+
DataType::T => {
10997
if asc {
110-
visitor.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
98+
visitor.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>()
11199
} else {
112-
visitor.visit_type::<SimpleRowsDesc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
100+
visitor.visit_type::<SimpleRowsDesc<T>, SimpleRowConverter<T>>()
101+
}
102+
},
103+
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
104+
NumberDataType::NUM_TYPE => {
105+
if asc {
106+
visitor.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
107+
} else {
108+
visitor.visit_type::<SimpleRowsDesc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
109+
}
113110
}
111+
}),
112+
_ => visitor.visit_type::<CommonRows, CommonConverter>()
114113
}
115-
}),
116-
_ => visitor.visit_type::<CommonRows, CommonConverter>()
117114
}
118115
}
119-
} else {
120-
visitor.visit_type::<CommonRows, CommonConverter>()
116+
_ => visitor.visit_type::<CommonRows, CommonConverter>(),
121117
}
122118
}
123119

124120
pub trait RowsTypeVisitor {
121+
type Result;
125122
fn schema(&self) -> DataSchemaRef;
126123

127124
fn sort_desc(&self) -> &[SortColumnDescription];
128125

129-
fn visit_type<R, C>(&mut self)
126+
fn visit_type<R, C>(&mut self) -> Self::Result
130127
where
131128
R: Rows + 'static,
132129
C: RowConverter<R> + Send + 'static;
@@ -136,10 +133,10 @@ pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) ->
136133
struct OrderFieldTypeVisitor<'a> {
137134
schema: DataSchemaRef,
138135
sort_desc: &'a [SortColumnDescription],
139-
result: Option<DataType>,
140136
}
141137

142138
impl RowsTypeVisitor for OrderFieldTypeVisitor<'_> {
139+
type Result = DataType;
143140
fn schema(&self) -> DataSchemaRef {
144141
self.schema.clone()
145142
}
@@ -148,22 +145,20 @@ pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) ->
148145
self.sort_desc
149146
}
150147

151-
fn visit_type<R, C>(&mut self)
148+
fn visit_type<R, C>(&mut self) -> Self::Result
152149
where
153150
R: Rows + 'static,
154151
C: RowConverter<R> + Send + 'static,
155152
{
156-
self.result = Some(R::data_type());
153+
R::data_type()
157154
}
158155
}
159156

160157
assert!(!desc.is_empty());
161158
let mut visitor = OrderFieldTypeVisitor {
162159
schema: schema.clone().into(),
163160
sort_desc: desc,
164-
result: None,
165161
};
166162

167-
select_row_type(&mut visitor);
168-
visitor.result.unwrap()
163+
select_row_type(&mut visitor)
169164
}

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ use databend_common_storage::DataOperator;
3333
use databend_common_storages_fuse::TableContext;
3434

3535
use crate::pipelines::memory_settings::MemorySettingsExt;
36-
use crate::pipelines::processors::transforms::sort::add_range_shuffle;
37-
use crate::pipelines::processors::transforms::sort::add_range_shuffle_merge;
38-
use crate::pipelines::processors::transforms::sort::add_sort_sample;
39-
use crate::pipelines::processors::transforms::sort::SortSampleState;
36+
use crate::pipelines::processors::transforms::add_range_shuffle_exchange;
37+
use crate::pipelines::processors::transforms::add_range_shuffle_route;
38+
use crate::pipelines::processors::transforms::SortSampleState;
4039
use crate::pipelines::processors::transforms::TransformLimit;
4140
use crate::pipelines::processors::transforms::TransformSortBuilder;
4241
use crate::pipelines::PipelineBuilder;
@@ -142,7 +141,7 @@ impl PipelineBuilder {
142141
if k > 0 && self.main_pipeline.output_len() > 1 {
143142
builder
144143
.remove_order_col_at_last()
145-
.build_range_shuffle_sort_pipeline(&mut self.main_pipeline, k)
144+
.build_range_shuffle_sort_pipeline(&mut self.main_pipeline)
146145
} else {
147146
builder
148147
.remove_order_col_at_last()
@@ -211,18 +210,11 @@ impl SortPipelineBuilder {
211210
self.build_merge_sort_pipeline(pipeline, false)
212211
}
213212

214-
fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline, k: usize) -> Result<()> {
213+
fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline) -> Result<()> {
215214
let inputs = pipeline.output_len();
216215
let settings = self.ctx.get_settings();
217216
let max_threads = settings.get_max_threads()? as usize;
218-
let sample = SortSampleState::new(
219-
inputs,
220-
max_threads,
221-
self.schema.clone(),
222-
self.sort_desc.clone(),
223-
);
224-
225-
add_sort_sample(pipeline, sample.clone(), self.sort_desc.clone(), k)?;
217+
let max_block_size = settings.get_max_block_size()? as usize;
226218

227219
// Partial sort
228220
pipeline.add_transformer(|| {
@@ -232,20 +224,48 @@ impl SortPipelineBuilder {
232224
)
233225
});
234226

235-
self.build_merge_sort(pipeline, false)?;
227+
let spiller = {
228+
let location_prefix = self.ctx.query_id_spill_prefix();
229+
let config = SpillerConfig {
230+
spiller_type: SpillerType::OrderBy,
231+
location_prefix,
232+
disk_spill: None,
233+
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
234+
};
235+
let op = DataOperator::instance().spill_operator();
236+
Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
237+
};
238+
239+
let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
240+
let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
236241

237-
add_range_shuffle(
238-
pipeline,
239-
sample.clone(),
240-
self.sort_desc.clone(),
242+
let builder = TransformSortBuilder::create(
241243
self.schema.clone(),
242-
self.block_size,
243-
self.limit,
244-
self.remove_order_col_at_last,
245-
self.enable_loser_tree,
246-
)?;
244+
self.sort_desc.clone(),
245+
max_block_size,
246+
spiller,
247+
)
248+
.with_limit(self.limit)
249+
.with_order_col_generated(false)
250+
.with_output_order_col(false)
251+
.with_memory_settings(memory_settings)
252+
.with_enable_loser_tree(enable_loser_tree);
253+
254+
pipeline.add_transform(|input, output| {
255+
Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
256+
})?;
257+
258+
let state = SortSampleState::new(inputs, max_threads, self.schema.clone(), max_block_size);
259+
260+
builder.add_shuffle(pipeline, state.clone())?;
261+
262+
add_range_shuffle_exchange(pipeline, max_threads)?;
263+
264+
pipeline.add_transform(|input, output| {
265+
Ok(ProcessorPtr::create(builder.build_exec(input, output)?))
266+
})?;
247267

248-
add_range_shuffle_merge(pipeline)?;
268+
add_range_shuffle_route(pipeline)?;
249269

250270
if self.limit.is_none() {
251271
return Ok(());

0 commit comments

Comments
 (0)