Skip to content

Commit 4112e06

Browse files
authored
feat: add create or replace virtual column support (#14649)
1 parent ec91213 commit 4112e06

File tree

11 files changed

+291
-60
lines changed

11 files changed

+291
-60
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 83 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,28 +1246,44 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
12461246
) -> Result<CreateVirtualColumnReply, KVAppError> {
12471247
debug!(req = as_debug!(&req); "SchemaApi: {}", func_name!());
12481248

1249+
let ctx = func_name!();
12491250
let mut trials = txn_backoff(None, func_name!());
12501251
loop {
12511252
trials.next().unwrap()?.await;
12521253

12531254
let (_, old_virtual_column_opt): (_, Option<VirtualColumnMeta>) =
12541255
get_pb_value(self, &req.name_ident).await?;
12551256

1256-
if old_virtual_column_opt.is_some() {
1257-
if req.if_not_exists {
1258-
return Ok(CreateVirtualColumnReply {});
1259-
} else {
1260-
return Err(KVAppError::AppError(AppError::VirtualColumnAlreadyExists(
1261-
VirtualColumnAlreadyExists::new(
1262-
req.name_ident.table_id,
1263-
format!(
1264-
"create virtual column with tenant: {} table_id: {}",
1265-
req.name_ident.tenant, req.name_ident.table_id
1257+
let mut if_then = vec![];
1258+
let seq = if old_virtual_column_opt.is_some() {
1259+
if let CreateOption::CreateIfNotExists(if_not_exists) = req.create_option {
1260+
if if_not_exists {
1261+
return Ok(CreateVirtualColumnReply {});
1262+
} else {
1263+
return Err(KVAppError::AppError(AppError::VirtualColumnAlreadyExists(
1264+
VirtualColumnAlreadyExists::new(
1265+
req.name_ident.table_id,
1266+
format!(
1267+
"create virtual column with tenant: {} table_id: {}",
1268+
req.name_ident.tenant, req.name_ident.table_id
1269+
),
12661270
),
1267-
),
1268-
)));
1271+
)));
1272+
}
1273+
} else {
1274+
construct_drop_virtual_column_txn_operations(
1275+
self,
1276+
&req.name_ident,
1277+
false,
1278+
false,
1279+
ctx,
1280+
&mut if_then,
1281+
)
1282+
.await?
12691283
}
1270-
}
1284+
} else {
1285+
0
1286+
};
12711287
let virtual_column_meta = VirtualColumnMeta {
12721288
table_id: req.name_ident.table_id,
12731289
virtual_columns: req.virtual_columns.clone(),
@@ -1278,11 +1294,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
12781294
// Create virtual column by inserting this record:
12791295
// (tenant, table_id) -> virtual_column_meta
12801296
{
1281-
let condition = vec![txn_cond_seq(&req.name_ident, Eq, 0)];
1282-
let if_then = vec![txn_op_put(
1297+
let condition = vec![txn_cond_seq(&req.name_ident, Eq, seq)];
1298+
if_then.push(txn_op_put(
12831299
&req.name_ident,
12841300
serialize_struct(&virtual_column_meta)?,
1285-
)];
1301+
));
12861302

12871303
let txn_req = TxnRequest {
12881304
condition,
@@ -1384,35 +1400,27 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
13841400
loop {
13851401
trials.next().unwrap()?.await;
13861402

1387-
if let Err(err) = get_virtual_column_by_id_or_err(self, &req.name_ident, ctx).await {
1388-
if req.if_exists {
1389-
return Ok(DropVirtualColumnReply {});
1390-
} else {
1391-
return Err(err);
1392-
}
1393-
}
1394-
1395-
// Drop virtual column by deleting this record:
1396-
// (tenant, table_id) -> virtual_column_meta
1397-
{
1398-
let if_then = vec![txn_op_del(&req.name_ident)];
1399-
let txn_req = TxnRequest {
1400-
condition: vec![],
1401-
if_then,
1402-
else_then: vec![],
1403-
};
1403+
let mut if_then = vec![];
1404+
construct_drop_virtual_column_txn_operations(
1405+
self,
1406+
&req.name_ident,
1407+
req.if_exists,
1408+
true,
1409+
ctx,
1410+
&mut if_then,
1411+
)
1412+
.await?;
14041413

1405-
let (succ, _responses) = send_txn(self, txn_req).await?;
1414+
let txn_req = TxnRequest {
1415+
condition: vec![],
1416+
if_then,
1417+
else_then: vec![],
1418+
};
14061419

1407-
debug!(
1408-
"req.name_ident" = as_debug!(&req.name_ident),
1409-
succ = succ;
1410-
"drop_virtual_column"
1411-
);
1420+
let (succ, _responses) = send_txn(self, txn_req).await?;
14121421

1413-
if succ {
1414-
break;
1415-
}
1422+
if succ {
1423+
break;
14161424
}
14171425
}
14181426

@@ -3782,6 +3790,39 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
37823790
"SchemaApiImpl".to_string()
37833791
}
37843792
}
3793+
async fn construct_drop_virtual_column_txn_operations(
3794+
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
3795+
name_ident: &VirtualColumnNameIdent,
3796+
drop_if_exists: bool,
3797+
if_delete: bool,
3798+
ctx: &str,
3799+
if_then: &mut Vec<TxnOp>,
3800+
) -> Result<u64, KVAppError> {
3801+
let res = get_virtual_column_by_id_or_err(kv_api, name_ident, ctx).await;
3802+
let seq = if let Err(err) = res {
3803+
if drop_if_exists {
3804+
return Ok(0);
3805+
} else {
3806+
return Err(err);
3807+
}
3808+
} else {
3809+
res.unwrap().0
3810+
};
3811+
3812+
// Drop virtual column by deleting this record:
3813+
// (tenant, table_id) -> virtual_column_meta
3814+
if if_delete {
3815+
if_then.push(txn_op_del(name_ident));
3816+
}
3817+
3818+
debug!(
3819+
"name_ident" = as_debug!(&name_ident),
3820+
ctx = ctx;
3821+
"construct_drop_virtual_column_txn_operations"
3822+
);
3823+
3824+
Ok(seq)
3825+
}
37853826

37863827
async fn construct_drop_index_txn_operations(
37873828
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5970,7 +5970,7 @@ impl SchemaApiTestSuite {
59705970
{
59715971
info!("--- create virtual column");
59725972
let req = CreateVirtualColumnReq {
5973-
if_not_exists: false,
5973+
create_option: CreateOption::CreateIfNotExists(false),
59745974
name_ident: name_ident.clone(),
59755975
virtual_columns: vec!["variant:k1".to_string(), "variant[1]".to_string()],
59765976
};
@@ -5979,7 +5979,7 @@ impl SchemaApiTestSuite {
59795979

59805980
info!("--- create virtual column again");
59815981
let req = CreateVirtualColumnReq {
5982-
if_not_exists: false,
5982+
create_option: CreateOption::CreateIfNotExists(false),
59835983
name_ident: name_ident.clone(),
59845984
virtual_columns: vec!["variant:k1".to_string(), "variant[1]".to_string()],
59855985
};
@@ -6070,6 +6070,46 @@ impl SchemaApiTestSuite {
60706070
assert!(res.is_err());
60716071
}
60726072

6073+
{
6074+
info!("--- create or replace virtual column");
6075+
let req = CreateVirtualColumnReq {
6076+
create_option: CreateOption::CreateIfNotExists(false),
6077+
name_ident: name_ident.clone(),
6078+
virtual_columns: vec!["variant:k1".to_string(), "variant[1]".to_string()],
6079+
};
6080+
6081+
let _res = mt.create_virtual_column(req.clone()).await?;
6082+
6083+
let req = ListVirtualColumnsReq {
6084+
tenant: tenant.to_string(),
6085+
table_id: Some(table_id),
6086+
};
6087+
6088+
let res = mt.list_virtual_columns(req).await?;
6089+
assert_eq!(1, res.len());
6090+
assert_eq!(res[0].virtual_columns, vec![
6091+
"variant:k1".to_string(),
6092+
"variant[1]".to_string(),
6093+
]);
6094+
6095+
let req = CreateVirtualColumnReq {
6096+
create_option: CreateOption::CreateOrReplace,
6097+
name_ident: name_ident.clone(),
6098+
virtual_columns: vec!["variant:k2".to_string()],
6099+
};
6100+
6101+
let _res = mt.create_virtual_column(req.clone()).await?;
6102+
6103+
let req = ListVirtualColumnsReq {
6104+
tenant: tenant.to_string(),
6105+
table_id: Some(table_id),
6106+
};
6107+
6108+
let res = mt.list_virtual_columns(req).await?;
6109+
assert_eq!(1, res.len());
6110+
assert_eq!(res[0].virtual_columns, vec!["variant:k2".to_string(),]);
6111+
}
6112+
60736113
Ok(())
60746114
}
60756115

src/meta/app/src/schema/virtual_column.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use chrono::DateTime;
2020
use chrono::Utc;
2121
use databend_common_meta_types::MetaId;
2222

23+
use super::CreateOption;
24+
2325
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
2426
pub struct VirtualColumnNameIdent {
2527
pub tenant: String,
@@ -56,7 +58,7 @@ pub struct VirtualColumnMeta {
5658

5759
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
5860
pub struct CreateVirtualColumnReq {
59-
pub if_not_exists: bool,
61+
pub create_option: CreateOption,
6062
pub name_ident: VirtualColumnNameIdent,
6163
pub virtual_columns: Vec<String>,
6264
}

src/query/ast/src/ast/statements/virtual_column.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::fmt::Display;
1616
use std::fmt::Formatter;
1717

18+
use databend_common_meta_app::schema::CreateOption;
19+
1820
use crate::ast::write_comma_separated_list;
1921
use crate::ast::write_dot_separated_list;
2022
use crate::ast::Expr;
@@ -23,7 +25,7 @@ use crate::ast::ShowLimit;
2325

2426
#[derive(Debug, Clone, PartialEq)]
2527
pub struct CreateVirtualColumnStmt {
26-
pub if_not_exists: bool,
28+
pub create_option: CreateOption,
2729
pub catalog: Option<Identifier>,
2830
pub database: Option<Identifier>,
2931
pub table: Identifier,
@@ -33,9 +35,15 @@ pub struct CreateVirtualColumnStmt {
3335

3436
impl Display for CreateVirtualColumnStmt {
3537
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36-
write!(f, "CREATE VIRTUAL COLUMN ")?;
37-
if self.if_not_exists {
38-
write!(f, "IF NOT EXISTS ")?;
38+
write!(f, "CREATE ")?;
39+
if let CreateOption::CreateOrReplace = self.create_option {
40+
write!(f, "OR REPLACE ")?;
41+
}
42+
write!(f, "VIRTUAL COLUMN ")?;
43+
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
44+
if if_not_exists {
45+
write!(f, "IF NOT EXISTS ")?;
46+
}
3947
}
4048
write!(f, "(")?;
4149
write_comma_separated_list(f, &self.virtual_columns)?;

src/query/ast/src/parser/statement.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -996,18 +996,31 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
996996
},
997997
);
998998

999-
let create_virtual_column = map(
999+
let create_virtual_column = map_res(
10001000
rule! {
1001-
CREATE ~ VIRTUAL ~ COLUMN ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^"(" ~ ^#comma_separated_list1(expr) ~ ^")" ~ FOR ~ #dot_separated_idents_1_to_3
1001+
CREATE ~ (OR ~ REPLACE)? ~ VIRTUAL ~ COLUMN ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ ^"(" ~ ^#comma_separated_list1(expr) ~ ^")" ~ FOR ~ #dot_separated_idents_1_to_3
10021002
},
1003-
|(_, _, _, opt_if_not_exists, _, virtual_columns, _, _, (catalog, database, table))| {
1004-
Statement::CreateVirtualColumn(CreateVirtualColumnStmt {
1005-
if_not_exists: opt_if_not_exists.is_some(),
1003+
|(
1004+
_,
1005+
opt_or_replace,
1006+
_,
1007+
_,
1008+
opt_if_not_exists,
1009+
_,
1010+
virtual_columns,
1011+
_,
1012+
_,
1013+
(catalog, database, table),
1014+
)| {
1015+
let create_option =
1016+
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
1017+
Ok(Statement::CreateVirtualColumn(CreateVirtualColumnStmt {
1018+
create_option,
10061019
catalog,
10071020
database,
10081021
table,
10091022
virtual_columns,
1010-
})
1023+
}))
10111024
},
10121025
);
10131026

src/query/ast/tests/it/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ fn test_statement() {
513513
r#"DESC MASKING POLICY email_mask"#,
514514
r#"DROP MASKING POLICY IF EXISTS email_mask"#,
515515
r#"CREATE VIRTUAL COLUMN (a['k1']['k2'], b[0][1]) FOR t"#,
516+
r#"CREATE OR REPLACE VIRTUAL COLUMN (a['k1']['k2'], b[0][1]) FOR t"#,
516517
r#"ALTER VIRTUAL COLUMN (a['k1']['k2'], b[0][1]) FOR t"#,
517518
r#"DROP VIRTUAL COLUMN FOR t"#,
518519
r#"REFRESH VIRTUAL COLUMN FOR t"#,

0 commit comments

Comments
 (0)