Skip to content

Commit 06d7ed3

Browse files
authored
fix(query): fix cse projection (#15409)
1 parent ff0af2f commit 06d7ed3

File tree

3 files changed

+22
-52
lines changed

3 files changed

+22
-52
lines changed

src/query/sql/src/evaluator/block_operator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl BlockOperator {
7171
let evaluator = Evaluator::new(&input, func_ctx, &BUILTIN_FUNCTIONS);
7272
let result = evaluator.run(expr)?;
7373
let col = BlockEntry::new(expr.data_type().clone(), result);
74+
7475
input.add_column(col);
7576
}
7677
match projections {

src/query/sql/src/evaluator/cse.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,36 +74,31 @@ pub fn apply_cse(
7474
temp_var_counter += 1;
7575
}
7676

77-
let mut new_projections: Option<ColumnSet> =
78-
projections.as_ref().map(|projections| {
79-
projections
80-
.iter()
81-
.filter(|idx| **idx < input_num_columns)
82-
.copied()
83-
.collect::<ColumnSet>()
84-
});
85-
86-
let has_projections = new_projections.is_some();
77+
let projections = projections
78+
.unwrap_or((0..input_num_columns + exprs.len()).collect::<ColumnSet>());
79+
80+
// Regenerate the projections based on the replacements
81+
// 1. Initialize the new_projections with the original projections with unchanged indexes
82+
let mut new_projections = projections
83+
.iter()
84+
.filter(|idx| **idx < input_num_columns)
85+
.copied()
86+
.collect::<ColumnSet>();
87+
8788
for mut expr in exprs {
8889
perform_cse_replacement(&mut expr, &cse_replacements);
8990
new_exprs.push(expr);
9091

91-
if has_projections {
92-
// Safe to unwrap().
93-
if projections
94-
.as_ref()
95-
.unwrap()
96-
.contains(&(temp_var_counter - candidates_nums))
97-
{
98-
new_projections.as_mut().unwrap().insert(temp_var_counter);
99-
}
92+
// 2. Increment projection index because the position is occupied by the cse
93+
if projections.contains(&(temp_var_counter - candidates_nums)) {
94+
new_projections.insert(temp_var_counter);
10095
}
10196
temp_var_counter += 1;
10297
}
10398

10499
results.push(BlockOperator::Map {
105100
exprs: new_exprs,
106-
projections: new_projections,
101+
projections: Some(new_projections),
107102
});
108103
} else {
109104
results.push(BlockOperator::Map { exprs, projections });

tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -743,50 +743,24 @@ select row_count from fuse_block('db_09_0008', 't13')
743743
7
744744
14
745745

746-
statement ok
747-
DROP TABLE m
748-
749-
statement ok
750-
DROP TABLE t
751-
752-
statement ok
753-
DROP TABLE t1
754-
755-
statement ok
756-
DROP TABLE t2
757-
758-
statement ok
759-
DROP TABLE t3
760-
761-
statement ok
762-
DROP TABLE t4
763-
764-
statement ok
765-
DROP TABLE t5
766-
767-
statement ok
768-
DROP TABLE t6
769-
770-
statement ok
771-
DROP TABLE t7
772746

773747
statement ok
774-
DROP TABLE t8
748+
create table test_abc(a string, b string, c string) cluster by ( substr(a, 1, 2), substr(b, 1, 3) );
775749

776750
statement ok
777-
DROP TABLE t9
751+
create table test_abc_random(a string, b string, c string) engine = random;
778752

779753
statement ok
780-
DROP TABLE t10
754+
insert into test_abc select * from test_abc_random limit 1000;
781755

782756
statement ok
783-
DROP TABLE t11
757+
insert into test_abc select * from test_abc_random limit 1000;
784758

785759
statement ok
786-
DROP TABLE t12
760+
insert into test_abc select * from test_abc_random limit 1000;
787761

788762
statement ok
789-
DROP TABLE t13
763+
alter table test_abc recluster final;
790764

791765
statement ok
792766
DROP DATABASE db_09_0008

0 commit comments

Comments
 (0)