Skip to content

Commit 108889c

Browse files
committed
build
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 10c2441 commit 108889c

File tree

10 files changed

+187
-759
lines changed

10 files changed

+187
-759
lines changed

src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs

Lines changed: 38 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_exception::ErrorCode;
1615
use databend_common_exception::Result;
1716
use databend_common_expression::row::RowConverter as CommonConverter;
1817
use databend_common_expression::types::DataType;
@@ -45,10 +44,10 @@ pub fn convert_rows(
4544
schema: DataSchemaRef,
4645
sort_desc: &'a [SortColumnDescription],
4746
data: DataBlock,
48-
result: Result<Column>,
4947
}
5048

5149
impl RowsTypeVisitor for ConvertRowsVisitor<'_> {
50+
type Result = Result<Column>;
5251
fn schema(&self) -> DataSchemaRef {
5352
self.schema.clone()
5453
}
@@ -57,7 +56,7 @@ pub fn convert_rows(
5756
self.sort_desc
5857
}
5958

60-
fn visit_type<R, C>(&mut self)
59+
fn visit_type<R, C>(&mut self) -> Self::Result
6160
where
6261
R: Rows + 'static,
6362
C: RowConverter<R> + Send + 'static,
@@ -68,65 +67,63 @@ pub fn convert_rows(
6867
.map(|desc| self.data.get_by_offset(desc.offset).to_owned())
6968
.collect::<Vec<_>>();
7069

71-
self.result = try {
72-
let converter = C::create(self.sort_desc, self.schema.clone())?;
73-
let rows = C::convert(&converter, &columns, self.data.num_rows())?;
74-
rows.to_column()
75-
}
70+
let converter = C::create(self.sort_desc, self.schema.clone())?;
71+
let rows = C::convert(&converter, &columns, self.data.num_rows())?;
72+
Ok(rows.to_column())
7673
}
7774
}
7875

7976
let mut visitor = ConvertRowsVisitor {
8077
schema: schema.clone(),
8178
sort_desc,
8279
data,
83-
result: Err(ErrorCode::Internal("unreachable")),
8480
};
8581

86-
select_row_type(&mut visitor);
87-
visitor.result
82+
select_row_type(&mut visitor)
8883
}
8984

90-
pub fn select_row_type(visitor: &mut impl RowsTypeVisitor) {
91-
let sort_desc = visitor.sort_desc();
92-
if sort_desc.len() == 1 {
93-
let schema = visitor.schema();
94-
let sort_type = schema.field(sort_desc[0].offset).data_type();
95-
let asc = sort_desc[0].asc;
96-
97-
match_template! {
98-
T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
99-
match sort_type {
100-
DataType::T => {
101-
if asc {
102-
visitor.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>()
103-
} else {
104-
visitor.visit_type::<SimpleRowsDesc<T>, SimpleRowConverter<T>>()
105-
}
106-
},
107-
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
108-
NumberDataType::NUM_TYPE => {
85+
pub fn select_row_type<V>(visitor: &mut V) -> V::Result
86+
where V: RowsTypeVisitor {
87+
match &visitor.sort_desc() {
88+
&[desc] => {
89+
let schema = visitor.schema();
90+
let sort_type = schema.field(desc.offset).data_type();
91+
let asc = desc.asc;
92+
93+
match_template! {
94+
T = [ Date => DateType, Timestamp => TimestampType, String => StringType ],
95+
match sort_type {
96+
DataType::T => {
10997
if asc {
110-
visitor.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
98+
visitor.visit_type::<SimpleRowsAsc<T>, SimpleRowConverter<T>>()
11199
} else {
112-
visitor.visit_type::<SimpleRowsDesc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
100+
visitor.visit_type::<SimpleRowsDesc<T>, SimpleRowConverter<T>>()
101+
}
102+
},
103+
DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty {
104+
NumberDataType::NUM_TYPE => {
105+
if asc {
106+
visitor.visit_type::<SimpleRowsAsc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
107+
} else {
108+
visitor.visit_type::<SimpleRowsDesc<NumberType<NUM_TYPE>>, SimpleRowConverter<NumberType<NUM_TYPE>>>()
109+
}
113110
}
111+
}),
112+
_ => visitor.visit_type::<CommonRows, CommonConverter>()
114113
}
115-
}),
116-
_ => visitor.visit_type::<CommonRows, CommonConverter>()
117114
}
118115
}
119-
} else {
120-
visitor.visit_type::<CommonRows, CommonConverter>()
116+
_ => visitor.visit_type::<CommonRows, CommonConverter>(),
121117
}
122118
}
123119

124120
pub trait RowsTypeVisitor {
121+
type Result;
125122
fn schema(&self) -> DataSchemaRef;
126123

127124
fn sort_desc(&self) -> &[SortColumnDescription];
128125

129-
fn visit_type<R, C>(&mut self)
126+
fn visit_type<R, C>(&mut self) -> Self::Result
130127
where
131128
R: Rows + 'static,
132129
C: RowConverter<R> + Send + 'static;
@@ -136,10 +133,10 @@ pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) ->
136133
struct OrderFieldTypeVisitor<'a> {
137134
schema: DataSchemaRef,
138135
sort_desc: &'a [SortColumnDescription],
139-
result: Option<DataType>,
140136
}
141137

142138
impl RowsTypeVisitor for OrderFieldTypeVisitor<'_> {
139+
type Result = DataType;
143140
fn schema(&self) -> DataSchemaRef {
144141
self.schema.clone()
145142
}
@@ -148,22 +145,20 @@ pub fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) ->
148145
self.sort_desc
149146
}
150147

151-
fn visit_type<R, C>(&mut self)
148+
fn visit_type<R, C>(&mut self) -> Self::Result
152149
where
153150
R: Rows + 'static,
154151
C: RowConverter<R> + Send + 'static,
155152
{
156-
self.result = Some(R::data_type());
153+
R::data_type()
157154
}
158155
}
159156

160157
assert!(!desc.is_empty());
161158
let mut visitor = OrderFieldTypeVisitor {
162159
schema: schema.clone().into(),
163160
sort_desc: desc,
164-
result: None,
165161
};
166162

167-
select_row_type(&mut visitor);
168-
visitor.result.unwrap()
163+
select_row_type(&mut visitor)
169164
}

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 45 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -34,10 +34,9 @@ use databend_common_storages_fuse::TableContext;
3434
use databend_storages_common_cache::TempDirManager;
3535

3636
use crate::pipelines::memory_settings::MemorySettingsExt;
37-
use crate::pipelines::processors::transforms::sort::add_range_shuffle;
38-
use crate::pipelines::processors::transforms::sort::add_range_shuffle_merge;
39-
use crate::pipelines::processors::transforms::sort::add_sort_sample;
40-
use crate::pipelines::processors::transforms::sort::SortSampleState;
37+
use crate::pipelines::processors::transforms::add_range_shuffle_exchange;
38+
use crate::pipelines::processors::transforms::add_range_shuffle_route;
39+
use crate::pipelines::processors::transforms::SortSampleState;
4140
use crate::pipelines::processors::transforms::TransformLimit;
4241
use crate::pipelines::processors::transforms::TransformSortBuilder;
4342
use crate::pipelines::PipelineBuilder;
@@ -144,7 +143,7 @@ impl PipelineBuilder {
144143
if k > 0 && self.main_pipeline.output_len() > 1 {
145144
builder
146145
.remove_order_col_at_last()
147-
.build_range_shuffle_sort_pipeline(&mut self.main_pipeline, k)
146+
.build_range_shuffle_sort_pipeline(&mut self.main_pipeline)
148147
} else {
149148
builder
150149
.remove_order_col_at_last()
@@ -213,18 +212,11 @@ impl SortPipelineBuilder {
213212
self.build_merge_sort_pipeline(pipeline, false)
214213
}
215214

216-
fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline, k: usize) -> Result<()> {
215+
fn build_range_shuffle_sort_pipeline(self, pipeline: &mut Pipeline) -> Result<()> {
217216
let inputs = pipeline.output_len();
218217
let settings = self.ctx.get_settings();
219218
let max_threads = settings.get_max_threads()? as usize;
220-
let sample = SortSampleState::new(
221-
inputs,
222-
max_threads,
223-
self.schema.clone(),
224-
self.sort_desc.clone(),
225-
);
226-
227-
add_sort_sample(pipeline, sample.clone(), self.sort_desc.clone(), k)?;
219+
let max_block_size = settings.get_max_block_size()? as usize;
228220

229221
// Partial sort
230222
pipeline.add_transformer(|| {
@@ -234,20 +226,48 @@ impl SortPipelineBuilder {
234226
)
235227
});
236228

237-
self.build_merge_sort(pipeline, false)?;
229+
let spiller = {
230+
let location_prefix = self.ctx.query_id_spill_prefix();
231+
let config = SpillerConfig {
232+
spiller_type: SpillerType::OrderBy,
233+
location_prefix,
234+
disk_spill: None,
235+
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
236+
};
237+
let op = DataOperator::instance().spill_operator();
238+
Arc::new(Spiller::create(self.ctx.clone(), op, config)?)
239+
};
238240

239-
add_range_shuffle(
240-
pipeline,
241-
sample.clone(),
242-
self.sort_desc.clone(),
241+
let memory_settings = MemorySettings::from_sort_settings(&self.ctx)?;
242+
let enable_loser_tree = settings.get_enable_loser_tree_merge_sort()?;
243+
244+
let builder = TransformSortBuilder::create(
243245
self.schema.clone(),
244-
self.block_size,
245-
self.limit,
246-
self.remove_order_col_at_last,
247-
self.enable_loser_tree,
248-
)?;
246+
self.sort_desc.clone(),
247+
max_block_size,
248+
spiller,
249+
)
250+
.with_limit(self.limit)
251+
.with_order_col_generated(false)
252+
.with_output_order_col(false)
253+
.with_memory_settings(memory_settings)
254+
.with_enable_loser_tree(enable_loser_tree);
255+
256+
pipeline.add_transform(|input, output| {
257+
Ok(ProcessorPtr::create(builder.build_collect(input, output)?))
258+
})?;
259+
260+
let state = SortSampleState::new(inputs, max_threads, self.schema.clone(), max_block_size);
261+
262+
builder.add_shuffle(pipeline, state.clone())?;
263+
264+
add_range_shuffle_exchange(pipeline, max_threads)?;
265+
266+
pipeline.add_transform(|input, output| {
267+
Ok(ProcessorPtr::create(builder.build_exec(input, output)?))
268+
})?;
249269

250-
add_range_shuffle_merge(pipeline)?;
270+
add_range_shuffle_route(pipeline)?;
251271

252272
if self.limit.is_none() {
253273
return Ok(());

0 commit comments

Comments
 (0)