Commit 61383d0

fix
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 5039da3 commit 61383d0

14 files changed: +200 -100 lines changed

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 9 additions & 4 deletions
@@ -33,8 +33,8 @@ use databend_common_storage::DataOperator;
 use databend_common_storages_fuse::TableContext;
 
 use crate::pipelines::memory_settings::MemorySettingsExt;
-use crate::pipelines::processors::transforms::add_range_shuffle_exchange;
 use crate::pipelines::processors::transforms::add_range_shuffle_route;
+use crate::pipelines::processors::transforms::SortRangeExchange;
 use crate::pipelines::processors::transforms::SortSampleState;
 use crate::pipelines::processors::transforms::TransformLimit;
 use crate::pipelines::processors::transforms::TransformSortBuilder;
@@ -137,8 +137,9 @@ impl PipelineBuilder {
            None => {
                // Build for single node mode.
                // We build the full sort pipeline for it.
-                let k = self.settings.get_range_shuffle_sort_simple_size()?;
-                if k > 0 && self.main_pipeline.output_len() > 1 {
+                if self.settings.get_enable_range_shuffle_sort()?
+                    && self.main_pipeline.output_len() > 1
+                {
                    builder
                        .remove_order_col_at_last()
                        .build_range_shuffle_sort_pipeline(&mut self.main_pipeline)
@@ -259,7 +260,11 @@ impl SortPipelineBuilder {
 
        builder.add_shuffle(pipeline, state.clone())?;
 
-        add_range_shuffle_exchange(pipeline, max_threads)?;
+        pipeline.exchange(max_threads, Arc::new(SortRangeExchange));
+
+        pipeline.add_transform(|input, output| {
+            Ok(ProcessorPtr::create(builder.build_combine(input, output)?))
+        })?;
 
        pipeline.add_transform(|input, output| {
            Ok(ProcessorPtr::create(builder.build_exec(input, output)?))
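
Note: `pipeline.exchange(max_threads, Arc::new(SortRangeExchange))` replaces the hand-rolled `add_range_shuffle_exchange` pipe, and a combine stage is inserted before the final execute stage. As a rough, self-contained illustration of the exchange idea only (stand-in types, not the Databend `Exchange` trait or `DataBlock`; the real `SortRangeExchange` routes already-scattered metadata, see sort_exchange.rs below), a range partitioner sends each value to one of `n` downstream outputs:

// Simplified stand-in types; the real pipeline uses DataBlock, Exchange and
// Pipeline::exchange. This only illustrates the fan-out idea of an exchange.
struct Block(Vec<i64>);

trait ExchangeLike {
    // Split one incoming block into at most `n` outputs, one per downstream pipe.
    fn partition(&self, data: Block, n: usize) -> Vec<Block>;
}

struct RangePartition {
    // Upper bounds of each range; values above the last bound go to the final output.
    bounds: Vec<i64>,
}

impl ExchangeLike for RangePartition {
    fn partition(&self, data: Block, n: usize) -> Vec<Block> {
        let mut outputs: Vec<Block> = (0..n).map(|_| Block(vec![])).collect();
        for v in data.0 {
            let idx = self.bounds.iter().position(|b| v <= *b).unwrap_or(n - 1);
            outputs[idx].0.push(v);
        }
        outputs
    }
}

fn main() {
    let exchange = RangePartition { bounds: vec![10, 20] };
    let parts = exchange.partition(Block(vec![3, 15, 42]), 3);
    for (i, p) in parts.iter().enumerate() {
        println!("output {i}: {:?}", p.0); // 3 -> 0, 15 -> 1, 42 -> 2
    }
}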

src/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 7 additions & 3 deletions
@@ -789,7 +789,7 @@ impl RunningGraph {
            true => Ok(()),
            false => Err(ErrorCode::Internal(format!(
                "Pipeline graph is not finished, details: {}",
-                self.format_graph_nodes()
+                self.format_graph_nodes(true)
            ))),
        }
    }
@@ -855,7 +855,7 @@ impl RunningGraph {
        self.0.finished_notify.clone()
    }
 
-    pub fn format_graph_nodes(&self) -> String {
+    pub fn format_graph_nodes(&self, pretty: bool) -> String {
        pub struct NodeDisplay {
            id: usize,
            name: String,
@@ -955,7 +955,11 @@ impl RunningGraph {
            }
        }
 
-        format!("{:?}", nodes_display)
+        if pretty {
+            format!("{:#?}", nodes_display)
+        } else {
+            format!("{:?}", nodes_display)
+        }
    }
 
    /// Change the priority
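
Note: the new `pretty` flag only switches between Rust's compact and pretty-printed `Debug` output. A minimal standalone sketch of the difference (not Databend code):

// Minimal sketch: the `pretty` flag picks between `{:?}` and `{:#?}`.
#[derive(Debug)]
struct NodeDisplay {
    id: usize,
    name: String,
}

fn format_nodes(nodes: &[NodeDisplay], pretty: bool) -> String {
    // `{:#?}` spreads each struct over multiple indented lines,
    // which is easier to read in long error messages.
    if pretty {
        format!("{:#?}", nodes)
    } else {
        format!("{:?}", nodes)
    }
}

fn main() {
    let nodes = vec![NodeDisplay { id: 0, name: "Sort".to_string() }];
    println!("{}", format_nodes(&nodes, false)); // [NodeDisplay { id: 0, name: "Sort" }]
    println!("{}", format_nodes(&nodes, true));  // multi-line, indented output
}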

src/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 1 addition & 1 deletion
@@ -268,7 +268,7 @@ impl PipelineExecutor {
    pub fn format_graph_nodes(&self) -> String {
        match self {
            PipelineExecutor::QueryPipelineExecutor(executor) => executor.format_graph_nodes(),
-            PipelineExecutor::QueriesPipelineExecutor(v) => v.graph.format_graph_nodes(),
+            PipelineExecutor::QueriesPipelineExecutor(v) => v.graph.format_graph_nodes(false),
        }
    }
 

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ impl ProcessorAsyncTask {
                processor_name,
                elapsed,
                active_workers,
-                graph_clone.format_graph_nodes()
+                graph_clone.format_graph_nodes(false)
            );
        }
    };

src/query/service/src/pipelines/executor/query_pipeline_executor.rs

Lines changed: 1 addition & 1 deletion
@@ -431,7 +431,7 @@ impl QueryPipelineExecutor {
    }
 
    pub fn format_graph_nodes(&self) -> String {
-        self.graph.format_graph_nodes()
+        self.graph.format_graph_nodes(false)
    }
 
    pub fn fetch_plans_profile(&self, collect_metrics: bool) -> HashMap<u32, PlanProfile> {

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -28,6 +28,7 @@ mod bounds;
 mod merge_sort;
 mod sort_builder;
 mod sort_collect;
+mod sort_combine;
 mod sort_exchange;
 mod sort_execute;
 mod sort_route;
@@ -60,7 +61,7 @@ local_block_meta_serde!(SortCollectedMeta);
 impl BlockMetaInfo for SortCollectedMeta {}
 
 #[derive(Debug)]
-struct SortScatteredMeta(pub Vec<SortCollectedMeta>);
+struct SortScatteredMeta(pub Vec<Option<SortCollectedMeta>>);
 
 local_block_meta_serde!(SortScatteredMeta);
 

src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs

Lines changed: 32 additions & 0 deletions
@@ -31,10 +31,12 @@ use databend_common_pipeline_transforms::sort::utils::ORDER_COL_NAME;
 use databend_common_pipeline_transforms::sort::RowConverter;
 use databend_common_pipeline_transforms::sort::Rows;
 use databend_common_pipeline_transforms::sort::RowsTypeVisitor;
+use databend_common_pipeline_transforms::AccumulatingTransformer;
 use databend_common_pipeline_transforms::MemorySettings;
 
 use super::merge_sort::TransformSort;
 use super::sort_collect::TransformSortCollect;
+use super::sort_combine::TransformSortCombine;
 use super::sort_execute::TransformSortExecute;
 use super::sort_shuffle::SortSampleState;
 use super::sort_shuffle::TransformSortShuffle;
@@ -46,6 +48,7 @@ enum SortType {
    Collect,
    Execute,
    Shuffle,
+    Combine,
 }
 
 pub struct TransformSortBuilder {
@@ -183,6 +186,25 @@ impl TransformSortBuilder {
        select_row_type(&mut build)
    }
 
+    pub fn build_combine(
+        &self,
+        input: Arc<InputPort>,
+        output: Arc<OutputPort>,
+    ) -> Result<Box<dyn Processor>> {
+        self.check();
+
+        let mut build = Build {
+            params: self,
+            input,
+            output,
+            typ: SortType::Combine,
+            id: 0,
+            state: None,
+        };
+
+        select_row_type(&mut build)
+    }
+
    fn should_use_sort_limit(&self) -> bool {
        self.limit.map(|limit| limit < 10000).unwrap_or_default()
    }
@@ -288,6 +310,15 @@ impl Build<'_> {
            self.params.spiller.clone(),
        )))
    }
+
+    fn build_sort_combine<R>(&mut self) -> Result<Box<dyn Processor>>
+    where R: Rows + 'static {
+        Ok(AccumulatingTransformer::create(
+            self.input.clone(),
+            self.output.clone(),
+            TransformSortCombine::<R>::new(self.params.block_size),
+        ))
+    }
 }
 
 impl RowsTypeVisitor for Build<'_> {
@@ -320,6 +351,7 @@ impl RowsTypeVisitor for Build<'_> {
                false => self.build_sort_exec::<HeapSort<R>>(),
            },
            SortType::Shuffle => self.build_sort_shuffle::<R>(),
+            SortType::Combine => self.build_sort_combine::<R>(),
        }
    }
 }
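
Note: `Build` dispatches on `SortType` once the concrete `Rows` type has been selected, and `Combine` is the new arm added here. A simplified, self-contained analogue of that enum dispatch (stand-in types, not the Databend `RowsTypeVisitor` API):

// Stand-in trait and enum; only the dispatch shape mirrors the real builder.
trait Rows: 'static {
    fn name() -> &'static str;
}
struct SimpleRows;
impl Rows for SimpleRows {
    fn name() -> &'static str {
        "simple"
    }
}

enum SortType {
    Collect,
    Execute,
    Shuffle,
    Combine,
}

fn build_processor<R: Rows>(typ: &SortType) -> String {
    match typ {
        SortType::Collect => format!("TransformSortCollect<{}>", R::name()),
        SortType::Execute => format!("TransformSortExecute<{}>", R::name()),
        SortType::Shuffle => format!("TransformSortShuffle<{}>", R::name()),
        // The new stage added by this commit: merge per-worker collect results.
        SortType::Combine => format!("TransformSortCombine<{}>", R::name()),
    }
}

fn main() {
    println!("{}", build_processor::<SimpleRows>(&SortType::Combine));
}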

src/query/service/src/pipelines/processors/transforms/sort/sort_combine.rs

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_transforms::sort::Rows;
+use databend_common_pipeline_transforms::AccumulatingTransform;
+
+use super::bounds::Bounds;
+use super::SortCollectedMeta;
+
+pub struct TransformSortCombine<R: Rows> {
+    batch_rows: usize,
+    metas: Vec<SortCollectedMeta>,
+    _r: std::marker::PhantomData<R>,
+}
+
+impl<R: Rows> TransformSortCombine<R> {
+    pub fn new(batch_rows: usize) -> Self {
+        Self {
+            batch_rows,
+            metas: vec![],
+            _r: Default::default(),
+        }
+    }
+}
+
+impl<R: Rows> AccumulatingTransform for TransformSortCombine<R> {
+    const NAME: &'static str = "TransformSortCombine";
+
+    fn transform(&mut self, mut data: DataBlock) -> Result<Vec<DataBlock>> {
+        self.metas.push(
+            data.take_meta()
+                .and_then(SortCollectedMeta::downcast_from)
+                .expect("require a SortCollectedMeta"),
+        );
+        Ok(vec![])
+    }
+
+    fn on_finish(&mut self, output: bool) -> Result<Vec<DataBlock>> {
+        if !output || self.metas.is_empty() {
+            return Ok(vec![]);
+        }
+
+        let params = self.metas.first().map(|meta| meta.params).unwrap();
+
+        let bounds = self
+            .metas
+            .iter_mut()
+            .map(|meta| std::mem::take(&mut meta.bounds))
+            .collect();
+        let bounds = Bounds::merge::<R>(bounds, self.batch_rows)?;
+
+        let blocks = self
+            .metas
+            .drain(..)
+            .flat_map(|meta| meta.blocks.into_iter())
+            .collect();
+
+        Ok(vec![DataBlock::empty_with_meta(Box::new(
+            SortCollectedMeta {
+                params,
+                bounds,
+                blocks,
+            },
+        ))])
+    }
+}
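
Note: `TransformSortCombine` follows the accumulate-then-emit shape of `AccumulatingTransform`: buffer each upstream `SortCollectedMeta` in `transform`, then merge bounds and blocks once in `on_finish`. A simplified, self-contained analogue with stand-in types (the real code merges via `Bounds::merge::<R>`, not a plain sort):

// Simplified analogue of the accumulate-then-emit pattern; stand-in types only.
#[derive(Default, Debug)]
struct CollectedMeta {
    bounds: Vec<i64>,   // sampled range bounds from one upstream
    blocks: Vec<usize>, // ids of the collected blocks
}

#[derive(Default)]
struct Combine {
    metas: Vec<CollectedMeta>,
}

impl Combine {
    // Called once per incoming block: just stash its metadata.
    fn transform(&mut self, meta: CollectedMeta) {
        self.metas.push(meta);
    }

    // Called when upstream is exhausted: merge all buffered metadata
    // into a single summary for the downstream execute stage.
    fn on_finish(&mut self) -> Option<CollectedMeta> {
        if self.metas.is_empty() {
            return None;
        }
        let mut bounds: Vec<i64> = self.metas.iter().flat_map(|m| m.bounds.clone()).collect();
        bounds.sort_unstable(); // placeholder for the real Bounds::merge
        let blocks = self.metas.drain(..).flat_map(|m| m.blocks).collect();
        Some(CollectedMeta { bounds, blocks })
    }
}

fn main() {
    let mut c = Combine::default();
    c.transform(CollectedMeta { bounds: vec![10, 30], blocks: vec![1] });
    c.transform(CollectedMeta { bounds: vec![20], blocks: vec![2, 3] });
    println!("{:?}", c.on_finish()); // one merged CollectedMeta
}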

src/query/service/src/pipelines/processors/transforms/sort/sort_exchange.rs

Lines changed: 10 additions & 53 deletions
@@ -12,76 +12,33 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::iter;
-use std::sync::Arc;
-
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_core::processors::Exchange;
-use databend_common_pipeline_core::processors::InputPort;
-use databend_common_pipeline_core::processors::OutputPort;
-use databend_common_pipeline_core::processors::PartitionProcessor;
-use databend_common_pipeline_core::Pipe;
-use databend_common_pipeline_core::PipeItem;
-use databend_common_pipeline_core::Pipeline;
 
 use super::SortScatteredMeta;
 
-struct SortRangeExchange;
+pub struct SortRangeExchange;
 
 impl Exchange for SortRangeExchange {
    const NAME: &'static str = "SortRange";
    fn partition(&self, mut data: DataBlock, n: usize) -> Result<Vec<DataBlock>> {
-        let Some(meta) = data.take_meta() else {
-            unreachable!();
-        };
-
-        let Some(SortScatteredMeta(scattered)) = SortScatteredMeta::downcast_from(meta) else {
-            unreachable!();
-        };
-
+        let scattered = data
+            .take_meta()
+            .and_then(SortScatteredMeta::downcast_from)
+            .expect("require a SortScatteredMeta")
+            .0;
        assert!(scattered.len() <= n);
 
        let blocks = scattered
            .into_iter()
-            .map(|meta| DataBlock::empty_with_meta(Box::new(meta)))
+            .map(|meta| {
+                meta.map(|meta| DataBlock::empty_with_meta(Box::new(meta)))
+                    .unwrap_or_else(DataBlock::empty)
+            })
            .collect();
 
        Ok(blocks)
    }
 }
-
-fn create_exchange_pipe(num_input: usize, num_output: usize) -> Pipe {
-    let items = iter::repeat_with(|| {
-        let input = InputPort::create();
-        let outputs = iter::repeat_with(OutputPort::create)
-            .take(num_output)
-            .collect::<Vec<_>>();
-
-        PipeItem::create(
-            PartitionProcessor::create(input.clone(), outputs.clone(), Arc::new(SortRangeExchange)),
-            vec![input],
-            outputs,
-        )
-    })
-    .take(num_input)
-    .collect::<Vec<_>>();
-
-    Pipe::create(num_input, num_input * num_output, items)
-}
-
-pub fn add_range_shuffle_exchange(pipeline: &mut Pipeline, num_output: usize) -> Result<()> {
-    let num_input = pipeline.output_len();
-
-    pipeline.add_pipe(create_exchange_pipe(num_input, num_output));
-
-    let n = num_output;
-    let reorder_edges = (0..num_input * n)
-        .map(|i| (i % n) * num_input + (i / n))
-        .collect::<Vec<_>>();
-
-    pipeline.reorder_inputs(reorder_edges);
-
-    Ok(())
-}

src/query/service/src/pipelines/processors/transforms/sort/sort_execute.rs

Lines changed: 11 additions & 5 deletions
@@ -91,6 +91,11 @@ where
            return Ok(Event::NeedConsume);
        }
 
+        if self.input.is_finished() && self.inner.is_none() {
+            self.output.finish();
+            return Ok(Event::Finished);
+        }
+
        if let Some(mut block) = self.input.pull_data().transpose()? {
            assert!(self.inner.is_none());
            let meta = block
@@ -102,12 +107,12 @@ where
            return Ok(Event::Async);
        }
 
-        if self.input.is_finished() {
-            Ok(Event::Async)
-        } else {
-            self.input.set_need_data();
-            Ok(Event::NeedData)
+        if self.inner.is_some() {
+            return Ok(Event::Async);
        }
+
+        self.input.set_need_data();
+        Ok(Event::NeedData)
    }
 
    #[async_backtrace::framed]
@@ -122,6 +127,7 @@ where
        }
        if finish {
            self.output.finish();
+            self.inner = None;
        }
        Ok(())
    }
