Skip to content

Commit 6adc65a

Browse files
committed
fix
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent d340e57 commit 6adc65a

File tree

15 files changed

+208
-109
lines changed

15 files changed

+208
-109
lines changed

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ use databend_common_storages_fuse::TableContext;
3434
use databend_storages_common_cache::TempDirManager;
3535

3636
use crate::pipelines::memory_settings::MemorySettingsExt;
37-
use crate::pipelines::processors::transforms::add_range_shuffle_exchange;
3837
use crate::pipelines::processors::transforms::add_range_shuffle_route;
38+
use crate::pipelines::processors::transforms::SortRangeExchange;
3939
use crate::pipelines::processors::transforms::SortSampleState;
4040
use crate::pipelines::processors::transforms::TransformLimit;
4141
use crate::pipelines::processors::transforms::TransformSortBuilder;
@@ -139,8 +139,9 @@ impl PipelineBuilder {
139139
None => {
140140
// Build for single node mode.
141141
// We build the full sort pipeline for it.
142-
let k = self.settings.get_range_shuffle_sort_simple_size()?;
143-
if k > 0 && self.main_pipeline.output_len() > 1 {
142+
if self.settings.get_enable_range_shuffle_sort()?
143+
&& self.main_pipeline.output_len() > 1
144+
{
144145
builder
145146
.remove_order_col_at_last()
146147
.build_range_shuffle_sort_pipeline(&mut self.main_pipeline)
@@ -261,7 +262,11 @@ impl SortPipelineBuilder {
261262

262263
builder.add_shuffle(pipeline, state.clone())?;
263264

264-
add_range_shuffle_exchange(pipeline, max_threads)?;
265+
pipeline.exchange(max_threads, Arc::new(SortRangeExchange));
266+
267+
pipeline.add_transform(|input, output| {
268+
Ok(ProcessorPtr::create(builder.build_combine(input, output)?))
269+
})?;
265270

266271
pipeline.add_transform(|input, output| {
267272
Ok(ProcessorPtr::create(builder.build_exec(input, output)?))

src/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ impl RunningGraph {
796796
true => Ok(()),
797797
false => Err(ErrorCode::Internal(format!(
798798
"Pipeline graph is not finished, details: {}",
799-
self.format_graph_nodes()
799+
self.format_graph_nodes(true)
800800
))),
801801
}
802802
}
@@ -862,7 +862,7 @@ impl RunningGraph {
862862
self.0.finished_notify.clone()
863863
}
864864

865-
pub fn format_graph_nodes(&self) -> String {
865+
pub fn format_graph_nodes(&self, pretty: bool) -> String {
866866
pub struct NodeDisplay {
867867
id: usize,
868868
name: String,
@@ -962,7 +962,11 @@ impl RunningGraph {
962962
}
963963
}
964964

965-
format!("{:?}", nodes_display)
965+
if pretty {
966+
format!("{:#?}", nodes_display)
967+
} else {
968+
format!("{:?}", nodes_display)
969+
}
966970
}
967971

968972
/// Change the priority

src/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ impl PipelineExecutor {
268268
pub fn format_graph_nodes(&self) -> String {
269269
match self {
270270
PipelineExecutor::QueryPipelineExecutor(executor) => executor.format_graph_nodes(),
271-
PipelineExecutor::QueriesPipelineExecutor(v) => v.graph.format_graph_nodes(),
271+
PipelineExecutor::QueriesPipelineExecutor(v) => v.graph.format_graph_nodes(false),
272272
}
273273
}
274274

src/query/service/src/pipelines/executor/query_pipeline_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ impl QueryPipelineExecutor {
431431
}
432432

433433
pub fn format_graph_nodes(&self) -> String {
434-
self.graph.format_graph_nodes()
434+
self.graph.format_graph_nodes(false)
435435
}
436436

437437
pub fn fetch_plans_profile(&self, collect_metrics: bool) -> HashMap<u32, PlanProfile> {

src/query/service/src/pipelines/processors/transforms/sort/bounds.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl Bounds {
9696
self.0.iter().all(|col| col.len() == 0)
9797
}
9898

99-
pub fn reduce(&self, n: usize, data_type: DataType) -> Option<Self> {
99+
pub fn reduce(&self, n: usize) -> Option<Self> {
100100
if n == 0 {
101101
return Some(Self::default());
102102
}
@@ -126,7 +126,6 @@ impl Bounds {
126126

127127
Some(Bounds(vec![Column::take_column_indices(
128128
&self.0,
129-
data_type,
130129
&indices,
131130
indices.len(),
132131
)]))
@@ -219,16 +218,16 @@ mod tests {
219218
.collect::<Result<Vec<_>>>()?;
220219
let bounds = Bounds::merge::<SimpleRowsDesc<Int32Type>>(data, 2)?;
221220

222-
let got = bounds.reduce(4, Int32Type::data_type()).unwrap();
221+
let got = bounds.reduce(4).unwrap();
223222
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![8, 6, 2, 1])])); // 77 _8 7 _6 3 _2 1 _1 -2
224223

225-
let got = bounds.reduce(3, Int32Type::data_type()).unwrap();
224+
let got = bounds.reduce(3).unwrap();
226225
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![8, 3, 1])])); // 77 _8 7 6 _3 2 1 _1 -2
227226

228-
let got = bounds.reduce(2, Int32Type::data_type()).unwrap();
227+
let got = bounds.reduce(2).unwrap();
229228
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![7, 1])])); // 77 8 _7 6 3 2 _1 1 -2
230229

231-
let got = bounds.reduce(1, Int32Type::data_type()).unwrap();
230+
let got = bounds.reduce(1).unwrap();
232231
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![3])])); // 77 8 7 6 _3 2 1 1 -2
233232

234233
Ok(())

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mod bounds;
2828
mod merge_sort;
2929
mod sort_builder;
3030
mod sort_collect;
31+
mod sort_combine;
3132
mod sort_exchange;
3233
mod sort_execute;
3334
mod sort_route;
@@ -60,7 +61,7 @@ local_block_meta_serde!(SortCollectedMeta);
6061
impl BlockMetaInfo for SortCollectedMeta {}
6162

6263
#[derive(Debug)]
63-
struct SortScatteredMeta(pub Vec<SortCollectedMeta>);
64+
struct SortScatteredMeta(pub Vec<Option<SortCollectedMeta>>);
6465

6566
local_block_meta_serde!(SortScatteredMeta);
6667

src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ use databend_common_pipeline_transforms::sort::utils::ORDER_COL_NAME;
3131
use databend_common_pipeline_transforms::sort::RowConverter;
3232
use databend_common_pipeline_transforms::sort::Rows;
3333
use databend_common_pipeline_transforms::sort::RowsTypeVisitor;
34+
use databend_common_pipeline_transforms::AccumulatingTransformer;
3435
use databend_common_pipeline_transforms::MemorySettings;
3536

3637
use super::merge_sort::TransformSort;
3738
use super::sort_collect::TransformSortCollect;
39+
use super::sort_combine::TransformSortCombine;
3840
use super::sort_execute::TransformSortExecute;
3941
use super::sort_shuffle::SortSampleState;
4042
use super::sort_shuffle::TransformSortShuffle;
@@ -46,6 +48,7 @@ enum SortType {
4648
Collect,
4749
Execute,
4850
Shuffle,
51+
Combine,
4952
}
5053

5154
pub struct TransformSortBuilder {
@@ -183,6 +186,25 @@ impl TransformSortBuilder {
183186
select_row_type(&mut build)
184187
}
185188

189+
pub fn build_combine(
190+
&self,
191+
input: Arc<InputPort>,
192+
output: Arc<OutputPort>,
193+
) -> Result<Box<dyn Processor>> {
194+
self.check();
195+
196+
let mut build = Build {
197+
params: self,
198+
input,
199+
output,
200+
typ: SortType::Combine,
201+
id: 0,
202+
state: None,
203+
};
204+
205+
select_row_type(&mut build)
206+
}
207+
186208
fn should_use_sort_limit(&self) -> bool {
187209
self.limit.map(|limit| limit < 10000).unwrap_or_default()
188210
}
@@ -288,6 +310,15 @@ impl Build<'_> {
288310
self.params.spiller.clone(),
289311
)))
290312
}
313+
314+
fn build_sort_combine<R>(&mut self) -> Result<Box<dyn Processor>>
315+
where R: Rows + 'static {
316+
Ok(AccumulatingTransformer::create(
317+
self.input.clone(),
318+
self.output.clone(),
319+
TransformSortCombine::<R>::new(self.params.block_size),
320+
))
321+
}
291322
}
292323

293324
impl RowsTypeVisitor for Build<'_> {
@@ -320,6 +351,7 @@ impl RowsTypeVisitor for Build<'_> {
320351
false => self.build_sort_exec::<HeapSort<R>>(),
321352
},
322353
SortType::Shuffle => self.build_sort_shuffle::<R>(),
354+
SortType::Combine => self.build_sort_combine::<R>(),
323355
}
324356
}
325357
}

src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,9 @@ where
333333
.await?;
334334
}
335335
if input > max || finished && input > 0 {
336-
spill_sort.sort_input_data(std::mem::take(input_data), &self.aborting)?;
336+
spill_sort
337+
.sort_input_data(std::mem::take(input_data), &self.aborting)
338+
.await?;
337339
}
338340
if finished {
339341
self.create_output()
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::Result;
16+
use databend_common_expression::BlockMetaInfoDowncast;
17+
use databend_common_expression::DataBlock;
18+
use databend_common_pipeline_transforms::sort::Rows;
19+
use databend_common_pipeline_transforms::AccumulatingTransform;
20+
21+
use super::bounds::Bounds;
22+
use super::SortCollectedMeta;
23+
24+
pub struct TransformSortCombine<R: Rows> {
25+
batch_rows: usize,
26+
metas: Vec<SortCollectedMeta>,
27+
_r: std::marker::PhantomData<R>,
28+
}
29+
30+
impl<R: Rows> TransformSortCombine<R> {
31+
pub fn new(batch_rows: usize) -> Self {
32+
Self {
33+
batch_rows,
34+
metas: vec![],
35+
_r: Default::default(),
36+
}
37+
}
38+
}
39+
40+
impl<R: Rows> AccumulatingTransform for TransformSortCombine<R> {
41+
const NAME: &'static str = "TransformSortCombine";
42+
43+
fn transform(&mut self, mut data: DataBlock) -> Result<Vec<DataBlock>> {
44+
self.metas.push(
45+
data.take_meta()
46+
.and_then(SortCollectedMeta::downcast_from)
47+
.expect("require a SortCollectedMeta"),
48+
);
49+
Ok(vec![])
50+
}
51+
52+
fn on_finish(&mut self, output: bool) -> Result<Vec<DataBlock>> {
53+
if !output || self.metas.is_empty() {
54+
return Ok(vec![]);
55+
}
56+
57+
let params = self.metas.first().map(|meta| meta.params).unwrap();
58+
59+
let bounds = self
60+
.metas
61+
.iter_mut()
62+
.map(|meta| std::mem::take(&mut meta.bounds))
63+
.collect();
64+
let bounds = Bounds::merge::<R>(bounds, self.batch_rows)?;
65+
66+
let blocks = self
67+
.metas
68+
.drain(..)
69+
.flat_map(|meta| meta.blocks.into_iter())
70+
.collect();
71+
72+
Ok(vec![DataBlock::empty_with_meta(Box::new(
73+
SortCollectedMeta {
74+
params,
75+
bounds,
76+
blocks,
77+
},
78+
))])
79+
}
80+
}

src/query/service/src/pipelines/processors/transforms/sort/sort_exchange.rs

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -12,76 +12,33 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::iter;
16-
use std::sync::Arc;
17-
1815
use databend_common_exception::Result;
1916
use databend_common_expression::BlockMetaInfoDowncast;
2017
use databend_common_expression::DataBlock;
2118
use databend_common_pipeline_core::processors::Exchange;
22-
use databend_common_pipeline_core::processors::InputPort;
23-
use databend_common_pipeline_core::processors::OutputPort;
24-
use databend_common_pipeline_core::processors::PartitionProcessor;
25-
use databend_common_pipeline_core::Pipe;
26-
use databend_common_pipeline_core::PipeItem;
27-
use databend_common_pipeline_core::Pipeline;
2819

2920
use super::SortScatteredMeta;
3021

31-
struct SortRangeExchange;
22+
pub struct SortRangeExchange;
3223

3324
impl Exchange for SortRangeExchange {
3425
const NAME: &'static str = "SortRange";
3526
fn partition(&self, mut data: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
36-
let Some(meta) = data.take_meta() else {
37-
unreachable!();
38-
};
39-
40-
let Some(SortScatteredMeta(scattered)) = SortScatteredMeta::downcast_from(meta) else {
41-
unreachable!();
42-
};
43-
27+
let scattered = data
28+
.take_meta()
29+
.and_then(SortScatteredMeta::downcast_from)
30+
.expect("require a SortScatteredMeta")
31+
.0;
4432
assert!(scattered.len() <= n);
4533

4634
let blocks = scattered
4735
.into_iter()
48-
.map(|meta| DataBlock::empty_with_meta(Box::new(meta)))
36+
.map(|meta| {
37+
meta.map(|meta| DataBlock::empty_with_meta(Box::new(meta)))
38+
.unwrap_or_else(DataBlock::empty)
39+
})
4940
.collect();
5041

5142
Ok(blocks)
5243
}
5344
}
54-
55-
fn create_exchange_pipe(num_input: usize, num_output: usize) -> Pipe {
56-
let items = iter::repeat_with(|| {
57-
let input = InputPort::create();
58-
let outputs = iter::repeat_with(OutputPort::create)
59-
.take(num_output)
60-
.collect::<Vec<_>>();
61-
62-
PipeItem::create(
63-
PartitionProcessor::create(input.clone(), outputs.clone(), Arc::new(SortRangeExchange)),
64-
vec![input],
65-
outputs,
66-
)
67-
})
68-
.take(num_input)
69-
.collect::<Vec<_>>();
70-
71-
Pipe::create(num_input, num_input * num_output, items)
72-
}
73-
74-
pub fn add_range_shuffle_exchange(pipeline: &mut Pipeline, num_output: usize) -> Result<()> {
75-
let num_input = pipeline.output_len();
76-
77-
pipeline.add_pipe(create_exchange_pipe(num_input, num_output));
78-
79-
let n = num_output;
80-
let reorder_edges = (0..num_input * n)
81-
.map(|i| (i % n) * num_input + (i / n))
82-
.collect::<Vec<_>>();
83-
84-
pipeline.reorder_inputs(reorder_edges);
85-
86-
Ok(())
87-
}

0 commit comments

Comments
 (0)