Skip to content

Commit fe274f4

Browse files
authored
feat: support nextval as field default value. (#17670)
* feat: support nextval as field default value. * feat: support nextval as field default value.
1 parent dd86b4e commit fe274f4

File tree

30 files changed

+548
-249
lines changed

30 files changed

+548
-249
lines changed

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
use databend_common_ast::ast::CopyIntoLocationOptions;
2121
use databend_common_ast::ast::CopyIntoTableOptions;
2222
use databend_common_exception::Result;
23-
use databend_common_expression::RemoteExpr;
23+
use databend_common_expression::RemoteDefaultExpr;
2424
use databend_common_expression::TableSchema;
2525
use databend_common_expression::TableSchemaRef;
2626
use databend_common_meta_app::principal::StageInfo;
@@ -31,7 +31,7 @@ use databend_common_storage::StageFilesInfo;
3131
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
3232
pub struct StageTableInfo {
3333
pub schema: TableSchemaRef,
34-
pub default_values: Option<Vec<RemoteExpr>>,
34+
pub default_exprs: Option<Vec<RemoteDefaultExpr>>,
3535
pub files_info: StageFilesInfo,
3636
pub stage_info: StageInfo,
3737
pub files_to_copy: Option<Vec<StageFileInfo>>,

src/query/expression/src/expression.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,3 +784,9 @@ impl<Index: ColumnIndex> RemoteExpr<Index> {
784784
}
785785
}
786786
}
787+
788+
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, EnumAsInner)]
789+
pub enum RemoteDefaultExpr {
790+
RemoteExpr(RemoteExpr),
791+
Sequence(String),
792+
}

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl CopyIntoLocationInterpreter {
110110
files_to_copy: None,
111111
duplicated_files_detected: vec![],
112112
is_select: false,
113-
default_values: None,
113+
default_exprs: None,
114114
copy_into_location_options: options.clone(),
115115
copy_into_table_options: Default::default(),
116116
stage_root: "".to_string(),

src/query/service/src/interpreters/interpreter_table_add_column.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ use databend_common_meta_app::schema::TableInfo;
2828
use databend_common_meta_app::schema::TableMeta;
2929
use databend_common_meta_app::schema::UpdateTableMetaReq;
3030
use databend_common_meta_types::MatchSeq;
31-
use databend_common_sql::field_default_value;
3231
use databend_common_sql::plans::AddColumnOption;
3332
use databend_common_sql::plans::AddTableColumnPlan;
3433
use databend_common_sql::plans::Mutation;
3534
use databend_common_sql::plans::Plan;
35+
use databend_common_sql::DefaultExprBinder;
3636
use databend_common_sql::Planner;
3737
use databend_common_storages_fuse::FuseTable;
3838
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
@@ -105,7 +105,7 @@ impl Interpreter for AddTableColumnInterpreter {
105105
}
106106

107107
if field.default_expr().is_some() {
108-
let _ = field_default_value(self.ctx.clone(), &field)?;
108+
let _ = DefaultExprBinder::try_new(self.ctx.clone())?.get_scalar(&field)?;
109109
}
110110
is_valid_column(field.name())?;
111111
let index = match &self.plan.option {

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ use databend_common_meta_app::schema::TableStatistics;
4040
use databend_common_meta_types::MatchSeq;
4141
use databend_common_pipeline_core::always_callback;
4242
use databend_common_pipeline_core::ExecutionInfo;
43-
use databend_common_sql::field_default_value;
4443
use databend_common_sql::plans::CreateTablePlan;
44+
use databend_common_sql::DefaultExprBinder;
4545
use databend_common_storages_fuse::io::MetaReaders;
4646
use databend_common_storages_fuse::FuseStorageFormat;
4747
use databend_common_users::RoleCacheManager;
@@ -374,7 +374,7 @@ impl CreateTableInterpreter {
374374
let fields = self.plan.schema.fields().clone();
375375
for field in fields.iter() {
376376
if field.default_expr().is_some() {
377-
let _ = field_default_value(self.ctx.clone(), field)?;
377+
let _ = DefaultExprBinder::try_new(self.ctx.clone())?.get_scalar(field)?;
378378
}
379379
is_valid_column(field.name())?;
380380
}

src/query/service/src/interpreters/interpreter_table_modify_column.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ use databend_common_meta_types::MatchSeq;
4141
use databend_common_sql::executor::physical_plans::DistributedInsertSelect;
4242
use databend_common_sql::executor::PhysicalPlan;
4343
use databend_common_sql::executor::PhysicalPlanBuilder;
44-
use databend_common_sql::field_default_value;
4544
use databend_common_sql::plans::ModifyColumnAction;
4645
use databend_common_sql::plans::ModifyTableColumnPlan;
4746
use databend_common_sql::plans::Plan;
4847
use databend_common_sql::BloomIndexColumns;
48+
use databend_common_sql::DefaultExprBinder;
4949
use databend_common_sql::Planner;
5050
use databend_common_storages_fuse::FuseTable;
5151
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
@@ -150,6 +150,7 @@ impl ModifyTableColumnInterpreter {
150150
let schema = table.schema().as_ref().clone();
151151
let table_info = table.get_table_info();
152152
let mut new_schema = schema.clone();
153+
let mut default_expr_binder = DefaultExprBinder::try_new(self.ctx.clone())?;
153154
// first check default expr before lock table
154155
for (field, _comment) in field_and_comments {
155156
if let Some((i, old_field)) = schema.column_with_name(&field.name) {
@@ -167,7 +168,7 @@ impl ModifyTableColumnInterpreter {
167168
if let Some(default_expr) = &field.default_expr {
168169
let default_expr = default_expr.to_string();
169170
new_schema.fields[i].default_expr = Some(default_expr);
170-
let _ = field_default_value(self.ctx.clone(), &new_schema.fields[i])?;
171+
let _ = default_expr_binder.get_scalar(&new_schema.fields[i])?;
171172
} else {
172173
new_schema.fields[i].default_expr = None;
173174
}

src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::sync::Arc;
1617

1718
use databend_common_catalog::table::Table;
19+
use databend_common_catalog::table_context::TableContext;
1820
use databend_common_exception::Result;
1921
use databend_common_expression::DataSchemaRef;
2022
use databend_common_pipeline_core::Pipeline;
2123
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
24+
use databend_common_sql::DefaultExprBinder;
2225

2326
use crate::pipelines::processors::transforms::TransformAddComputedColumns;
27+
use crate::pipelines::processors::transforms::TransformAsyncFunction;
28+
use crate::pipelines::processors::TransformCastSchema;
2429
use crate::pipelines::processors::TransformResortAddOn;
2530
use crate::pipelines::PipelineBuilder;
2631
use crate::sessions::QueryContext;
@@ -42,14 +47,46 @@ impl PipelineBuilder {
4247

4348
// Fill missing default columns and resort the columns.
4449
if source_schema != default_schema {
45-
pipeline.try_add_transformer(|| {
46-
TransformResortAddOn::try_new(
47-
ctx.clone(),
48-
source_schema.clone(),
49-
default_schema.clone(),
50-
table.clone(),
51-
)
52-
})?;
50+
let mut default_expr_binder = DefaultExprBinder::try_new(ctx.clone())?;
51+
if let Some((async_funcs, new_default_schema, new_default_schema_no_cast)) =
52+
default_expr_binder
53+
.split_async_default_exprs(source_schema.clone(), default_schema.clone())?
54+
{
55+
pipeline.try_add_async_transformer(|| {
56+
Ok(TransformAsyncFunction::new(
57+
ctx.clone(),
58+
async_funcs.clone(),
59+
BTreeMap::new(),
60+
))
61+
})?;
62+
if new_default_schema != new_default_schema_no_cast {
63+
pipeline.try_add_transformer(|| {
64+
TransformCastSchema::try_new(
65+
new_default_schema_no_cast.clone(),
66+
new_default_schema.clone(),
67+
ctx.get_function_context().unwrap(),
68+
)
69+
})?;
70+
}
71+
72+
pipeline.try_add_transformer(|| {
73+
TransformResortAddOn::try_new(
74+
ctx.clone(),
75+
new_default_schema.clone(),
76+
default_schema.clone(),
77+
table.clone(),
78+
)
79+
})?;
80+
} else {
81+
pipeline.try_add_transformer(|| {
82+
TransformResortAddOn::try_new(
83+
ctx.clone(),
84+
source_schema.clone(),
85+
default_schema.clone(),
86+
table.clone(),
87+
)
88+
})?;
89+
}
5390
}
5491

5592
// Fill computed columns.

src/query/service/src/sessions/query_ctx.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1632,7 +1632,7 @@ impl TableContext for QueryContext {
16321632
files_to_copy,
16331633
duplicated_files_detected: vec![],
16341634
is_select: true,
1635-
default_values: None,
1635+
default_exprs: None,
16361636
copy_into_location_options: Default::default(),
16371637
copy_into_table_options: Default::default(),
16381638
stage_root,
@@ -1651,7 +1651,7 @@ impl TableContext for QueryContext {
16511651
files_to_copy,
16521652
duplicated_files_detected: vec![],
16531653
is_select: true,
1654-
default_values: None,
1654+
default_exprs: None,
16551655
copy_into_location_options: Default::default(),
16561656
copy_into_table_options: Default::default(),
16571657
stage_root,
@@ -1688,7 +1688,7 @@ impl TableContext for QueryContext {
16881688
files_to_copy,
16891689
duplicated_files_detected: vec![],
16901690
is_select: true,
1691-
default_values: None,
1691+
default_exprs: None,
16921692
copy_into_location_options: Default::default(),
16931693
copy_into_table_options: Default::default(),
16941694
stage_root,

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ use databend_common_expression::shrink_scalar;
4949
use databend_common_expression::types::DataType;
5050
use databend_common_expression::DataSchema;
5151
use databend_common_expression::DataSchemaRef;
52-
use databend_common_expression::RemoteExpr;
5352
use databend_common_expression::Scalar;
5453
use databend_common_meta_app::principal::EmptyFieldAs;
5554
use databend_common_meta_app::principal::FileFormatOptionsReader;
@@ -73,9 +72,9 @@ use crate::plans::CopyIntoTablePlan;
7372
use crate::plans::Plan;
7473
use crate::plans::ValidationMode;
7574
use crate::BindContext;
75+
use crate::DefaultExprBinder;
7676
use crate::Metadata;
7777
use crate::NameResolutionContext;
78-
use crate::ScalarBinder;
7978

8079
impl Binder {
8180
#[async_backtrace::framed]
@@ -87,7 +86,7 @@ impl Binder {
8786
match &stmt.src {
8887
CopyIntoTableSource::Location(location) => {
8988
let mut plan = self
90-
.bind_copy_into_table_common(bind_context, stmt, location, false)
89+
.bind_copy_into_table_common(stmt, location, false)
9190
.await?;
9291

9392
// for copy from location, collect files explicitly
@@ -105,7 +104,7 @@ impl Binder {
105104
.set_max_column_position(max_column_position.max_pos);
106105
let (select_list, location, alias) = check_transform_query(query)?;
107106
let plan = self
108-
.bind_copy_into_table_common(bind_context, stmt, location, true)
107+
.bind_copy_into_table_common(stmt, location, true)
109108
.await?;
110109

111110
self.bind_copy_from_query_into_table(bind_context, plan, select_list, alias)
@@ -136,7 +135,6 @@ impl Binder {
136135

137136
async fn bind_copy_into_table_common(
138137
&mut self,
139-
bind_context: &mut BindContext,
140138
stmt: &CopyIntoTableStmt,
141139
location: &FileLocation,
142140
is_transform: bool,
@@ -198,8 +196,8 @@ impl Binder {
198196

199197
let default_values = if stage_info.file_format_params.need_field_default() {
200198
Some(
201-
self.prepare_default_values(bind_context, &required_values_schema)
202-
.await?,
199+
DefaultExprBinder::try_new(self.ctx.clone())?
200+
.prepare_default_values(&required_values_schema)?,
203201
)
204202
} else {
205203
None
@@ -222,7 +220,7 @@ impl Binder {
222220
files_to_copy: None,
223221
duplicated_files_detected: vec![],
224222
is_select: false,
225-
default_values,
223+
default_exprs: default_values,
226224
copy_into_location_options: Default::default(),
227225
copy_into_table_options: stmt.options.clone(),
228226
stage_root: "".to_string(),
@@ -381,9 +379,8 @@ impl Binder {
381379

382380
let stage_schema = infer_table_schema(&data_schema)?;
383381

384-
let default_values = self
385-
.prepare_default_values(bind_context, &data_schema)
386-
.await?;
382+
let default_values = DefaultExprBinder::try_new(self.ctx.clone())?
383+
.prepare_default_values(&required_values_schema)?;
387384

388385
let plan = CopyIntoTablePlan {
389386
catalog_info,
@@ -403,7 +400,7 @@ impl Binder {
403400
files_to_copy: Some(files_to_copy),
404401
duplicated_files_detected,
405402
is_select: false,
406-
default_values: Some(default_values),
403+
default_exprs: Some(default_values),
407404
copy_into_location_options: Default::default(),
408405
copy_into_table_options: options,
409406
stage_root: "".to_string(),
@@ -573,26 +570,6 @@ impl Binder {
573570
.await?;
574571
Ok((Arc::new(DataSchema::new(attachment_fields)), const_values))
575572
}
576-
577-
async fn prepare_default_values(
578-
&mut self,
579-
bind_context: &mut BindContext,
580-
data_schema: &DataSchemaRef,
581-
) -> Result<Vec<RemoteExpr>> {
582-
let mut scalar_binder = ScalarBinder::new(
583-
bind_context,
584-
self.ctx.clone(),
585-
&self.name_resolution_ctx,
586-
self.metadata.clone(),
587-
&[],
588-
);
589-
let mut values = Vec::with_capacity(data_schema.fields.len());
590-
for field in &data_schema.fields {
591-
let expr = scalar_binder.get_default_value(field, data_schema).await?;
592-
values.push(expr.as_remote_expr());
593-
}
594-
Ok(values)
595-
}
596573
}
597574

598575
// we can avoid this by specializing the parser.

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ use crate::binder::ColumnBindingBuilder;
106106
use crate::binder::Visibility;
107107
use crate::optimizer::SExpr;
108108
use crate::parse_computed_expr_to_string;
109-
use crate::parse_default_expr_to_string;
110109
use crate::planner::semantic::normalize_identifier;
111110
use crate::planner::semantic::resolve_type_name;
112111
use crate::planner::semantic::IdentifierNormalizer;
@@ -144,6 +143,7 @@ use crate::plans::VacuumTableOption;
144143
use crate::plans::VacuumTablePlan;
145144
use crate::plans::VacuumTemporaryFilesPlan;
146145
use crate::BindContext;
146+
use crate::DefaultExprBinder;
147147
use crate::Planner;
148148
use crate::SelectBuilder;
149149

@@ -1374,8 +1374,9 @@ impl Binder {
13741374
if let Some(expr) = &column.expr {
13751375
match expr {
13761376
ColumnExpr::Default(default_expr) => {
1377+
let mut default_expr_binder = DefaultExprBinder::try_new(self.ctx.clone())?;
13771378
let (expr, expr_is_deterministic) =
1378-
parse_default_expr_to_string(self.ctx.clone(), &field, default_expr)?;
1379+
default_expr_binder.parse_default_expr_to_string(&field, default_expr)?;
13791380
field = field.with_default_expr(Some(expr));
13801381
is_deterministic = expr_is_deterministic;
13811382
}
@@ -1413,6 +1414,7 @@ impl Binder {
14131414
let mut fields = Vec::with_capacity(columns.len());
14141415
let mut fields_comments = Vec::with_capacity(columns.len());
14151416
let not_null = self.is_column_not_null();
1417+
let mut default_expr_binder = DefaultExprBinder::try_new(self.ctx.clone())?;
14161418
for column in columns.iter() {
14171419
let name = normalize_identifier(&column.name, &self.name_resolution_ctx).name;
14181420
let schema_data_type = resolve_type_name(&column.data_type, not_null)?;
@@ -1421,8 +1423,8 @@ impl Binder {
14211423
if let Some(expr) = &column.expr {
14221424
match expr {
14231425
ColumnExpr::Default(default_expr) => {
1424-
let (expr, _) =
1425-
parse_default_expr_to_string(self.ctx.clone(), &field, default_expr)?;
1426+
let (expr, _) = default_expr_binder
1427+
.parse_default_expr_to_string(&field, default_expr)?;
14261428
field = field.with_default_expr(Some(expr));
14271429
}
14281430
_ => has_computed = true,

0 commit comments

Comments
 (0)