Skip to content

Commit 84851c2

Browse files
committed
update
1 parent c85485d commit 84851c2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+356
-484
lines changed

src/query/catalog/src/table.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use databend_common_meta_types::MetaId;
3737
use databend_common_pipeline_core::Pipeline;
3838
use databend_common_storage::Histogram;
3939
use databend_common_storage::StorageMetrics;
40-
use databend_storages_common_table_meta::meta::SnapshotId;
4140
use databend_storages_common_table_meta::meta::TableSnapshot;
4241
use databend_storages_common_table_meta::table::ChangeType;
4342
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
@@ -236,7 +235,7 @@ pub trait Table: Sync + Send {
236235
copied_files: Option<UpsertTableCopiedFileReq>,
237236
update_stream_meta: Vec<UpdateStreamMetaReq>,
238237
overwrite: bool,
239-
prev_snapshot_id: Option<SnapshotId>,
238+
forbid_occ_retry: bool,
240239
_deduplicated_label: Option<String>,
241240
) -> Result<()> {
242241
let (_, _, _, _, _, _) = (
@@ -245,7 +244,7 @@ pub trait Table: Sync + Send {
245244
update_stream_meta,
246245
pipeline,
247246
overwrite,
248-
prev_snapshot_id,
247+
forbid_occ_retry,
249248
);
250249

251250
Ok(())

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1164,7 +1164,7 @@ impl AccessChecker for PrivilegeAccess {
11641164
self.validate_access(&GrantObject::Global, UserPrivilegeType::Alter, false, false)
11651165
.await?;
11661166
}
1167-
Plan::CopyIntoTable { .. } => {
1167+
Plan::Append { .. } => {
11681168
// match &plan.source{
11691169

11701170
// }

src/query/service/src/interpreters/interpreter_copy_into_table.rs renamed to src/query/service/src/interpreters/interpreter_append.rs

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::sync::Arc;
1616

1717
use databend_common_catalog::lock::LockTableOption;
1818
use databend_common_catalog::plan::StageTableInfo;
19+
use databend_common_catalog::table::TableExt;
1920
use databend_common_exception::Result;
2021
use databend_common_expression::types::Int32Type;
2122
use databend_common_expression::types::StringType;
@@ -25,6 +26,7 @@ use databend_common_expression::SendableDataBlockStream;
2526
use databend_common_sql::executor::physical_plans::MutationKind;
2627
use databend_common_sql::executor::PhysicalPlanBuilder;
2728
use databend_common_sql::optimizer::SExpr;
29+
use databend_common_sql::plans::AppendType;
2830
use log::debug;
2931
use log::info;
3032

@@ -37,22 +39,23 @@ use crate::pipelines::PipelineBuilder;
3739
use crate::schedulers::build_query_pipeline_without_render_result_set;
3840
use crate::sessions::QueryContext;
3941
use crate::sessions::TableContext;
40-
use crate::sql::plans::CopyIntoTablePlan;
42+
use crate::sql::plans::Append;
4143
use crate::sql::MetadataRef;
4244
use crate::stream::DataBlockStream;
4345

44-
pub struct CopyIntoTableInterpreter {
46+
pub struct AppendInterpreter {
4547
ctx: Arc<QueryContext>,
4648
s_expr: SExpr,
4749
metadata: MetadataRef,
4850
stage_table_info: Option<Box<StageTableInfo>>,
4951
overwrite: bool,
52+
col_type_modified: bool,
5053
}
5154

5255
#[async_trait::async_trait]
53-
impl Interpreter for CopyIntoTableInterpreter {
56+
impl Interpreter for AppendInterpreter {
5457
fn name(&self) -> &str {
55-
"CopyIntoTableInterpreterV2"
58+
"AppendInterpreter"
5659
}
5760

5861
fn is_ddl(&self) -> bool {
@@ -67,7 +70,21 @@ impl Interpreter for CopyIntoTableInterpreter {
6770
return Ok(PipelineBuildResult::create());
6871
}
6972

70-
// build source and append pipeline
73+
let copy_into_table: Append = self.s_expr.plan().clone().try_into()?;
74+
let (target_table, catalog, database, table) = {
75+
let metadata = self.metadata.read();
76+
let t = metadata.table(copy_into_table.table_index);
77+
(
78+
t.table(),
79+
t.catalog().to_string(),
80+
t.database().to_string(),
81+
t.name().to_string(),
82+
)
83+
};
84+
85+
target_table.check_mutable()?;
86+
87+
// 1. build source and append pipeline
7188
let mut build_res = {
7289
let mut physical_plan_builder =
7390
PhysicalPlanBuilder::new(self.metadata.clone(), self.ctx.clone(), false);
@@ -77,16 +94,7 @@ impl Interpreter for CopyIntoTableInterpreter {
7794
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?
7895
};
7996

80-
// build commit pipeline
81-
let copy_into_table: CopyIntoTablePlan = self.s_expr.plan().clone().try_into()?;
82-
let target_table = self
83-
.ctx
84-
.get_table(
85-
&copy_into_table.catalog_name,
86-
&copy_into_table.database_name,
87-
&copy_into_table.table_name,
88-
)
89-
.await?;
97+
// 2. build commit pipeline
9098
let copied_files_meta_req = match &self.stage_table_info {
9199
Some(stage_table_info) => PipelineBuilder::build_upsert_copied_files_to_meta_req(
92100
self.ctx.clone(),
@@ -107,11 +115,11 @@ impl Interpreter for CopyIntoTableInterpreter {
107115
copied_files_meta_req,
108116
update_stream_meta,
109117
self.overwrite,
110-
None,
118+
self.col_type_modified,
111119
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
112120
)?;
113121

114-
// Purge files on pipeline finished.
122+
// 3. Purge files on pipeline finished.
115123
if let Some(stage_table_info) = &self.stage_table_info {
116124
let files_to_copy = stage_table_info
117125
.files_to_copy
@@ -137,14 +145,13 @@ impl Interpreter for CopyIntoTableInterpreter {
137145
)?;
138146
}
139147

140-
// Execute hook.
148+
// 4. Execute hook.
141149
{
142-
let copy_into_table: CopyIntoTablePlan = self.s_expr.plan().clone().try_into()?;
143150
let hook_operator = HookOperator::create(
144151
self.ctx.clone(),
145-
copy_into_table.catalog_name.to_string(),
146-
copy_into_table.database_name.to_string(),
147-
copy_into_table.table_name.to_string(),
152+
catalog,
153+
database,
154+
table,
148155
MutationKind::Insert,
149156
LockTableOption::LockNoRetry,
150157
);
@@ -155,33 +162,33 @@ impl Interpreter for CopyIntoTableInterpreter {
155162
}
156163

157164
fn inject_result(&self) -> Result<SendableDataBlockStream> {
158-
let copy_into_table: CopyIntoTablePlan = self.s_expr.plan().clone().try_into()?;
159-
match &copy_into_table.mutation_kind {
160-
MutationKind::CopyInto => {
165+
let copy_into_table: Append = self.s_expr.plan().clone().try_into()?;
166+
match &copy_into_table.append_type {
167+
AppendType::CopyInto => {
161168
let blocks = self.get_copy_into_table_result()?;
162169
Ok(Box::pin(DataBlockStream::create(None, blocks)))
163170
}
164-
MutationKind::Insert => Ok(Box::pin(DataBlockStream::create(None, vec![]))),
165-
_ => unreachable!(),
171+
AppendType::Insert => Ok(Box::pin(DataBlockStream::create(None, vec![]))),
166172
}
167173
}
168174
}
169175

170-
impl CopyIntoTableInterpreter {
171-
/// Create a CopyInterpreter with context and [`CopyIntoTablePlan`].
176+
impl AppendInterpreter {
172177
pub fn try_create(
173178
ctx: Arc<QueryContext>,
174179
s_expr: SExpr,
175180
metadata: MetadataRef,
176181
stage_table_info: Option<Box<StageTableInfo>>,
177182
overwrite: bool,
183+
col_type_modified: bool,
178184
) -> Result<Self> {
179-
Ok(CopyIntoTableInterpreter {
185+
Ok(AppendInterpreter {
180186
ctx,
181187
s_expr,
182188
metadata,
183189
stage_table_info,
184190
overwrite,
191+
col_type_modified,
185192
})
186193
}
187194

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ use super::interpreter_table_set_options::SetOptionsInterpreter;
3636
use super::interpreter_user_stage_drop::DropUserStageInterpreter;
3737
use super::*;
3838
use crate::interpreters::access::Accessor;
39+
use crate::interpreters::interpreter_append::AppendInterpreter;
3940
use crate::interpreters::interpreter_catalog_drop::DropCatalogInterpreter;
4041
use crate::interpreters::interpreter_connection_create::CreateConnectionInterpreter;
4142
use crate::interpreters::interpreter_connection_desc::DescConnectionInterpreter;
4243
use crate::interpreters::interpreter_connection_drop::DropConnectionInterpreter;
4344
use crate::interpreters::interpreter_connection_show::ShowConnectionsInterpreter;
4445
use crate::interpreters::interpreter_copy_into_location::CopyIntoLocationInterpreter;
45-
use crate::interpreters::interpreter_copy_into_table::CopyIntoTableInterpreter;
4646
use crate::interpreters::interpreter_file_format_create::CreateFileFormatInterpreter;
4747
use crate::interpreters::interpreter_file_format_drop::DropFileFormatInterpreter;
4848
use crate::interpreters::interpreter_file_format_show::ShowFileFormatsInterpreter;
@@ -156,17 +156,19 @@ impl InterpreterFactory {
156156
*graphical,
157157
)?)),
158158

159-
Plan::CopyIntoTable {
159+
Plan::Append {
160160
s_expr,
161161
metadata,
162162
stage_table_info,
163163
overwrite,
164-
} => Ok(Arc::new(CopyIntoTableInterpreter::try_create(
164+
forbid_occ_retry: col_type_modified,
165+
} => Ok(Arc::new(AppendInterpreter::try_create(
165166
ctx,
166167
*s_expr.clone(),
167168
metadata.clone(),
168169
stage_table_info.clone(),
169170
*overwrite,
171+
*col_type_modified,
170172
)?)),
171173
Plan::CopyIntoLocation(copy_plan) => Ok(Arc::new(
172174
CopyIntoLocationInterpreter::try_create(ctx, copy_plan.clone())?,

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,8 @@ use databend_common_meta_app::schema::TableNameIdent;
3838
use databend_common_meta_app::schema::TableStatistics;
3939
use databend_common_meta_types::MatchSeq;
4040
use databend_common_pipeline_core::ExecutionInfo;
41-
use databend_common_sql::executor::physical_plans::MutationKind;
4241
use databend_common_sql::field_default_value;
43-
use databend_common_sql::optimizer::SExpr;
44-
use databend_common_sql::plans::CopyIntoTablePlan;
42+
use databend_common_sql::plans::create_append_plan_from_subquery;
4543
use databend_common_sql::plans::CreateTablePlan;
4644
use databend_common_storages_fuse::io::MetaReaders;
4745
use databend_common_storages_fuse::FuseStorageFormat;
@@ -66,8 +64,8 @@ use crate::interpreters::common::table_option_validation::is_valid_create_opt;
6664
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
6765
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
6866
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
69-
use crate::interpreters::interpreter_copy_into_table::CopyIntoTableInterpreter;
7067
use crate::interpreters::Interpreter;
68+
use crate::interpreters::InterpreterFactory;
7169
use crate::pipelines::PipelineBuildResult;
7270
use crate::sessions::QueryContext;
7371
use crate::sessions::TableContext;
@@ -219,44 +217,28 @@ impl CreateTableInterpreter {
219217
// For the situation above, we implicitly cast the data type when inserting data.
220218
// The casting and schema checking is in interpreter_insert.rs, function check_schema_cast.
221219

222-
let _table_info = TableInfo::new(
220+
let table_info = TableInfo::new(
223221
&self.plan.database,
224222
&self.plan.table,
225223
TableIdent::new(table_id, table_id_seq),
226224
table_meta,
227225
);
228-
229-
let (project_columns, source, metadata) = match select_plan.as_ref() {
230-
Plan::Query {
231-
bind_context,
232-
s_expr,
233-
metadata,
234-
..
235-
} => (
236-
Some(bind_context.columns.clone()),
237-
*s_expr.clone(),
238-
metadata.clone(),
239-
),
240-
_ => unreachable!(),
241-
};
242-
243-
let insert_plan = CopyIntoTablePlan {
244-
catalog_name: self.plan.catalog.clone(),
245-
database_name: self.plan.database.clone(),
246-
table_name: self.plan.table.clone(),
247-
required_values_schema: Arc::new(self.plan.schema.clone().into()),
248-
values_consts: vec![],
249-
required_source_schema: Arc::new(self.plan.schema.clone().into()),
250-
project_columns,
251-
mutation_kind: MutationKind::Insert,
252-
};
253-
254-
let s_expr = SExpr::create_unary(Arc::new(insert_plan.into()), Arc::new(source));
255-
256-
let mut pipeline =
257-
CopyIntoTableInterpreter::try_create(self.ctx.clone(), s_expr, metadata, None, false)?
258-
.execute2()
259-
.await?;
226+
let table = self.ctx.build_table_by_table_info(&table_info, None)?;
227+
228+
let append_plan = create_append_plan_from_subquery(
229+
&select_plan,
230+
self.plan.catalog.clone(),
231+
self.plan.database.clone(),
232+
table,
233+
Arc::new(self.plan.schema.clone().into()),
234+
false,
235+
self.ctx.clone(),
236+
)
237+
.await?;
238+
let mut pipeline = InterpreterFactory::get(self.ctx.clone(), &append_plan)
239+
.await?
240+
.execute2()
241+
.await?;
260242

261243
let db_name = self.plan.database.clone();
262244
let table_name = self.plan.table.clone();

0 commit comments

Comments
 (0)