Skip to content

Commit baed42d

Browse files
authored
feat: add create or replace masking policy support (#14656)
* feat: add create or replace masking policy support * feat: add create or replace masking policy support
1 parent 4ca2def commit baed42d

File tree

11 files changed

+270
-57
lines changed

11 files changed

+270
-57
lines changed

src/meta/api/src/data_mask_api_impl.rs

Lines changed: 88 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_meta_app::data_mask::GetDatamaskReply;
2828
use databend_common_meta_app::data_mask::GetDatamaskReq;
2929
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
3030
use databend_common_meta_app::data_mask::MaskpolicyTableIdListKey;
31+
use databend_common_meta_app::schema::CreateOption;
3132
use databend_common_meta_app::schema::TableId;
3233
use databend_common_meta_app::schema::TableMeta;
3334
use databend_common_meta_kvapi::kvapi;
@@ -74,18 +75,34 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
7475
let (seq, id) = get_u64_value(self, name_key).await?;
7576
debug!(seq = seq, id = id, name_key = as_debug!(name_key); "create_data_mask");
7677

78+
let mut condition = vec![];
79+
let mut if_then = vec![];
80+
7781
if seq > 0 {
78-
return if req.if_not_exists {
79-
Ok(CreateDatamaskReply { id })
82+
if let CreateOption::CreateIfNotExists(if_not_exists) = req.create_option {
83+
return if if_not_exists {
84+
Ok(CreateDatamaskReply { id })
85+
} else {
86+
Err(KVAppError::AppError(AppError::DatamaskAlreadyExists(
87+
DatamaskAlreadyExists::new(
88+
&name_key.name,
89+
format!("create data mask: {}", req.name),
90+
),
91+
)))
92+
};
8093
} else {
81-
Err(KVAppError::AppError(AppError::DatamaskAlreadyExists(
82-
DatamaskAlreadyExists::new(
83-
&name_key.name,
84-
format!("create data mask: {}", req.name),
85-
),
86-
)))
87-
};
88-
}
94+
construct_drop_mask_policy_operations(
95+
self,
96+
name_key,
97+
false,
98+
false,
99+
func_name!(),
100+
&mut condition,
101+
&mut if_then,
102+
)
103+
.await?;
104+
}
105+
};
89106

90107
// Create data mask by inserting these record:
91108
// name -> id
@@ -108,12 +125,12 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
108125
{
109126
let meta: DatamaskMeta = req.clone().into();
110127
let id_list = MaskpolicyTableIdList::default();
111-
let condition = vec![txn_cond_seq(name_key, Eq, 0)];
112-
let if_then = vec![
128+
condition.push(txn_cond_seq(name_key, Eq, seq));
129+
if_then.extend( vec![
113130
txn_op_put(name_key, serialize_u64(id)?), // name -> db_id
114131
txn_op_put(&id_key, serialize_struct(&meta)?), // id -> meta
115132
txn_op_put(&id_list_key, serialize_struct(&id_list)?), /* data mask name -> id_list */
116-
];
133+
]);
117134

118135
let txn_req = TxnRequest {
119136
condition,
@@ -148,41 +165,27 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
148165
loop {
149166
trials.next().unwrap()?.await;
150167

151-
let result =
152-
get_data_mask_or_err(self, name_key, format!("drop_data_mask: {}", name_key)).await;
153-
154-
let (id_seq, id, data_mask_seq, _) = match result {
155-
Ok((id_seq, id, data_mask_seq, meta)) => (id_seq, id, data_mask_seq, meta),
156-
Err(err) => {
157-
if let KVAppError::AppError(AppError::UnknownDatamask(_)) = err {
158-
if req.if_exists {
159-
return Ok(DropDatamaskReply {});
160-
}
161-
}
162-
163-
return Err(err);
164-
}
165-
};
166-
let id_key = DatamaskId { id };
167-
let mut condition = vec![
168-
txn_cond_seq(name_key, Eq, id_seq),
169-
txn_cond_seq(&id_key, Eq, data_mask_seq),
170-
];
171-
let mut if_then = vec![txn_op_del(name_key), txn_op_del(&id_key)];
172-
173-
clear_table_column_mask_policy(self, name_key, &mut condition, &mut if_then).await?;
174-
168+
let mut condition = vec![];
169+
let mut if_then = vec![];
170+
171+
construct_drop_mask_policy_operations(
172+
self,
173+
name_key,
174+
req.if_exists,
175+
true,
176+
func_name!(),
177+
&mut condition,
178+
&mut if_then,
179+
)
180+
.await?;
175181
let txn_req = TxnRequest {
176182
condition,
177183
if_then,
178184
else_then: vec![],
179185
};
180-
181186
let (succ, _responses) = send_txn(self, txn_req).await?;
182187

183188
debug!(
184-
name = as_debug!(name_key),
185-
id = as_debug!(&DatamaskId { id }),
186189
succ = succ;
187190
"drop_data_mask"
188191
);
@@ -289,3 +292,48 @@ async fn clear_table_column_mask_policy(
289292

290293
Ok(())
291294
}
295+
296+
async fn construct_drop_mask_policy_operations(
297+
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
298+
name_key: &DatamaskNameIdent,
299+
drop_if_exists: bool,
300+
if_delete: bool,
301+
ctx: &str,
302+
condition: &mut Vec<TxnCondition>,
303+
if_then: &mut Vec<TxnOp>,
304+
) -> Result<(), KVAppError> {
305+
let result =
306+
get_data_mask_or_err(kv_api, name_key, format!("drop_data_mask: {}", name_key)).await;
307+
308+
let (id_seq, id, data_mask_seq, _) = match result {
309+
Ok((id_seq, id, data_mask_seq, meta)) => (id_seq, id, data_mask_seq, meta),
310+
Err(err) => {
311+
if let KVAppError::AppError(AppError::UnknownDatamask(_)) = err {
312+
if drop_if_exists {
313+
return Ok(());
314+
}
315+
}
316+
317+
return Err(err);
318+
}
319+
};
320+
let id_key = DatamaskId { id };
321+
322+
condition.push(txn_cond_seq(&id_key, Eq, data_mask_seq));
323+
if_then.push(txn_op_del(&id_key));
324+
325+
if if_delete {
326+
condition.push(txn_cond_seq(name_key, Eq, id_seq));
327+
if_then.push(txn_op_del(name_key));
328+
clear_table_column_mask_policy(kv_api, name_key, condition, if_then).await?;
329+
}
330+
331+
debug!(
332+
name = as_debug!(name_key),
333+
id = as_debug!(&DatamaskId { id }),
334+
ctx = ctx;
335+
"construct_drop_mask_policy_operations"
336+
);
337+
338+
Ok(())
339+
}

src/meta/api/src/schema_api_impl.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1419,6 +1419,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
14191419

14201420
let (succ, _responses) = send_txn(self, txn_req).await?;
14211421

1422+
debug!(
1423+
"name_ident" = as_debug!(&req.name_ident),
1424+
succ = succ;
1425+
"drop_virtual_column"
1426+
);
1427+
14221428
if succ {
14231429
break;
14241430
}
@@ -3790,6 +3796,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
37903796
"SchemaApiImpl".to_string()
37913797
}
37923798
}
3799+
37933800
async fn construct_drop_virtual_column_txn_operations(
37943801
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
37953802
name_ident: &VirtualColumnNameIdent,

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use databend_common_expression::TableDataType;
2828
use databend_common_expression::TableField;
2929
use databend_common_expression::TableSchema;
3030
use databend_common_meta_app::data_mask::CreateDatamaskReq;
31+
use databend_common_meta_app::data_mask::DatamaskId;
32+
use databend_common_meta_app::data_mask::DatamaskMeta;
3133
use databend_common_meta_app::data_mask::DatamaskNameIdent;
3234
use databend_common_meta_app::data_mask::DropDatamaskReq;
3335
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
@@ -2732,7 +2734,7 @@ impl SchemaApiTestSuite {
27322734
info!("--- create mask policy");
27332735
{
27342736
let req = CreateDatamaskReq {
2735-
if_not_exists: true,
2737+
create_option: CreateOption::CreateIfNotExists(true),
27362738
name: DatamaskNameIdent {
27372739
tenant: tenant.to_string(),
27382740
name: mask_name_1.to_string(),
@@ -2746,7 +2748,7 @@ impl SchemaApiTestSuite {
27462748
mt.create_data_mask(req).await?;
27472749

27482750
let req = CreateDatamaskReq {
2749-
if_not_exists: true,
2751+
create_option: CreateOption::CreateIfNotExists(true),
27502752
name: DatamaskNameIdent {
27512753
tenant: tenant.to_string(),
27522754
name: mask_name_2.to_string(),
@@ -2979,6 +2981,46 @@ impl SchemaApiTestSuite {
29792981
assert!(id_list.is_err())
29802982
}
29812983

2984+
info!("--- create or replace mask policy");
2985+
{
2986+
let mask_name = "replace_mask";
2987+
let name = DatamaskNameIdent {
2988+
tenant: tenant.to_string(),
2989+
name: mask_name.to_string(),
2990+
};
2991+
let req = CreateDatamaskReq {
2992+
create_option: CreateOption::CreateIfNotExists(true),
2993+
name: name.clone(),
2994+
args: vec![],
2995+
return_type: "".to_string(),
2996+
body: "".to_string(),
2997+
comment: Some("before".to_string()),
2998+
create_on: created_on,
2999+
};
3000+
mt.create_data_mask(req).await?;
3001+
let old_id: u64 = get_kv_u64_data(mt.as_kv_api(), &name).await?;
3002+
let id_key = DatamaskId { id: old_id };
3003+
let meta: DatamaskMeta = get_kv_data(mt.as_kv_api(), &id_key).await?;
3004+
assert_eq!(meta.comment, Some("before".to_string()));
3005+
3006+
let req = CreateDatamaskReq {
3007+
create_option: CreateOption::CreateOrReplace,
3008+
name: name.clone(),
3009+
args: vec![],
3010+
return_type: "".to_string(),
3011+
body: "".to_string(),
3012+
comment: Some("after".to_string()),
3013+
create_on: created_on,
3014+
};
3015+
mt.create_data_mask(req).await?;
3016+
3017+
let id: u64 = get_kv_u64_data(mt.as_kv_api(), &name).await?;
3018+
assert_ne!(old_id, id);
3019+
let id_key = DatamaskId { id };
3020+
let meta: DatamaskMeta = get_kv_data(mt.as_kv_api(), &id_key).await?;
3021+
assert_eq!(meta.comment, Some("after".to_string()));
3022+
}
3023+
29823024
Ok(())
29833025
}
29843026

src/meta/app/src/data_mask/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::fmt::Formatter;
1919
use chrono::DateTime;
2020
use chrono::Utc;
2121

22+
use crate::schema::CreateOption;
23+
2224
const PREFIX_DATAMASK: &str = "__fd_datamask";
2325
const PREFIX_DATAMASK_BY_ID: &str = "__fd_datamask_by_id";
2426
const PREFIX_DATAMASK_ID_LIST: &str = "__fd_datamask_id_list";
@@ -72,7 +74,7 @@ impl From<CreateDatamaskReq> for DatamaskMeta {
7274

7375
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
7476
pub struct CreateDatamaskReq {
75-
pub if_not_exists: bool,
77+
pub create_option: CreateOption,
7678
pub name: DatamaskNameIdent,
7779
pub args: Vec<(String, String)>,
7880
pub return_type: String,

src/query/ast/src/ast/statements/data_mask.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
use std::fmt::Display;
1616
use std::fmt::Formatter;
1717

18+
use databend_common_meta_app::schema::CreateOption;
19+
1820
use crate::ast::Expr;
1921
use crate::ast::TypeName;
2022

@@ -34,16 +36,22 @@ pub struct DataMaskPolicy {
3436

3537
#[derive(Debug, Clone, PartialEq)]
3638
pub struct CreateDatamaskPolicyStmt {
37-
pub if_not_exists: bool,
39+
pub create_option: CreateOption,
3840
pub name: String,
3941
pub policy: DataMaskPolicy,
4042
}
4143

4244
impl Display for CreateDatamaskPolicyStmt {
4345
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
44-
write!(f, "CREATE MASKING POLICY ")?;
45-
if self.if_not_exists {
46-
write!(f, "IF NOT EXISTS ")?;
46+
write!(f, "CREATE ")?;
47+
if let CreateOption::CreateOrReplace = self.create_option {
48+
write!(f, "OR REPLACE ")?;
49+
}
50+
write!(f, "MASKING POLICY ")?;
51+
if let CreateOption::CreateIfNotExists(if_not_exists) = self.create_option {
52+
if if_not_exists {
53+
write!(f, "IF NOT EXISTS ")?;
54+
}
4755
}
4856
write!(f, "{} AS (", self.name)?;
4957
let mut flag = false;

src/query/ast/src/parser/statement.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,17 +1581,19 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
15811581
let show_file_formats = value(Statement::ShowFileFormats, rule! { SHOW ~ FILE ~ FORMATS });
15821582

15831583
// data mark policy
1584-
let create_data_mask_policy = map(
1584+
let create_data_mask_policy = map_res(
15851585
rule! {
1586-
CREATE ~ MASKING ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #data_mask_policy
1586+
CREATE ~ (OR ~ REPLACE)? ~ MASKING ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #data_mask_policy
15871587
},
1588-
|(_, _, _, opt_if_not_exists, name, policy)| {
1588+
|(_, opt_or_replace, _, _, opt_if_not_exists, name, policy)| {
1589+
let create_option =
1590+
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;
15891591
let stmt = CreateDatamaskPolicyStmt {
1590-
if_not_exists: opt_if_not_exists.is_some(),
1592+
create_option,
15911593
name: name.to_string(),
15921594
policy,
15931595
};
1594-
Statement::CreateDatamaskPolicy(stmt)
1596+
Ok(Statement::CreateDatamaskPolicy(stmt))
15951597
},
15961598
);
15971599
let drop_data_mask_policy = map(

src/query/ast/tests/it/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,7 @@ fn test_statement() {
510510
r#"SELECT * FROM t GROUP BY CUBE (a, b, c)"#,
511511
r#"SELECT * FROM t GROUP BY ROLLUP (a, b, c)"#,
512512
r#"CREATE MASKING POLICY email_mask AS (val STRING) RETURNS STRING -> CASE WHEN current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy'"#,
513+
r#"CREATE OR REPLACE MASKING POLICY email_mask AS (val STRING) RETURNS STRING -> CASE WHEN current_role() IN ('ANALYST') THEN VAL ELSE '*********'END comment = 'this is a masking policy'"#,
513514
r#"DESC MASKING POLICY email_mask"#,
514515
r#"DROP MASKING POLICY IF EXISTS email_mask"#,
515516
r#"CREATE VIRTUAL COLUMN (a['k1']['k2'], b[0][1]) FOR t"#,

0 commit comments

Comments
 (0)