Skip to content

Commit 8ce71df

Browse files
authored
feat: add create or replace share endpoint support (#14660)
1 parent 8c85ace commit 8ce71df

File tree

12 files changed

+357
-105
lines changed

12 files changed

+357
-105
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
10111011
let mut condition = vec![];
10121012
let mut if_then = vec![];
10131013

1014-
let (index_id, _) = construct_drop_index_txn_operations(
1014+
let (index_id, index_id_seq) = construct_drop_index_txn_operations(
10151015
self,
10161016
tenant_index,
10171017
req.if_exists,
@@ -1020,6 +1020,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
10201020
&mut if_then,
10211021
)
10221022
.await?;
1023+
if index_id_seq == 0 {
1024+
return Ok(DropIndexReply {});
1025+
}
10231026

10241027
let txn_req = TxnRequest {
10251028
condition,
@@ -1401,7 +1404,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
14011404
trials.next().unwrap()?.await;
14021405

14031406
let mut if_then = vec![];
1404-
construct_drop_virtual_column_txn_operations(
1407+
let seq = construct_drop_virtual_column_txn_operations(
14051408
self,
14061409
&req.name_ident,
14071410
req.if_exists,
@@ -1410,6 +1413,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
14101413
&mut if_then,
14111414
)
14121415
.await?;
1416+
if seq == 0 {
1417+
return Ok(DropVirtualColumnReply {});
1418+
}
14131419

14141420
let txn_req = TxnRequest {
14151421
condition: vec![],
@@ -1627,7 +1633,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
16271633
};
16281634
} else {
16291635
// create or replace
1630-
drop_table_by_id(
1636+
construct_drop_table_txn_operations(
16311637
self,
16321638
req.name_ident.table_name.clone(),
16331639
req.name_ident.tenant.clone(),
@@ -2457,7 +2463,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
24572463
let mut condition = vec![];
24582464
let mut if_then = vec![];
24592465

2460-
let opt = drop_table_by_id(
2466+
let opt = construct_drop_table_txn_operations(
24612467
self,
24622468
req.table_name.clone(),
24632469
req.tenant.clone(),
@@ -2470,6 +2476,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
24702476
&mut if_then,
24712477
)
24722478
.await?;
2479+
// seq == 0 means that req.if_exists == true and cannot find table meta,
2480+
// in this case just return directly
2481+
if opt.1 == 0 {
2482+
return Ok(DropTableReply { spec_vec: None });
2483+
}
24732484
let txn_req = TxnRequest {
24742485
condition,
24752486
if_then,
@@ -3883,7 +3894,7 @@ async fn construct_drop_index_txn_operations(
38833894
Ok((index_id, index_id_seq))
38843895
}
38853896

3886-
async fn drop_table_by_id(
3897+
async fn construct_drop_table_txn_operations(
38873898
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
38883899
table_name: String,
38893900
tenant: String,

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3014,6 +3014,10 @@ impl SchemaApiTestSuite {
30143014
};
30153015
mt.create_data_mask(req).await?;
30163016

3017+
// assert old id key has been deleted
3018+
let meta: Result<DatamaskMeta, KVAppError> = get_kv_data(mt.as_kv_api(), &id_key).await;
3019+
assert!(meta.is_err());
3020+
30173021
let id: u64 = get_kv_u64_data(mt.as_kv_api(), &name).await?;
30183022
assert_ne!(old_id, id);
30193023
let id_key = DatamaskId { id };
@@ -5956,7 +5960,13 @@ impl SchemaApiTestSuite {
59565960
name_ident: replace_name_ident.clone(),
59575961
};
59585962

5959-
let _res = mt.create_index(req).await?;
5963+
let res = mt.create_index(req).await?;
5964+
let old_index_id = res.index_id;
5965+
let old_index_id_key = IndexId {
5966+
index_id: old_index_id,
5967+
};
5968+
let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &old_index_id_key).await?;
5969+
assert_eq!(meta, index_meta_1);
59605970

59615971
let resp = mt.get_index(get_req.clone()).await?;
59625972

@@ -5968,7 +5978,17 @@ impl SchemaApiTestSuite {
59685978
meta: index_meta_2.clone(),
59695979
};
59705980

5971-
let _res = mt.create_index(req).await?;
5981+
let res = mt.create_index(req).await?;
5982+
5983+
// assert old index id key has been deleted
5984+
let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &old_index_id_key).await?;
5985+
assert!(meta.dropped_on.is_some());
5986+
5987+
// assert new index id key has been created
5988+
let index_id = res.index_id;
5989+
let index_id_key = IndexId { index_id };
5990+
let meta: IndexMeta = get_kv_data(mt.as_kv_api(), &index_id_key).await?;
5991+
assert_eq!(meta, index_meta_2);
59725992

59735993
let resp = mt.get_index(get_req).await?;
59745994

src/meta/api/src/share_api_impl.rs

Lines changed: 130 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use databend_common_meta_app::app_error::UnknownShareAccounts;
2424
use databend_common_meta_app::app_error::UnknownTable;
2525
use databend_common_meta_app::app_error::WrongShare;
2626
use databend_common_meta_app::app_error::WrongShareObject;
27+
use databend_common_meta_app::schema::CreateOption;
2728
use databend_common_meta_app::schema::DBIdTableName;
2829
use databend_common_meta_app::schema::DatabaseId;
2930
use databend_common_meta_app::schema::DatabaseIdToName;
@@ -988,17 +989,33 @@ impl<KV: kvapi::KVApi<Error = MetaError>> ShareApi for KV {
988989
"create_share_endpoint"
989990
);
990991

992+
let mut condition = vec![];
993+
let mut if_then = vec![];
994+
991995
if share_endpoint_id_seq > 0 {
992-
return if req.if_not_exists {
993-
Ok(CreateShareEndpointReply { share_endpoint_id })
996+
if let CreateOption::CreateIfNotExists(if_not_exists) = req.create_option {
997+
return if if_not_exists {
998+
Ok(CreateShareEndpointReply { share_endpoint_id })
999+
} else {
1000+
Err(KVAppError::AppError(AppError::ShareEndpointAlreadyExists(
1001+
ShareEndpointAlreadyExists::new(
1002+
&name_key.endpoint,
1003+
format!("create share endpoint: tenant: {}", name_key.tenant),
1004+
),
1005+
)))
1006+
};
9941007
} else {
995-
Err(KVAppError::AppError(AppError::ShareEndpointAlreadyExists(
996-
ShareEndpointAlreadyExists::new(
997-
&name_key.endpoint,
998-
format!("create share endpoint: tenant: {}", name_key.tenant),
999-
),
1000-
)))
1001-
};
1008+
construct_drop_share_endpoint_txn_operations(
1009+
self,
1010+
name_key,
1011+
false,
1012+
false,
1013+
func_name!(),
1014+
&mut condition,
1015+
&mut if_then,
1016+
)
1017+
.await?;
1018+
}
10021019
}
10031020

10041021
// Create share endpoint by inserting these record:
@@ -1019,16 +1036,19 @@ impl<KV: kvapi::KVApi<Error = MetaError>> ShareApi for KV {
10191036
// Create share endpoint by transaction.
10201037
{
10211038
let share_endpoint_meta = ShareEndpointMeta::new(&req);
1022-
let txn_req = TxnRequest {
1023-
condition: vec![
1024-
txn_cond_seq(name_key, Eq, 0),
1025-
txn_cond_seq(&id_to_name_key, Eq, 0),
1026-
],
1027-
if_then: vec![
1039+
condition.extend(vec![
1040+
txn_cond_seq(name_key, Eq, share_endpoint_id_seq),
1041+
txn_cond_seq(&id_to_name_key, Eq, 0),
1042+
]);
1043+
if_then.extend(vec![
10281044
txn_op_put(name_key, serialize_u64(share_endpoint_id)?), /* (tenant, share_endpoint_name) -> share_endpoint_id */
10291045
txn_op_put(&id_key, serialize_struct(&share_endpoint_meta)?), /* (share_endpoint_id) -> share_endpoint_meta */
10301046
txn_op_put(&id_to_name_key, serialize_struct(name_key)?), /* __fd_share_endpoint_id_to_name/<share_endpoint_id> -> (tenant,share_endpoint_name) */
1031-
],
1047+
]);
1048+
1049+
let txn_req = TxnRequest {
1050+
condition,
1051+
if_then,
10321052
else_then: vec![],
10331053
};
10341054

@@ -1200,83 +1220,115 @@ impl<KV: kvapi::KVApi<Error = MetaError>> ShareApi for KV {
12001220
loop {
12011221
trials.next().unwrap()?.await;
12021222

1203-
let res = get_share_endpoint_or_err(
1204-
self,
1205-
name_key,
1206-
format!("drop_share_endpoint: {}", &name_key),
1207-
)
1208-
.await;
1209-
1210-
let (
1211-
share_endpoint_id_seq,
1212-
share_endpoint_id,
1213-
share_endpoint_meta_seq,
1214-
_share_endpoint_meta,
1215-
) = match res {
1216-
Ok(x) => x,
1217-
Err(e) => {
1218-
if let KVAppError::AppError(AppError::UnknownShareEndpoint(_)) = e {
1219-
if req.if_exists {
1220-
return Ok(DropShareEndpointReply {});
1221-
}
1222-
}
1223+
let mut condition = vec![];
1224+
let mut if_then = vec![];
12231225

1224-
return Err(e);
1225-
}
1226+
let share_endpoint_id = if let Some(share_endpoint_id) =
1227+
construct_drop_share_endpoint_txn_operations(
1228+
self,
1229+
name_key,
1230+
req.if_exists,
1231+
true,
1232+
func_name!(),
1233+
&mut condition,
1234+
&mut if_then,
1235+
)
1236+
.await?
1237+
{
1238+
share_endpoint_id
1239+
} else {
1240+
return Ok(DropShareEndpointReply {});
12261241
};
1227-
let (share_endpoint_name_seq, _share_endpoint) = get_share_endpoint_id_to_name_or_err(
1228-
self,
1229-
share_endpoint_id,
1230-
format!("drop_share_endpoint: {}", &name_key),
1231-
)
1232-
.await?;
12331242

1234-
// Delete share endpoint by these operations:
1235-
// del (tenant, share_endpoint)
1236-
// del share_endpoint_id
1237-
// del (share_endpoint_id) -> (tenant, share_endpoint)
1243+
let txn_req = TxnRequest {
1244+
condition,
1245+
if_then,
1246+
else_then: vec![],
1247+
};
12381248

1239-
let mut condition = vec![];
1240-
let mut if_then = vec![];
1249+
let (succ, _responses) = send_txn(self, txn_req).await?;
12411250

12421251
let share_id_key = ShareEndpointId { share_endpoint_id };
1243-
let id_name_key = ShareEndpointIdToName { share_endpoint_id };
1244-
12451252
debug!(
1246-
share_endpoint_id = share_endpoint_id,
1247-
name_key = as_debug!(name_key);
1253+
name = as_debug!(name_key) ,
1254+
id = as_debug!(&share_id_key),
1255+
succ = succ;
12481256
"drop_share_endpoint"
12491257
);
12501258

1251-
{
1252-
condition.push(txn_cond_seq(name_key, Eq, share_endpoint_id_seq));
1253-
condition.push(txn_cond_seq(&share_id_key, Eq, share_endpoint_meta_seq));
1254-
condition.push(txn_cond_seq(&id_name_key, Eq, share_endpoint_name_seq));
1255-
if_then.push(txn_op_del(name_key)); // del (tenant, share_endpoint)
1256-
if_then.push(txn_op_del(&share_id_key)); // del share_endpoint_id
1257-
if_then.push(txn_op_del(&id_name_key)); // del (share_endpoint_id) -> (tenant, share_endpoint)
1258-
1259-
let txn_req = TxnRequest {
1260-
condition,
1261-
if_then,
1262-
else_then: vec![],
1263-
};
1264-
1265-
let (succ, _responses) = send_txn(self, txn_req).await?;
1259+
if succ {
1260+
return Ok(DropShareEndpointReply {});
1261+
}
1262+
}
1263+
}
1264+
}
12661265

1267-
debug!(
1268-
name = as_debug!(name_key) ,
1269-
id = as_debug!(&share_id_key),
1270-
succ = succ;
1271-
"drop_share_endpoint"
1272-
);
1266+
async fn construct_drop_share_endpoint_txn_operations(
1267+
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
1268+
name_key: &ShareEndpointIdent,
1269+
drop_if_exists: bool,
1270+
if_delete: bool,
1271+
ctx: &str,
1272+
condition: &mut Vec<TxnCondition>,
1273+
if_then: &mut Vec<TxnOp>,
1274+
) -> Result<Option<u64>, KVAppError> {
1275+
let res = get_share_endpoint_or_err(
1276+
kv_api,
1277+
name_key,
1278+
format!(
1279+
"construct_drop_share_endpoint_txn_operations: {}",
1280+
&name_key
1281+
),
1282+
)
1283+
.await;
12731284

1274-
if succ {
1275-
return Ok(DropShareEndpointReply {});
1285+
let (share_endpoint_id_seq, share_endpoint_id, share_endpoint_meta_seq, _share_endpoint_meta) =
1286+
match res {
1287+
Ok(x) => x,
1288+
Err(e) => {
1289+
if let KVAppError::AppError(AppError::UnknownShareEndpoint(_)) = e {
1290+
if drop_if_exists {
1291+
return Ok(None);
1292+
}
12761293
}
1294+
1295+
return Err(e);
12771296
}
1278-
}
1297+
};
1298+
let (share_endpoint_name_seq, _share_endpoint) = get_share_endpoint_id_to_name_or_err(
1299+
kv_api,
1300+
share_endpoint_id,
1301+
format!(
1302+
"construct_drop_share_endpoint_txn_operations: {}",
1303+
&name_key
1304+
),
1305+
)
1306+
.await?;
1307+
1308+
// Delete share endpoint by these operations:
1309+
// del (tenant, share_endpoint)
1310+
// del share_endpoint_id
1311+
// del (share_endpoint_id) -> (tenant, share_endpoint)
1312+
let share_id_key = ShareEndpointId { share_endpoint_id };
1313+
let id_name_key = ShareEndpointIdToName { share_endpoint_id };
1314+
1315+
debug!(
1316+
share_endpoint_id = share_endpoint_id,
1317+
name_key = as_debug!(name_key),
1318+
ctx = ctx;
1319+
"construct_drop_share_endpoint_txn_operations"
1320+
);
1321+
1322+
condition.push(txn_cond_seq(&share_id_key, Eq, share_endpoint_meta_seq));
1323+
condition.push(txn_cond_seq(&id_name_key, Eq, share_endpoint_name_seq));
1324+
if_then.push(txn_op_del(&share_id_key)); // del share_endpoint_id
1325+
if_then.push(txn_op_del(&id_name_key)); // del (share_endpoint_id) -> (tenant, share_endpoint)
1326+
if if_delete {
1327+
condition.push(txn_cond_seq(name_key, Eq, share_endpoint_id_seq));
1328+
if_then.push(txn_op_del(name_key)); // del (tenant, share_endpoint)
12791329
}
1330+
1331+
Ok(Some(share_endpoint_id))
12801332
}
12811333

12821334
async fn get_share_database_name(

0 commit comments

Comments
 (0)