@@ -19,8 +19,8 @@ use databend_common_meta_app::app_error::DatamaskAlreadyExists;
19
19
use databend_common_meta_app:: app_error:: UnknownDatamask ;
20
20
use databend_common_meta_app:: data_mask:: CreateDatamaskReply ;
21
21
use databend_common_meta_app:: data_mask:: CreateDatamaskReq ;
22
+ use databend_common_meta_app:: data_mask:: DataMaskIdIdent ;
22
23
use databend_common_meta_app:: data_mask:: DataMaskNameIdent ;
23
- use databend_common_meta_app:: data_mask:: DatamaskId ;
24
24
use databend_common_meta_app:: data_mask:: DatamaskMeta ;
25
25
use databend_common_meta_app:: data_mask:: DropDatamaskReply ;
26
26
use databend_common_meta_app:: data_mask:: DropDatamaskReq ;
@@ -32,9 +32,11 @@ use databend_common_meta_app::id_generator::IdGenerator;
32
32
use databend_common_meta_app:: schema:: CreateOption ;
33
33
use databend_common_meta_app:: schema:: TableId ;
34
34
use databend_common_meta_app:: schema:: TableMeta ;
35
+ use databend_common_meta_app:: KeyWithTenant ;
35
36
use databend_common_meta_kvapi:: kvapi;
36
37
use databend_common_meta_types:: ConditionResult :: Eq ;
37
38
use databend_common_meta_types:: MetaError ;
39
+ use databend_common_meta_types:: SeqValue ;
38
40
use databend_common_meta_types:: TxnCondition ;
39
41
use databend_common_meta_types:: TxnOp ;
40
42
use databend_common_meta_types:: TxnRequest ;
@@ -46,10 +48,12 @@ use crate::fetch_id;
46
48
use crate :: get_pb_value;
47
49
use crate :: get_u64_value;
48
50
use crate :: kv_app_error:: KVAppError ;
51
+ use crate :: kv_pb_api:: KVPbApi ;
49
52
use crate :: send_txn;
50
53
use crate :: serialize_struct;
51
54
use crate :: serialize_u64;
52
55
use crate :: txn_backoff:: txn_backoff;
56
+ use crate :: txn_cond_eq_seq;
53
57
use crate :: txn_cond_seq;
54
58
use crate :: txn_op_del;
55
59
use crate :: txn_op_put;
@@ -64,15 +68,15 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
64
68
) -> Result < CreateDatamaskReply , KVAppError > {
65
69
debug ! ( req : ? =( & req) ; "DatamaskApi: {}" , func_name!( ) ) ;
66
70
67
- let name_key = & req. name ;
71
+ let name_ident = & req. name ;
68
72
69
73
let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
70
74
let id = loop {
71
75
trials. next ( ) . unwrap ( ) ?. await ;
72
76
73
77
// Get db mask by name to ensure absence
74
- let ( seq, id) = get_u64_value ( self , name_key ) . await ?;
75
- debug ! ( seq = seq, id = id, name_key : ? =( name_key ) ; "create_data_mask" ) ;
78
+ let ( seq, id) = get_u64_value ( self , name_ident ) . await ?;
79
+ debug ! ( seq = seq, id = id, name_key : ? =( name_ident ) ; "create_data_mask" ) ;
76
80
77
81
let mut condition = vec ! [ ] ;
78
82
let mut if_then = vec ! [ ] ;
@@ -82,7 +86,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
82
86
CreateOption :: Create => {
83
87
return Err ( KVAppError :: AppError ( AppError :: DatamaskAlreadyExists (
84
88
DatamaskAlreadyExists :: new (
85
- name_key . name ( ) ,
89
+ name_ident . name ( ) ,
86
90
format ! ( "create data mask: {}" , req. name. display( ) ) ,
87
91
) ,
88
92
) ) ) ;
@@ -91,7 +95,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
91
95
CreateOption :: CreateOrReplace => {
92
96
construct_drop_mask_policy_operations (
93
97
self ,
94
- name_key ,
98
+ name_ident ,
95
99
false ,
96
100
false ,
97
101
func_name ! ( ) ,
@@ -109,22 +113,23 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
109
113
// data mask name -> data mask table id list
110
114
111
115
let id = fetch_id ( self , IdGenerator :: data_mask_id ( ) ) . await ?;
112
- let id_key = DatamaskId { id } ;
113
- let id_list_key = MaskPolicyTableIdListIdent :: new_from ( name_key. clone ( ) ) ;
116
+
117
+ let id_ident = DataMaskIdIdent :: new ( name_ident. tenant ( ) , id) ;
118
+ let id_list_key = MaskPolicyTableIdListIdent :: new_from ( name_ident. clone ( ) ) ;
114
119
115
120
debug ! (
116
- id : ? =( & id_key ) ,
117
- name_key : ? =( name_key ) ;
121
+ id : ? =( & id_ident ) ,
122
+ name_key : ? =( name_ident ) ;
118
123
"new datamask id"
119
124
) ;
120
125
121
126
{
122
127
let meta: DatamaskMeta = req. clone ( ) . into ( ) ;
123
128
let id_list = MaskpolicyTableIdList :: default ( ) ;
124
- condition. push ( txn_cond_seq ( name_key , Eq , seq) ) ;
129
+ condition. push ( txn_cond_seq ( name_ident , Eq , seq) ) ;
125
130
if_then. extend ( vec ! [
126
- txn_op_put( name_key , serialize_u64( id) ?) , // name -> db_id
127
- txn_op_put( & id_key , serialize_struct( & meta) ?) , // id -> meta
131
+ txn_op_put( name_ident , serialize_u64( id) ?) , // name -> db_id
132
+ txn_op_put( & id_ident , serialize_struct( & meta) ?) , // id -> meta
128
133
txn_op_put( & id_list_key, serialize_struct( & id_list) ?) , /* data mask name -> id_list */
129
134
] ) ;
130
135
@@ -137,8 +142,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
137
142
let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
138
143
139
144
debug ! (
140
- name : ? =( name_key ) ,
141
- id : ? =( & id_key ) ,
145
+ name : ? =( name_ident ) ,
146
+ id : ? =( & id_ident ) ,
142
147
succ = succ;
143
148
"create_data_mask"
144
149
) ;
@@ -217,35 +222,28 @@ async fn get_data_mask_or_err(
217
222
msg : impl Display ,
218
223
) -> Result < ( u64 , u64 , u64 , DatamaskMeta ) , KVAppError > {
219
224
let ( id_seq, id) = get_u64_value ( kv_api, name_key) . await ?;
220
- data_mask_has_to_exist ( id_seq, name_key, & msg) ?;
225
+ assert_data_mask_exist ( id_seq, name_key, & msg) ?;
221
226
222
- let id_key = DatamaskId { id } ;
227
+ let id_ident = DataMaskIdIdent :: new ( name_key . tenant ( ) , id ) ;
223
228
224
- let ( data_mask_seq , data_mask ) = get_pb_value ( kv_api, & id_key ) . await ?;
225
- data_mask_has_to_exist ( data_mask_seq , name_key, msg) ?;
229
+ let seq_v = kv_api. get_pb ( & id_ident ) . await ?;
230
+ assert_data_mask_exist ( seq_v . seq ( ) , name_key, msg) ?;
226
231
227
- Ok ( (
228
- id_seq,
229
- id,
230
- data_mask_seq,
231
- // Safe unwrap(): data_mask_seq > 0 implies data_mask is not None.
232
- data_mask. unwrap ( ) ,
233
- ) )
232
+ // Safe unwrap(): data_mask_seq > 0 implies data_mask is not None.
233
+ Ok ( ( id_seq, id, seq_v. seq ( ) , seq_v. unwrap ( ) . data ) )
234
234
}
235
235
236
- /// Return OK if a db_id or db_meta exists by checking the seq.
237
- ///
238
- /// Otherwise returns UnknownDatamask error
239
- pub fn data_mask_has_to_exist (
236
+ pub fn assert_data_mask_exist (
240
237
seq : u64 ,
241
238
name_ident : & DataMaskNameIdent ,
242
239
msg : impl Display ,
243
- ) -> Result < ( ) , KVAppError > {
240
+ ) -> Result < ( ) , AppError > {
244
241
if seq == 0 {
245
242
debug ! ( seq = seq, name_ident : ? =( name_ident) ; "data mask does not exist" ) ;
246
243
247
- Err ( KVAppError :: AppError ( AppError :: UnknownDatamask (
248
- UnknownDatamask :: new ( name_ident. name ( ) , format ! ( "{}: {}" , msg, name_ident. name( ) ) ) ,
244
+ Err ( AppError :: UnknownDatamask ( UnknownDatamask :: new (
245
+ name_ident. name ( ) ,
246
+ format ! ( "{}: {}" , msg, name_ident. data_mask_name( ) ) ,
249
247
) ) )
250
248
} else {
251
249
Ok ( ( ) )
@@ -319,20 +317,21 @@ async fn construct_drop_mask_policy_operations(
319
317
return Err ( err) ;
320
318
}
321
319
} ;
322
- let id_key = DatamaskId { id } ;
323
320
324
- condition. push ( txn_cond_seq ( & id_key, Eq , data_mask_seq) ) ;
325
- if_then. push ( txn_op_del ( & id_key) ) ;
321
+ let id_ident = DataMaskIdIdent :: new ( name_key. tenant ( ) , id) ;
322
+
323
+ condition. push ( txn_cond_eq_seq ( & id_ident, data_mask_seq) ) ;
324
+ if_then. push ( txn_op_del ( & id_ident) ) ;
326
325
327
326
if if_delete {
328
- condition. push ( txn_cond_seq ( name_key, Eq , id_seq) ) ;
327
+ condition. push ( txn_cond_eq_seq ( name_key, id_seq) ) ;
329
328
if_then. push ( txn_op_del ( name_key) ) ;
330
329
clear_table_column_mask_policy ( kv_api, name_key, condition, if_then) . await ?;
331
330
}
332
331
333
332
debug ! (
334
333
name : ? =( name_key) ,
335
- id : ? =( & DatamaskId { id } ) ,
334
+ id : ? =id ,
336
335
ctx = ctx;
337
336
"construct_drop_mask_policy_operations"
338
337
) ;
0 commit comments