Skip to content

Commit d7eb8cb

Browse files
authored
fix: cluster table with change tracking enabled append panic (#14956)
* fix cluster table with change tracking * add sqllogic test
1 parent 45e15cc commit d7eb8cb

File tree

3 files changed

+67
-5
lines changed

3 files changed

+67
-5
lines changed

src/query/service/src/pipelines/builders/builder_replace_into.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,9 @@ impl PipelineBuilder {
117117
.ctx
118118
.build_table_by_table_info(catalog_info, table_info, None)?;
119119
let table = FuseTable::try_from_table(table.as_ref())?;
120+
let schema = DataSchema::from(table.schema()).into();
120121
let cluster_stats_gen =
121-
table.get_cluster_stats_gen(self.ctx.clone(), 0, *block_thresholds, None)?;
122+
table.get_cluster_stats_gen(self.ctx.clone(), 0, *block_thresholds, Some(schema))?;
122123
self.build_pipeline(input)?;
123124
// connect to broadcast processor and append transform
124125
let serialize_block_transform = TransformSerializeBlock::try_create(
@@ -282,7 +283,7 @@ impl PipelineBuilder {
282283
let table = FuseTable::try_from_table(tbl.as_ref())?;
283284
self.build_pipeline(input)?;
284285
let mut delete_column_idx = 0;
285-
let mut opt_modified_schema = None;
286+
let mut modified_schema = DataSchema::from(target_schema.clone()).into();
286287
if let Some(ReplaceSelectCtx {
287288
select_column_bindings,
288289
select_schema,
@@ -303,7 +304,7 @@ impl PipelineBuilder {
303304
target_schema
304305
.fields
305306
.insert(delete_column_idx, delete_column);
306-
opt_modified_schema = Some(Arc::new(target_schema.clone()));
307+
modified_schema = Arc::new(target_schema.clone());
307308
}
308309
let target_schema = Arc::new(target_schema.clone());
309310
if target_schema.fields().len() != select_schema.fields().len() {
@@ -337,7 +338,7 @@ impl PipelineBuilder {
337338
self.ctx.clone(),
338339
&mut self.main_pipeline,
339340
table.get_block_thresholds(),
340-
opt_modified_schema,
341+
Some(modified_schema),
341342
)?;
342343
// 1. resize input to 1, since the UpsertTransform need to de-duplicate inputs "globally"
343344
self.main_pipeline.try_resize(1)?;

src/query/storages/fuse/src/operations/append.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ impl FuseTable {
7272
}
7373
}
7474

75+
let schema = DataSchema::from(self.schema()).into();
7576
let cluster_stats_gen =
76-
self.cluster_gen_for_append(ctx.clone(), pipeline, block_thresholds, None)?;
77+
self.cluster_gen_for_append(ctx.clone(), pipeline, block_thresholds, Some(schema))?;
7778
pipeline.add_transform(|input, output| {
7879
let proc = TransformSerializeBlock::try_create(
7980
ctx.clone(),

tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,66 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
135135
7 0 0 1 0
136136
8 0 0 0 0
137137

138+
###############
139+
# issue 14955 #
140+
###############
141+
142+
statement ok
143+
create table t3(a int, b int) cluster by(a+1) change_tracking=true
144+
145+
statement ok
146+
insert into t3 values(1, 1), (3, 3), (2, 3)
147+
148+
statement ok
149+
update t3 set b = 2 where a = 2
150+
151+
statement ok
152+
delete from t3 where a = 3
153+
154+
statement ok
155+
insert into t3 values(4, 4)
156+
157+
statement ok
158+
alter table t3 recluster
159+
160+
statement ok
161+
insert into t3 values(0, 0)
162+
163+
statement ok
164+
optimize table t3 compact
165+
166+
statement ok
167+
create table t4(a int, b int, c int)
168+
169+
statement ok
170+
insert into t4 values(0, 1, 0), (3, 4, 3), (4, 5, 4)
171+
172+
statement ok
173+
replace into t3 on(a) delete when c = 0 select * from t4
174+
175+
query III
176+
merge into t3 using t4 on t3.a = t4.a when matched and t4.a = 4 then delete when matched then update set t3.b = t4.c when not matched then insert values(t4.a, t4.c)
177+
----
178+
1 1 1
179+
180+
query IIBBII
181+
select a, b, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t3
182+
----
183+
0 0 0 0 0 0
184+
1 1 0 0 0 0
185+
2 2 0 0 1 1
186+
3 3 0 0 0 1
187+
188+
statement ok
189+
drop table t4 all
190+
191+
statement ok
192+
drop table t3 all
193+
194+
######################
195+
# end of issue 14955 #
196+
######################
197+
138198
statement ok
139199
set enable_experimental_merge_into = 0
140200

0 commit comments

Comments
 (0)