
Commit 5f1541b

feat: change tracking enabled table support merge into (#14900)
* change tracking enabled table support merge into
* add sqllogic test
* make lint
* update
1 parent 8dd06c3 commit 5f1541b

40 files changed (+788 -643 lines)

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default.

src/query/catalog/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -9,7 +9,6 @@ doctest = false
 test = false
 
 [dependencies]
-databend-common-arrow = { path = "../../common/arrow" }
 databend-common-base = { path = "../../common/base" }
 databend-common-config = { path = "../config" }
 databend-common-exception = { path = "../../common/exception" }

src/query/catalog/src/plan/internal_column.rs

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ pub struct InternalColumnMeta {
     /// The row offsets in the block.
     pub offsets: Option<Vec<usize>>,
     pub base_block_ids: Option<Scalar>,
+    pub inner: Option<BlockMetaInfoPtr>,
 }
 
 #[typetag::serde(name = "internal_column_meta")]
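The new `inner` field lets `InternalColumnMeta` carry an optional upstream block-meta pointer alongside the row offsets. A minimal sketch of the shape, using stand-in types rather than the actual `databend_common_expression` definitions:

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Stand-in for databend's BlockMetaInfo trait object (assumed shape).
trait BlockMetaInfo: Debug + Send + Sync {}
type BlockMetaInfoPtr = Arc<dyn BlockMetaInfo>;

#[derive(Debug)]
struct InternalColumnMeta {
    offsets: Option<Vec<usize>>,
    // New in this commit: an optional nested meta pointer, so a consumer
    // of the internal-column meta can still reach the original upstream
    // block meta when one is attached.
    inner: Option<BlockMetaInfoPtr>,
}
```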

src/query/catalog/src/plan/stream_column.rs

Lines changed: 8 additions & 13 deletions
@@ -16,12 +16,10 @@ use std::any::Any;
 use std::path::Path;
 use std::sync::Arc;
 
-use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_base::base::uuid::Uuid;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::decimal::DecimalScalar;
-use databend_common_expression::types::nullable::NullableColumn;
 use databend_common_expression::types::AnyType;
 use databend_common_expression::types::DataType;
 use databend_common_expression::types::DecimalDataType;
@@ -32,7 +30,6 @@ use databend_common_expression::BlockEntry;
 use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::BlockMetaInfoPtr;
-use databend_common_expression::Column;
 use databend_common_expression::ColumnId;
 use databend_common_expression::FromData;
 use databend_common_expression::Scalar;
@@ -128,16 +125,17 @@ impl StreamColumnMeta {
     }
 }
 
-pub fn build_origin_block_row_num(num_rows: usize) -> Value<AnyType> {
+pub fn build_origin_block_row_num(num_rows: usize) -> BlockEntry {
     let mut row_ids = Vec::with_capacity(num_rows);
     for i in 0..num_rows {
         row_ids.push(i as u64);
     }
-    let column = UInt64Type::from_data(row_ids);
-    Value::Column(Column::Nullable(Box::new(NullableColumn {
-        column,
-        validity: Bitmap::new_constant(true, num_rows),
-    })))
+    let column = Value::Column(UInt64Type::from_data(row_ids));
+
+    BlockEntry::new(
+        DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
+        column.wrap_nullable(None),
+    )
 }
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -218,10 +216,7 @@ impl StreamColumn {
                 )))),
                 meta.build_origin_block_id(),
             ),
-            StreamColumnType::OriginRowNum => BlockEntry::new(
-                DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
-                build_origin_block_row_num(num_rows),
-            ),
+            StreamColumnType::OriginRowNum => build_origin_block_row_num(num_rows),
         }
     }
 }

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 1 addition & 0 deletions
@@ -149,6 +149,7 @@ impl CopyIntoTableInterpreter {
                     None,
                     None,
                     false,
+                    false,
                 )
                 .await?,
             );

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 16 additions & 32 deletions
@@ -47,7 +47,6 @@ use databend_common_sql::executor::PhysicalPlanBuilder;
 use databend_common_sql::plans;
 use databend_common_sql::plans::MergeInto as MergePlan;
 use databend_common_sql::plans::RelOperator;
-use databend_common_sql::plans::UpdatePlan;
 use databend_common_sql::IndexType;
 use databend_common_sql::ScalarExpr;
 use databend_common_sql::TypeCheck;
@@ -207,13 +206,6 @@ impl MergeIntoInterpreter {
         // check mutability
         let check_table = self.ctx.get_table(catalog, database, table_name).await?;
         check_table.check_mutable()?;
-        // check change tracking
-        if check_table.change_tracking_enabled() {
-            return Err(ErrorCode::Unimplemented(format!(
-                "change tracking is enabled for table '{}', does not support MERGE INTO",
-                check_table.name(),
-            )));
-        }
 
         let update_stream_meta = build_update_stream_meta_seq(self.ctx.clone(), meta_data).await?;
 
@@ -376,31 +368,23 @@ impl MergeIntoInterpreter {
 
             // update
             let update_list = if let Some(update_list) = &item.update {
-                // use update_plan to get exprs
-                let update_plan = UpdatePlan {
-                    selection: None,
-                    subquery_desc: vec![],
-                    database: database.clone(),
-                    table: match target_alias {
-                        None => table_name.clone(),
-                        Some(alias) => alias.name.to_string().to_lowercase(),
-                    },
-                    update_list: update_list.clone(),
-                    bind_context: bind_context.clone(),
-                    metadata: self.plan.meta_data.clone(),
-                    catalog: catalog.clone(),
-                };
                 // we don't need real col_indices here, just give a
                 // dummy index, that's ok.
                 let col_indices = vec![DUMMY_COL_INDEX];
-                let update_list: Vec<(FieldIndex, RemoteExpr<String>)> = update_plan
-                    .generate_update_list(
-                        self.ctx.clone(),
-                        fuse_table.schema().into(),
-                        col_indices,
-                        Some(PREDICATE_COLUMN_INDEX),
-                        target_alias.is_some(),
-                    )?;
+                let (database, table) = match target_alias {
+                    None => (Some(database.as_str()), table_name.clone()),
+                    Some(alias) => (None, alias.name.to_string().to_lowercase()),
+                };
+                let update_list = plans::generate_update_list(
+                    self.ctx.clone(),
+                    bind_context,
+                    update_list,
+                    fuse_table.schema_with_stream().into(),
+                    col_indices,
+                    Some(PREDICATE_COLUMN_INDEX),
+                    database,
+                    &table,
+                )?;
                 let update_list = update_list
                     .iter()
                     .map(|(idx, remote_expr)| {
@@ -422,7 +406,7 @@ impl MergeIntoInterpreter {
                         )
                     })
                     .collect_vec();
-                //
+                // update
                 Some(update_list)
             } else {
                 // delete
@@ -452,7 +436,7 @@ impl MergeIntoInterpreter {
 
         let commit_input = if !distributed {
             // recv datablocks from matched upstream and unmatched upstream
-            // transform and append dat
+            // transform and append data
             PhysicalPlan::MergeInto(Box::new(MergeInto {
                 input: Box::new(merge_into_source),
                 table_info: table_info.clone(),
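One behavioral detail in the new code path above: when the merge target has an alias, the update expressions are resolved against the lowercased alias with no database qualifier; otherwise against the real database and table name. A runnable sketch of just that resolution, with assumed simplified types:

```rust
struct TableAlias {
    name: String,
}

// Mirrors the `match target_alias` in the new code above: an aliased
// target drops the database qualifier and uses the lowercased alias.
fn resolve_target<'a>(
    database: &'a str,
    table_name: &str,
    target_alias: Option<&TableAlias>,
) -> (Option<&'a str>, String) {
    match target_alias {
        None => (Some(database), table_name.to_string()),
        Some(alias) => (None, alias.name.to_lowercase()),
    }
}

fn main() {
    assert_eq!(resolve_target("db", "t", None), (Some("db"), "t".into()));
    let alias = TableAlias { name: "T2".into() };
    assert_eq!(resolve_target("db", "t", Some(&alias)), (None, "t2".into()));
}
```

Switching the schema argument to `fuse_table.schema_with_stream()` is what lets the generated update list account for the hidden stream columns on change-tracking tables.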

src/query/service/src/interpreters/interpreter_update.rs

Lines changed: 0 additions & 2 deletions
@@ -221,8 +221,6 @@ impl UpdateInterpreter {
             self.ctx.clone(),
             tbl.schema_with_stream().into(),
             col_indices.clone(),
-            None,
-            false,
         )?;
 
         let computed_list = self

src/query/service/src/pipelines/builders/builder_delete.rs

Lines changed: 4 additions & 9 deletions
@@ -20,10 +20,9 @@ use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_pipeline_sources::EmptySource;
-use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::executor::physical_plans::DeleteSource;
 use databend_common_sql::executor::physical_plans::MutationKind;
-use databend_common_sql::gen_mutation_stream_operator;
+use databend_common_sql::StreamContext;
 use databend_common_storages_fuse::operations::MutationBlockPruningContext;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseLazyPartInfo;
@@ -106,8 +105,8 @@ impl PipelineBuilder {
             &mut self.main_pipeline,
         )?;
         if table.change_tracking_enabled() {
-            let func_ctx = self.ctx.get_function_context()?;
-            let (stream, operators) = gen_mutation_stream_operator(
+            let stream_ctx = StreamContext::try_create(
+                self.ctx.get_function_context()?,
                 table.schema_with_stream(),
                 table.get_table_info().ident.seq,
                 true,
@@ -117,11 +116,7 @@ impl PipelineBuilder {
                 TransformAddStreamColumns::try_create(
                     transform_input_port,
                     transform_output_port,
-                    CompoundBlockOperator {
-                        operators: operators.clone(),
-                        ctx: func_ctx.clone(),
-                    },
-                    stream.clone(),
+                    stream_ctx.clone(),
                 )
             })?;
         }

src/query/service/src/pipelines/builders/builder_merge_into.rs

Lines changed: 9 additions & 29 deletions
@@ -190,7 +190,7 @@ impl PipelineBuilder {
         // start to append data
 
         // 1.fill default columns
-        let table_default_schema = &tbl.schema().remove_computed_fields();
+        let table_default_schema = &tbl.schema_with_stream().remove_computed_fields();
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 TransformResortAddOnWithoutSourceSchema::try_create(
@@ -200,7 +200,7 @@ impl PipelineBuilder {
                     Arc::new(DataSchema::from(table_default_schema)),
                     unmatched.clone(),
                     tbl.clone(),
-                    Arc::new(DataSchema::from(tbl.schema())),
+                    Arc::new(DataSchema::from(tbl.schema_with_stream())),
                 )
             },
             1,
@@ -209,7 +209,7 @@ impl PipelineBuilder {
         self.main_pipeline.add_pipe(builder.finalize());
 
         // 2.fill computed columns
-        let table_computed_schema = &tbl.schema().remove_virtual_computed_fields();
+        let table_computed_schema = &tbl.schema_with_stream().remove_virtual_computed_fields();
         let default_schema: DataSchemaRef = Arc::new(table_default_schema.into());
         let computed_schema: DataSchemaRef = Arc::new(table_computed_schema.into());
         if default_schema != computed_schema {
@@ -307,15 +307,6 @@ impl PipelineBuilder {
             table.get_block_thresholds(),
             None,
         )?;
-        let block_builder = TransformSerializeBlock::try_create(
-            self.ctx.clone(),
-            InputPort::create(),
-            OutputPort::create(),
-            table,
-            cluster_stats_gen.clone(),
-            MutationKind::MergeInto,
-        )?
-        .get_block_builder();
         let max_threads = self.settings.get_max_threads()?;
         let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));
         // MutationsLogs port0
@@ -324,7 +315,7 @@ impl PipelineBuilder {
         self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![
             table.rowid_aggregate_mutator(
                 self.ctx.clone(),
-                block_builder,
+                cluster_stats_gen,
                 io_request_semaphore,
                 segments.clone(),
                 false, // we don't support for distributed mode.
@@ -429,17 +420,6 @@ impl PipelineBuilder {
         let cluster_stats_gen =
             table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)?;
 
-        // this TransformSerializeBlock is just used to get block_builder
-        let block_builder = TransformSerializeBlock::try_create(
-            self.ctx.clone(),
-            InputPort::create(),
-            OutputPort::create(),
-            table,
-            cluster_stats_gen.clone(),
-            MutationKind::MergeInto,
-        )?
-        .get_block_builder();
-
         let serialize_segment_transform = TransformSerializeSegment::new(
             self.ctx.clone(),
             InputPort::create(),
@@ -492,7 +472,7 @@ impl PipelineBuilder {
             matched.clone(),
             field_index_of_input_schema.clone(),
             input.output_schema()?,
-            Arc::new(DataSchema::from(tbl.schema())),
+            Arc::new(DataSchema::from(tbl.schema_with_stream())),
             merge_into.target_build_optimization,
             *can_try_update_column_only,
         )?;
@@ -676,7 +656,7 @@ impl PipelineBuilder {
         };
 
         // fill default columns
-        let table_default_schema = &table.schema().remove_computed_fields();
+        let table_default_schema = &table.schema_with_stream().remove_computed_fields();
         let mut builder = self.main_pipeline.add_transform_with_specified_len(
             |transform_input_port, transform_output_port| {
                 TransformResortAddOnWithoutSourceSchema::try_create(
@@ -686,7 +666,7 @@ impl PipelineBuilder {
                     Arc::new(DataSchema::from(table_default_schema)),
                     unmatched.clone(),
                     tbl.clone(),
-                    Arc::new(DataSchema::from(table.schema())),
+                    Arc::new(DataSchema::from(table.schema_with_stream())),
                 )
             },
             fill_default_len,
@@ -713,7 +693,7 @@ impl PipelineBuilder {
         self.main_pipeline
             .add_pipe(add_builder_pipe(builder, distributed));
         // fill computed columns
-        let table_computed_schema = &table.schema().remove_virtual_computed_fields();
+        let table_computed_schema = &table.schema_with_stream().remove_virtual_computed_fields();
         let default_schema: DataSchemaRef = Arc::new(table_default_schema.into());
         let computed_schema: DataSchemaRef = Arc::new(table_computed_schema.into());
         if default_schema != computed_schema {
@@ -839,7 +819,7 @@ impl PipelineBuilder {
         } else {
             pipe_items.push(table.rowid_aggregate_mutator(
                 self.ctx.clone(),
-                block_builder,
+                cluster_stats_gen.clone(),
                 io_request_semaphore,
                 segments.clone(),
                 merge_into.target_build_optimization,
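Both removed `TransformSerializeBlock::try_create(...).get_block_builder()` blocks existed only to extract a block builder for `rowid_aggregate_mutator`; the mutator now takes the `ClusterStatsGenerator` directly. A stand-in sketch of the before/after shape (simplified types, not the fuse-table API):

```rust
// Stand-in types; the real signatures live in databend's fuse storage crate.
#[derive(Clone)]
struct ClusterStatsGen;
struct BlockBuilder {
    stats: ClusterStatsGen,
}

// Before (shape only): callers built a throwaway transform with dummy
// input/output ports just to obtain one of these.
fn get_block_builder(stats: ClusterStatsGen) -> BlockBuilder {
    BlockBuilder { stats }
}

// After: the mutator accepts the generator and derives the builder itself,
// so call sites no longer construct a dummy transform at all.
fn rowid_aggregate_mutator(stats: ClusterStatsGen) {
    let _builder = get_block_builder(stats);
    // ... aggregate row ids and serialize mutated blocks ...
}

fn main() {
    rowid_aggregate_mutator(ClusterStatsGen);
}
```

The repeated `schema()` to `schema_with_stream()` swaps in this file make the hidden stream columns part of the schemas used when filling default and computed columns, which is what lets the merge pipeline carry change-tracking columns through.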

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 8 additions & 9 deletions
@@ -30,7 +30,7 @@ use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::executor::physical_plans::ReclusterSink;
 use databend_common_sql::executor::physical_plans::ReclusterSource;
-use databend_common_sql::gen_mutation_stream_operator;
+use databend_common_sql::StreamContext;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::operations::CommitSink;
 use databend_common_storages_fuse::operations::MutationGenerator;
@@ -100,19 +100,18 @@ impl PipelineBuilder {
 
         let num_input_columns = schema.fields().len();
         if table.change_tracking_enabled() {
-            let func_ctx = self.ctx.get_function_context()?;
-            let (stream, operators) =
-                gen_mutation_stream_operator(schema, table_info.ident.seq, false)?;
+            let stream_ctx = StreamContext::try_create(
+                self.ctx.get_function_context()?,
+                schema,
+                table_info.ident.seq,
+                false,
+            )?;
             self.main_pipeline.add_transform(
                 |transform_input_port, transform_output_port| {
                     TransformAddStreamColumns::try_create(
                         transform_input_port,
                         transform_output_port,
-                        CompoundBlockOperator {
-                            operators: operators.clone(),
-                            ctx: func_ctx.clone(),
-                        },
-                        stream.clone(),
+                        stream_ctx.clone(),
                     )
                 },
             )?;
