Skip to content

Commit 89dd885

Browse files
committed
fix
1 parent fdac251 commit 89dd885

File tree

10 files changed

+43
-46
lines changed

10 files changed

+43
-46
lines changed

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl PipelineBuilder {
6666
.collect::<Result<Vec<_>>>()?;
6767
let sort_desc = sort_desc.into();
6868

69-
if sort.step != SortStep::SortShuffled {
69+
if sort.step != SortStep::Shuffled {
7070
self.build_pipeline(&sort.input)?;
7171
}
7272

@@ -124,7 +124,7 @@ impl PipelineBuilder {
124124
// Don't remove the order column at last.
125125
builder.build_full_sort_pipeline(&mut self.main_pipeline)
126126
}
127-
SortStep::FinalMerge => {
127+
SortStep::Final => {
128128
// Build for the coordinator node.
129129
// We only build a `MultiSortMergeTransform`,
130130
// as the data is already sorted in each cluster node.
@@ -145,7 +145,7 @@ impl PipelineBuilder {
145145
self.exchange_injector = TransformSortBuilder::exchange_injector();
146146
Ok(())
147147
}
148-
SortStep::SortShuffled => {
148+
SortStep::Shuffled => {
149149
if matches!(*sort.input, PhysicalPlan::ExchangeSource(_)) {
150150
let exchange = TransformSortBuilder::exchange_injector();
151151
let old_inject = std::mem::replace(&mut self.exchange_injector, exchange);
@@ -383,7 +383,7 @@ impl SortPipelineBuilder {
383383
let builder =
384384
TransformSortBuilder::new(self.schema.clone(), self.sort_desc.clone(), self.block_size)
385385
.with_limit(self.limit)
386-
.with_order_column(true, self.remove_order_col_at_last)
386+
.with_order_column(true, !self.remove_order_col_at_last)
387387
.with_enable_loser_tree(self.enable_loser_tree);
388388

389389
let inputs_port: Vec<_> = (0..pipeline.output_len())

src/query/service/src/pipelines/builders/builder_window.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ impl PipelineBuilder {
201201

202202
let have_order_col = match window_partition.sort_step {
203203
SortStep::Single | SortStep::Partial => false,
204-
SortStep::FinalMerge => true,
204+
SortStep::Final => true,
205205
_ => unimplemented!(),
206206
};
207207

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,23 @@ use crate::spillers::Spiller;
2727

2828
mod bounds;
2929
mod merge_sort;
30+
mod sort_broadcast;
3031
mod sort_builder;
3132
mod sort_collect;
3233
mod sort_exchange_injector;
3334
mod sort_merge_stream;
3435
mod sort_restore;
3536
mod sort_route;
36-
mod sort_shuffle;
3737
mod sort_spill;
3838

3939
pub use merge_sort::*;
40+
pub use sort_broadcast::*;
4041
pub use sort_builder::*;
4142
pub use sort_collect::*;
4243
pub use sort_exchange_injector::*;
4344
pub use sort_merge_stream::*;
4445
pub use sort_restore::*;
4546
pub use sort_route::*;
46-
pub use sort_shuffle::*;
4747

4848
#[derive(Clone)]
4949
struct Base {

src/query/service/src/pipelines/processors/transforms/sort/sort_shuffle.rs renamed to src/query/service/src/pipelines/processors/transforms/sort/sort_broadcast.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
2020
use databend_common_expression::BlockMetaInfoDowncast;
2121
use databend_common_expression::DataBlock;
22-
use databend_common_expression::DataSchemaRef;
2322
use databend_common_pipeline_core::processors::Event;
2423
use databend_common_pipeline_core::processors::Processor;
2524
use databend_common_pipeline_transforms::processors::sort::Rows;
@@ -62,22 +61,14 @@ impl<R: Rows + 'static> TransformSortBoundBroadcast<R> {
6261
pub struct SortSampleState {
6362
ctx: Arc<QueryContext>,
6463
broadcast_id: u32,
65-
#[expect(dead_code)]
66-
schema: DataSchemaRef,
6764
batch_rows: usize,
6865
}
6966

7067
impl SortSampleState {
71-
pub fn new(
72-
schema: DataSchemaRef,
73-
batch_rows: usize,
74-
ctx: Arc<QueryContext>,
75-
broadcast_id: u32,
76-
) -> SortSampleState {
68+
pub fn new(batch_rows: usize, ctx: Arc<QueryContext>, broadcast_id: u32) -> SortSampleState {
7769
SortSampleState {
7870
ctx,
7971
broadcast_id,
80-
schema,
8172
batch_rows,
8273
}
8374
}
@@ -110,7 +101,7 @@ impl SortSampleState {
110101

111102
#[async_trait::async_trait]
112103
impl<R: Rows + 'static> HookTransform for TransformSortBoundBroadcast<R> {
113-
const NAME: &'static str = "TransformSortBoundBroadcast";
104+
const NAME: &'static str = "SortBoundBroadcast";
114105

115106
fn on_input(&mut self, mut data: DataBlock) -> Result<()> {
116107
let meta = data

src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ enum SortType {
4444
Sort(Arc<InputPort>),
4545

4646
Collect(Arc<InputPort>),
47-
BoundBroadcast(Arc<InputPort>),
47+
BoundBroadcast {
48+
input: Arc<InputPort>,
49+
state: SortSampleState,
50+
},
4851
Restore(Arc<InputPort>),
4952

5053
BoundedMergeSort(Vec<Arc<InputPort>>),
@@ -118,7 +121,6 @@ impl TransformSortBuilder {
118121
params: self,
119122
output,
120123
typ: Some(SortType::Sort(input)),
121-
state: None,
122124
};
123125

124126
select_row_type(&mut build)
@@ -135,7 +137,6 @@ impl TransformSortBuilder {
135137
params: self,
136138
output,
137139
typ: Some(SortType::Collect(input)),
138-
state: None,
139140
};
140141

141142
select_row_type(&mut build)
@@ -152,8 +153,7 @@ impl TransformSortBuilder {
152153
let mut build = Build {
153154
params: self,
154155
output,
155-
typ: Some(SortType::BoundBroadcast(input)),
156-
state: Some(state),
156+
typ: Some(SortType::BoundBroadcast { input, state }),
157157
};
158158

159159
select_row_type(&mut build)
@@ -170,7 +170,6 @@ impl TransformSortBuilder {
170170
params: self,
171171
output,
172172
typ: Some(SortType::Restore(input)),
173-
state: None,
174173
};
175174

176175
select_row_type(&mut build)
@@ -197,7 +196,6 @@ impl TransformSortBuilder {
197196
params: self,
198197
output,
199198
typ: Some(SortType::BoundedMergeSort(inputs)),
200-
state: None,
201199
};
202200

203201
select_row_type(&mut build)
@@ -233,7 +231,7 @@ impl TransformSortBuilder {
233231
ctx: Arc<QueryContext>,
234232
broadcast_id: u32,
235233
) -> Result<()> {
236-
let state = SortSampleState::new(self.inner_schema(), batch_rows, ctx, broadcast_id);
234+
let state = SortSampleState::new(batch_rows, ctx, broadcast_id);
237235

238236
pipeline.resize(1, false)?;
239237
pipeline.add_transform(|input, output| {
@@ -274,7 +272,6 @@ struct Build<'a> {
274272
params: &'a TransformSortBuilder,
275273
typ: Option<SortType>,
276274
output: Arc<OutputPort>,
277-
state: Option<SortSampleState>,
278275
}
279276

280277
impl Build<'_> {
@@ -333,12 +330,18 @@ impl Build<'_> {
333330
)?))
334331
}
335332

336-
fn build_bound_broadcast<R>(&mut self, input: Arc<InputPort>) -> Result<Box<dyn Processor>>
337-
where R: Rows + 'static {
333+
fn build_bound_broadcast<R>(
334+
&mut self,
335+
input: Arc<InputPort>,
336+
state: SortSampleState,
337+
) -> Result<Box<dyn Processor>>
338+
where
339+
R: Rows + 'static,
340+
{
338341
Ok(TransformSortBoundBroadcast::<R>::create(
339342
input,
340343
self.output.clone(),
341-
self.state.clone().unwrap(),
344+
state,
342345
))
343346
}
344347

@@ -386,7 +389,9 @@ impl RowsTypeVisitor for Build<'_> {
386389
true => self.build_sort_collect::<LoserTreeSort<R>, C>(limit_sort, input),
387390
false => self.build_sort_collect::<HeapSort<R>, C>(limit_sort, input),
388391
},
389-
SortType::BoundBroadcast(input) => self.build_bound_broadcast::<R>(input),
392+
SortType::BoundBroadcast { input, state } => {
393+
self.build_bound_broadcast::<R>(input, state)
394+
}
390395
SortType::Restore(input) => match self.params.enable_loser_tree {
391396
true => self.build_sort_restore::<LoserTreeSort<R>>(input),
392397
false => self.build_sort_restore::<HeapSort<R>>(input),

src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl SortBoundEdge {
152152

153153
impl Processor for SortBoundEdge {
154154
fn name(&self) -> String {
155-
String::from("SortBoundEdgeTransform")
155+
String::from("SortBoundEdge")
156156
}
157157

158158
fn as_any(&mut self) -> &mut dyn Any {

src/query/service/tests/it/sql/planner/optimizer/data/results/tpcds/Q01_physical.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ Limit
33
├── limit: 100
44
├── offset: 0
55
├── estimated rows: 0.00
6-
└── Sort
6+
└── Sort(Final)
77
├── output columns: [customer.c_customer_id (#79)]
88
├── sort keys: [c_customer_id ASC NULLS LAST]
99
├── estimated rows: 0.00
1010
└── Exchange
1111
├── output columns: [customer.c_customer_id (#79), #_order_col]
1212
├── exchange type: Merge
13-
└── Sort
13+
└── Sort(Partial)
1414
├── output columns: [customer.c_customer_id (#79), #_order_col]
1515
├── sort keys: [c_customer_id ASC NULLS LAST]
1616
├── estimated rows: 0.00

src/query/service/tests/it/sql/planner/optimizer/data/results/tpcds/Q03_physical.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ Limit
33
├── limit: 100
44
├── offset: 0
55
├── estimated rows: 100.00
6-
└── Sort
6+
└── Sort(Final)
77
├── output columns: [SUM(ss_ext_sales_price) (#73), dt.d_year (#6), item.i_brand (#59), item.i_brand_id (#58)]
88
├── sort keys: [d_year ASC NULLS LAST, SUM(ss_ext_sales_price) DESC NULLS LAST, i_brand_id ASC NULLS LAST]
99
├── estimated rows: 143057683321996.78
1010
└── Exchange
1111
├── output columns: [SUM(ss_ext_sales_price) (#73), dt.d_year (#6), item.i_brand (#59), item.i_brand_id (#58), #_order_col]
1212
├── exchange type: Merge
13-
└── Sort
13+
└── Sort(Partial)
1414
├── output columns: [SUM(ss_ext_sales_price) (#73), dt.d_year (#6), item.i_brand (#59), item.i_brand_id (#58), #_order_col]
1515
├── sort keys: [d_year ASC NULLS LAST, SUM(ss_ext_sales_price) DESC NULLS LAST, i_brand_id ASC NULLS LAST]
1616
├── estimated rows: 143057683321996.78

src/query/service/tests/it/sql/planner/optimizer/optimizer_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@ fn configure_optimizer(ctx: &Arc<QueryContext>, auto_stats: bool) -> Result<()>
606606
settings.set_setting("enable_dphyp".to_string(), "1".to_string())?;
607607
settings.set_setting("max_push_down_limit".to_string(), "10000".to_string())?;
608608
settings.set_setting("enable_optimizer_trace".to_string(), "1".to_string())?;
609+
settings.set_setting("enable_shuffle_sort".to_string(), "0".to_string())?;
609610

610611
if auto_stats {
611612
settings.set_optimizer_skip_list("".to_string())

src/query/sql/src/executor/physical_plans/physical_sort.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ pub enum SortStep {
5858
Single,
5959

6060
// cluster mode
61-
Partial, // before the exchange plan
62-
FinalMerge, // after the exchange plan
61+
Partial, // before the exchange plan
62+
Final, // after the exchange plan
6363

6464
// range shuffle mode
6565
Sample,
66-
SortShuffled,
66+
Shuffled,
6767
Route,
6868
}
6969

@@ -72,9 +72,9 @@ impl Display for SortStep {
7272
match self {
7373
SortStep::Single => write!(f, "Single"),
7474
SortStep::Partial => write!(f, "Partial"),
75-
SortStep::FinalMerge => write!(f, "FinalMerge"),
75+
SortStep::Final => write!(f, "Final"),
7676
SortStep::Sample => write!(f, "Sample"),
77-
SortStep::SortShuffled => write!(f, "SortShuffled"),
77+
SortStep::Shuffled => write!(f, "Shuffled"),
7878
SortStep::Route => write!(f, "Route"),
7979
}
8080
}
@@ -97,7 +97,7 @@ impl Sort {
9797
pub fn output_schema(&self) -> Result<DataSchemaRef> {
9898
let input_schema = self.input.output_schema()?;
9999
match self.step {
100-
SortStep::FinalMerge | SortStep::Route => {
100+
SortStep::Final | SortStep::Route => {
101101
let mut fields = input_schema.fields().clone();
102102
// If the plan is after exchange plan in cluster mode,
103103
// the order column is at the last of the input schema.
@@ -109,7 +109,7 @@ impl Sort {
109109
fields.pop();
110110
Ok(DataSchemaRefExt::create(fields))
111111
}
112-
SortStep::SortShuffled => Ok(input_schema),
112+
SortStep::Shuffled => Ok(input_schema),
113113
SortStep::Single | SortStep::Partial | SortStep::Sample => {
114114
let mut fields = self
115115
.pre_projection
@@ -188,7 +188,7 @@ impl PhysicalPlanBuilder {
188188

189189
let sort_step = match sort.after_exchange {
190190
Some(false) => SortStep::Partial,
191-
Some(true) => SortStep::FinalMerge,
191+
Some(true) => SortStep::Final,
192192
None => SortStep::Single,
193193
};
194194

@@ -247,7 +247,7 @@ impl PhysicalPlanBuilder {
247247
input: Box::new(input_plan),
248248
order_by,
249249
limit: sort.limit,
250-
step: SortStep::FinalMerge,
250+
step: SortStep::Final,
251251
pre_projection: None,
252252
broadcast_id: None,
253253
stat_info: Some(stat_info),
@@ -293,7 +293,7 @@ impl PhysicalPlanBuilder {
293293
input: Box::new(exchange),
294294
order_by,
295295
limit: sort.limit,
296-
step: SortStep::SortShuffled,
296+
step: SortStep::Shuffled,
297297
pre_projection: None,
298298
broadcast_id: None,
299299
stat_info: Some(stat_info),

0 commit comments

Comments
 (0)