Skip to content

Commit 570d3c5

Browse files
committed
fix
1 parent 27a1b4c commit 570d3c5

File tree

8 files changed

+35
-20
lines changed

8 files changed

+35
-20
lines changed

src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub fn try_add_multi_sort_merge(
5757
remove_order_col: bool,
5858
enable_loser_tree: bool,
5959
) -> Result<()> {
60-
debug_assert!(remove_order_col == schema.has_field(ORDER_COL_NAME));
60+
debug_assert!(!remove_order_col == schema.has_field(ORDER_COL_NAME));
6161

6262
if pipeline.is_empty() {
6363
return Err(ErrorCode::Internal("Cannot resize empty pipe."));

src/query/service/src/pipelines/builders/builder_sort.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,6 @@ impl PipelineBuilder {
9494
});
9595
}
9696

97-
let max_threads = self.settings.get_max_threads()? as usize;
98-
99-
// TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1
100-
if self.main_pipeline.output_len() == 1 || max_threads == 1 {
101-
self.main_pipeline.try_resize(max_threads)?;
102-
}
103-
10497
let builder = SortPipelineBuilder::create(
10598
self.ctx.clone(),
10699
output_schema,
@@ -109,10 +102,14 @@ impl PipelineBuilder {
109102
)?
110103
.with_limit(sort.limit);
111104

105+
let max_threads = self.settings.get_max_threads()? as usize;
112106
match sort.step {
113107
SortStep::Single => {
114108
// Build for single node mode.
115109
// We build the full sort pipeline for it.
110+
if max_threads == 1 {
111+
self.main_pipeline.try_resize(1)?;
112+
}
116113
builder
117114
.remove_order_col_at_last()
118115
.build_full_sort_pipeline(&mut self.main_pipeline)
@@ -122,6 +119,9 @@ impl PipelineBuilder {
122119
// Build for each cluster node.
123120
// We build the full sort pipeline for it.
124121
// Don't remove the order column at last.
122+
if max_threads == 1 {
123+
self.main_pipeline.try_resize(1)?;
124+
}
125125
builder.build_full_sort_pipeline(&mut self.main_pipeline)
126126
}
127127
SortStep::Final => {
@@ -130,6 +130,10 @@ impl PipelineBuilder {
130130
// as the data is already sorted in each cluster node.
131131
// The input number of the transform is equal to the number of cluster nodes.
132132
if self.main_pipeline.output_len() > 1 {
133+
if self.main_pipeline.output_len() != 1 && max_threads == 1 {
134+
// TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1
135+
self.main_pipeline.try_resize(1)?;
136+
}
133137
builder
134138
.remove_order_col_at_last()
135139
.build_multi_merge(&mut self.main_pipeline)
@@ -141,6 +145,9 @@ impl PipelineBuilder {
141145
}
142146

143147
SortStep::Sample => {
148+
if max_threads == 1 {
149+
self.main_pipeline.try_resize(1)?;
150+
}
144151
builder.build_sample(&mut self.main_pipeline)?;
145152
self.exchange_injector = TransformSortBuilder::exchange_injector();
146153
Ok(())
@@ -155,6 +162,10 @@ impl PipelineBuilder {
155162
self.build_pipeline(&sort.input)?;
156163
}
157164

165+
if self.main_pipeline.output_len() != 1 && max_threads == 1 {
166+
// TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1
167+
unimplemented!();
168+
}
158169
builder
159170
.remove_order_col_at_last()
160171
.build_bounded_merge_sort(&mut self.main_pipeline)

src/query/sql/src/executor/physical_plans/physical_sort.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ impl PhysicalPlanBuilder {
228228
}));
229229
};
230230

231-
if !self.ctx.get_settings().get_enable_shuffle_sort()? {
231+
let settings = self.ctx.get_settings();
232+
if settings.get_max_threads()? == 1 || !settings.get_enable_shuffle_sort()? {
232233
let input_plan = self.build(s_expr.unary_child(), required).await?;
233234
return if !after_exchange {
234235
Ok(PhysicalPlan::Sort(Sort {

tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ use i15760;
77
statement ok
88
set auto_compaction_imperfect_blocks_threshold = 3;
99

10+
statement ok
11+
set enable_shuffle_sort = 0;
12+
1013
statement ok
1114
set enable_parallel_multi_merge_sort = 0;
1215

tests/sqllogictests/suites/mode/cluster/lazy_read.test

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ create or replace table t_lazy (a int not null, b float not null, c string not n
44
statement ok
55
set max_threads = 4;
66

7+
statement ok
8+
set enable_shuffle_sort = 0;
9+
710
statement ok
811
insert into t_lazy select number + 1, 1.1, '1.1', (1,2), '2020-01-01' from numbers(100)
912

@@ -30,14 +33,14 @@ RowFetch
3033
├── limit: 2
3134
├── offset: 0
3235
├── estimated rows: 2.00
33-
└── Sort
36+
└── Sort(Final)
3437
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
3538
├── sort keys: [a DESC NULLS LAST]
3639
├── estimated rows: 300.00
3740
└── Exchange
3841
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7), #_order_col]
3942
├── exchange type: Merge
40-
└── Sort
43+
└── Sort(Partial)
4144
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7), #_order_col]
4245
├── sort keys: [a DESC NULLS LAST]
4346
├── estimated rows: 300.00
@@ -98,14 +101,14 @@ Limit
98101
├── limit: 2
99102
├── offset: 0
100103
├── estimated rows: 2.00
101-
└── Sort
104+
└── Sort(Final)
102105
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
103106
├── sort keys: [a DESC NULLS LAST]
104107
├── estimated rows: 300.00
105108
└── Exchange
106109
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6), #_order_col]
107110
├── exchange type: Merge
108-
└── Sort
111+
└── Sort(Partial)
109112
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6), #_order_col]
110113
├── sort keys: [a DESC NULLS LAST]
111114
├── estimated rows: 300.00

tests/sqllogictests/suites/mode/cluster/shuffle.test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ set enable_shuffle_sort = 0;
2222
query T
2323
explain select * from (select t1.number, t2.number from t1 right outer join (SELECT number FROM t2 QUALIFY row_number() OVER (PARTITION BY number ORDER BY number DESC ) = 1) AS t2 ON t1.number = t2.number) as tt(a, b) order by a;
2424
----
25-
Sort
25+
Sort(Final)
2626
├── output columns: [t1.number (#0), t2.number (#1)]
2727
├── sort keys: [number ASC NULLS LAST]
2828
├── estimated rows: 0.01
2929
└── Exchange
3030
├── output columns: [t1.number (#0), t2.number (#1), #_order_col]
3131
├── exchange type: Merge
32-
└── Sort
32+
└── Sort(Partial)
3333
├── output columns: [t1.number (#0), t2.number (#1), #_order_col]
3434
├── sort keys: [number ASC NULLS LAST]
3535
├── estimated rows: 0.01

tests/sqllogictests/suites/mode/cluster/window.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ Limit
200200
├── limit: 10
201201
├── offset: 0
202202
├── estimated rows: 0.00
203-
└── Sort
203+
└── Sort(Single)
204204
├── output columns: [sales.customer_id (#2), customer_avg (#8), diff_from_overall_avg (#9)]
205205
├── sort keys: [diff_from_overall_avg DESC NULLS LAST, customer_id ASC NULLS LAST]
206206
├── estimated rows: 0.00

tests/sqllogictests/suites/query/functions/02_0062_function_unnest.test

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,7 @@ NULL [1,2] 3
158158
5 [3,4,5] 3
159159

160160
statement ok
161-
drop table t;
162-
163-
statement ok
164-
create table t (a array(int));
161+
create or replace table t (a array(int));
165162

166163
statement ok
167164
insert into t values ([1,2])

0 commit comments

Comments
 (0)