Skip to content

Commit 4f2e1d0

Browse files
authored
feat: change tracking enabled table support replace into (#14831)
* change tracking enabled table supports replace into * add sqllogictest * update test
1 parent d6b0a07 commit 4f2e1d0

File tree

7 files changed

+140
-22
lines changed

7 files changed

+140
-22
lines changed

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,6 @@ impl ReplaceInterpreter {
132132

133133
// check mutability
134134
table.check_mutable()?;
135-
// check change tracking
136-
if table.change_tracking_enabled() {
137-
return Err(ErrorCode::Unimplemented(format!(
138-
"change tracking is enabled for table '{}', does not support REPLACE",
139-
table.name(),
140-
)));
141-
}
142135

143136
let catalog = self.ctx.get_catalog(&plan.catalog).await?;
144137
let schema = table.schema();

src/query/service/src/pipelines/builders/builder_replace_into.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ impl PipelineBuilder {
120120
cluster_stats_gen,
121121
MutationKind::Replace,
122122
)?;
123-
let block_builder = serialize_block_transform.get_block_builder();
123+
let mut block_builder = serialize_block_transform.get_block_builder();
124+
block_builder.source_schema = table.schema_with_stream();
124125

125126
let serialize_segment_transform = TransformSerializeSegment::new(
126127
self.ctx.clone(),

src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl TransformSerializeBlock {
8888
.filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
8989
.cloned()
9090
.collect::<Vec<_>>();
91-
if !matches!(kind, MutationKind::Insert) {
91+
if !matches!(kind, MutationKind::Insert | MutationKind::Replace) {
9292
// add stream fields.
9393
for stream_column in table.stream_columns().iter() {
9494
fields.push(stream_column.table_field());

src/query/storages/fuse/src/operations/replace.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ impl FuseTable {
114114
bloom_filter_column_indexes.clone(),
115115
chunk_of_segment_locations,
116116
block_slots.clone(),
117-
self.operator.clone(),
118-
self.table_info.schema(),
119-
self.get_write_settings(),
117+
self,
120118
read_settings,
121119
block_builder.clone(),
122120
io_request_semaphore.clone(),

src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,29 @@ use databend_common_base::base::tokio::sync::Semaphore;
2222
use databend_common_base::base::ProgressValues;
2323
use databend_common_base::runtime::GlobalIORuntime;
2424
use databend_common_base::runtime::TrySpawn;
25+
use databend_common_catalog::plan::gen_mutation_stream_meta;
2526
use databend_common_catalog::plan::Projection;
27+
use databend_common_catalog::plan::StreamColumn;
28+
use databend_common_catalog::table::Table;
2629
use databend_common_catalog::table_context::TableContext;
2730
use databend_common_exception::ErrorCode;
2831
use databend_common_exception::Result;
32+
use databend_common_expression::types::DataType;
33+
use databend_common_expression::types::NumberDataType;
34+
use databend_common_expression::types::UInt64Type;
35+
use databend_common_expression::BlockEntry;
36+
use databend_common_expression::Column;
2937
use databend_common_expression::ColumnId;
3038
use databend_common_expression::ComputedExpr;
3139
use databend_common_expression::DataBlock;
3240
use databend_common_expression::FieldIndex;
41+
use databend_common_expression::FromData;
3342
use databend_common_expression::Scalar;
34-
use databend_common_expression::TableSchema;
43+
use databend_common_expression::Value;
3544
use databend_common_metrics::storage::*;
3645
use databend_common_sql::evaluator::BlockOperator;
3746
use databend_common_sql::executor::physical_plans::OnConflictField;
47+
use databend_common_sql::gen_mutation_stream_operator;
3848
use databend_storages_common_cache::LoadParams;
3949
use databend_storages_common_index::filters::Filter;
4050
use databend_storages_common_index::filters::Xor8Filter;
@@ -68,6 +78,7 @@ use crate::operations::replace_into::meta::MergeIntoOperation;
6878
use crate::operations::replace_into::meta::UniqueKeyDigest;
6979
use crate::operations::replace_into::mutator::row_hash_of_columns;
7080
use crate::operations::replace_into::mutator::DeletionAccumulator;
81+
use crate::FuseTable;
7182

7283
struct AggregationContext {
7384
segment_locations: AHashMap<SegmentIndex, Location>,
@@ -90,6 +101,8 @@ struct AggregationContext {
90101
block_builder: BlockBuilder,
91102
io_request_semaphore: Arc<Semaphore>,
92103
query_id: String,
104+
stream_columns: Vec<StreamColumn>,
105+
stream_operators: Vec<BlockOperator>,
93106
}
94107

95108
// Apply MergeIntoOperations to segments
@@ -107,13 +120,16 @@ impl MergeIntoOperationAggregator {
107120
bloom_filter_column_indexes: Vec<FieldIndex>,
108121
segment_locations: Vec<(SegmentIndex, Location)>,
109122
block_slots: Option<BlockSlotDescription>,
110-
data_accessor: Operator,
111-
table_schema: Arc<TableSchema>,
112-
write_settings: WriteSettings,
123+
table: &FuseTable,
113124
read_settings: ReadSettings,
114125
block_builder: BlockBuilder,
115126
io_request_semaphore: Arc<Semaphore>,
116127
) -> Result<Self> {
128+
let data_accessor = table.get_operator();
129+
let table_schema = table.schema_with_stream();
130+
let write_settings = table.get_write_settings();
131+
let update_stream_columns = table.change_tracking_enabled();
132+
117133
let deletion_accumulator = DeletionAccumulator::default();
118134
let segment_reader =
119135
MetaReaders::segment_info_reader(data_accessor.clone(), table_schema.clone());
@@ -145,7 +161,7 @@ impl MergeIntoOperationAggregator {
145161
table_schema.clone(),
146162
projection,
147163
false,
148-
false,
164+
update_stream_columns,
149165
false,
150166
)
151167
}?;
@@ -158,16 +174,23 @@ impl MergeIntoOperationAggregator {
158174
let reader = BlockReader::create(
159175
ctx.clone(),
160176
data_accessor.clone(),
161-
table_schema,
177+
table_schema.clone(),
162178
projection,
163179
false,
164-
false,
180+
update_stream_columns,
165181
false,
166182
)?;
167183
Some(reader)
168184
}
169185
};
170186
let query_id = ctx.get_id();
187+
188+
let (stream_columns, stream_operators) = if update_stream_columns {
189+
gen_mutation_stream_operator(table_schema, table.get_table_info().ident.seq, true)?
190+
} else {
191+
(vec![], vec![])
192+
};
193+
171194
Ok(Self {
172195
ctx,
173196
deletion_accumulator,
@@ -186,6 +209,8 @@ impl MergeIntoOperationAggregator {
186209
block_builder,
187210
io_request_semaphore,
188211
query_id,
212+
stream_columns,
213+
stream_operators,
189214
}),
190215
})
191216
}
@@ -471,7 +496,7 @@ impl AggregationContext {
471496
let bitmap = bitmap.into();
472497
let mut key_columns_data_after_deletion = key_columns_data.filter_with_bitmap(&bitmap)?;
473498

474-
let new_block = match &self.remain_column_reader {
499+
let mut new_block = match &self.remain_column_reader {
475500
None => key_columns_data_after_deletion,
476501
Some(remain_columns_reader) => {
477502
metrics_inc_replace_block_number_totally_loaded(1);
@@ -505,6 +530,31 @@ impl AggregationContext {
505530
}
506531
};
507532

533+
if self.key_column_reader.update_stream_columns {
534+
// generate row id column
535+
let mut row_ids = Vec::with_capacity(num_rows);
536+
for i in 0..num_rows {
537+
row_ids.push(i as u64);
538+
}
539+
let value = Value::Column(Column::filter(&UInt64Type::from_data(row_ids), &bitmap));
540+
let row_num = BlockEntry::new(
541+
DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
542+
value.wrap_nullable(None),
543+
);
544+
new_block.add_column(row_num);
545+
546+
let stream_meta = gen_mutation_stream_meta(None, &block_meta.location.0)?;
547+
for stream_column in self.stream_columns.iter() {
548+
let entry = stream_column.generate_column_values(&stream_meta, num_rows);
549+
new_block.add_column(entry);
550+
}
551+
let func_ctx = self.block_builder.ctx.get_function_context()?;
552+
new_block = self
553+
.stream_operators
554+
.iter()
555+
.try_fold(new_block, |input, op| op.execute(&func_ctx, input))?;
556+
}
557+
508558
// serialization and compression is cpu intensive, send them to dedicated thread pool
509559
// and wait (asyncly, which will NOT block the executor thread)
510560
let block_builder = self.block_builder.clone();
@@ -749,6 +799,7 @@ mod tests {
749799
use databend_common_expression::types::NumberScalar;
750800
use databend_common_expression::TableDataType;
751801
use databend_common_expression::TableField;
802+
use databend_common_expression::TableSchema;
752803

753804
use super::*;
754805

tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,18 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
7171
statement error 1065
7272
select change$is_update from t
7373

74-
statement error 1002
75-
replace into t on(a) values(7)
74+
statement ok
75+
replace into t on(a) values(6),(7)
76+
77+
query IBBII
78+
select a, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t order by a
79+
----
80+
1 0 0 0 0
81+
2 0 0 1 0
82+
3 0 0 1 1
83+
5 0 0 0 0
84+
6 0 0 0 0
85+
7 0 0 1 0
7686

7787
statement ok
7888
create table t2(a int)

tests/sqllogictests/suites/ee/01_ee_system/01_0003_stream.test

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,13 +420,78 @@ select a, `B`, c, change$action, change$is_update from s6 order by a, `B`
420420
3 3 3 INSERT 0
421421
4 4 4 INSERT 0
422422

423+
# test truncate
424+
statement ok
425+
truncate table t6
426+
427+
query IIITB
428+
select a, `B`, c, change$action, change$is_update from s6 order by a, `B`
429+
----
430+
1 1 1 DELETE 0
431+
2 2 2 DELETE 0
432+
423433
statement ok
424434
drop stream s6
425435

426436
######################
427437
# end of issue 14506 #
428438
######################
429439

440+
statement ok
441+
create table t7(a int, b int)
442+
443+
statement ok
444+
insert into t7 values(1, 1), (2, 2), (3, 3)
445+
446+
statement ok
447+
create stream s7 on table t7 append_only = false
448+
449+
statement ok
450+
create table t8(a int, b int, c int)
451+
452+
statement ok
453+
insert into t8 values(2, 2, 2), (3, 3, 3), (4, 4, 4)
454+
455+
# test replace into
456+
statement ok
457+
replace into t7 on(a, b) delete when a = 3 select * from t8
458+
459+
query IITB
460+
select a, b, change$action, change$is_update from s7 order by a, b
461+
----
462+
2 2 DELETE 0
463+
2 2 INSERT 0
464+
3 3 DELETE 0
465+
4 4 INSERT 0
466+
467+
query IIBBII
468+
select a, b, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t7 order by a
469+
----
470+
1 1 0 0 0 0
471+
2 2 0 0 0 0
472+
4 4 0 0 1 0
473+
474+
statement ok
475+
create stream s7_1 on table t7 at(stream => s7)
476+
477+
query IITB
478+
select a, b, change$action, change$is_update from s7_1 order by a, b
479+
----
480+
2 2 INSERT 0
481+
4 4 INSERT 0
482+
483+
statement ok
484+
drop stream s7
485+
486+
statement ok
487+
drop stream s7_1
488+
489+
statement ok
490+
drop table t7 all
491+
492+
statement ok
493+
drop table t8 all
494+
430495
statement error 2733
431496
create stream s_err on table t4 at (stream => s3)
432497

0 commit comments

Comments
 (0)