Skip to content

Commit 87fe5ab

Browse files
committed
feat: add select from share db and show tables from share db SQL support
1 parent bb42c39 commit 87fe5ab

File tree

2 files changed

+396
-72
lines changed

2 files changed

+396
-72
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 236 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::BTreeMap;
1616
use std::fmt::Display;
17+
use std::iter::FromIterator;
1718
use std::sync::Arc;
1819

1920
use common_datavalues::chrono::DateTime;
@@ -72,6 +73,7 @@ use common_meta_app::schema::UpsertTableCopiedFileReply;
7273
use common_meta_app::schema::UpsertTableCopiedFileReq;
7374
use common_meta_app::schema::UpsertTableOptionReply;
7475
use common_meta_app::schema::UpsertTableOptionReq;
76+
use common_meta_app::share::ShareGrantObject;
7577
use common_meta_app::share::ShareGrantObjectPrivilege;
7678
use common_meta_app::share::ShareId;
7779
use common_meta_app::share::ShareNameIdent;
@@ -81,6 +83,7 @@ use common_meta_types::app_error::CreateTableWithDropTime;
8183
use common_meta_types::app_error::DatabaseAlreadyExists;
8284
use common_meta_types::app_error::DropDbWithDropTime;
8385
use common_meta_types::app_error::DropTableWithDropTime;
86+
use common_meta_types::app_error::ShareHasNoGrantedDatabase;
8487
use common_meta_types::app_error::ShareHasNoGrantedPrivilege;
8588
use common_meta_types::app_error::TableAlreadyExists;
8689
use common_meta_types::app_error::TableVersionMismatched;
@@ -94,6 +97,7 @@ use common_meta_types::app_error::UnknownShareAccounts;
9497
use common_meta_types::app_error::UnknownTable;
9598
use common_meta_types::app_error::UnknownTableId;
9699
use common_meta_types::app_error::WrongShare;
100+
use common_meta_types::app_error::WrongShareObject;
97101
use common_meta_types::ConditionResult;
98102
use common_meta_types::GCDroppedDataReply;
99103
use common_meta_types::GCDroppedDataReq;
@@ -1491,24 +1495,44 @@ impl<KV: KVApi> SchemaApi for KV {
14911495

14921496
// Get db by name to ensure presence
14931497

1494-
let (db_id_seq, db_id) = get_u64_value(self, &tenant_dbname).await?;
1495-
debug!(db_id_seq, db_id, ?tenant_dbname_tbname, "get database");
1496-
1497-
db_has_to_exist(
1498-
db_id_seq,
1498+
let res = get_db_or_err(
1499+
self,
14991500
&tenant_dbname,
1500-
format!("get_table: {}", tenant_dbname_tbname),
1501-
)?;
1501+
format!("get_table: {}", tenant_dbname),
1502+
)
1503+
.await;
1504+
1505+
let (_db_id_seq, db_id, _db_meta_seq, db_meta) = match res {
1506+
Ok(x) => x,
1507+
Err(e) => {
1508+
return Err(e);
1509+
}
1510+
};
15021511

1503-
// Get table by tenant,db_id, table_name to assert presence.
1512+
let table_id = match db_meta.from_share {
1513+
Some(share) => {
1514+
get_table_id_from_share_by_name(
1515+
self,
1516+
share,
1517+
db_id,
1518+
&tenant_dbname_tbname.table_name,
1519+
)
1520+
.await?
1521+
}
1522+
None => {
1523+
// Get table by tenant,db_id, table_name to assert presence.
15041524

1505-
let dbid_tbname = DBIdTableName {
1506-
db_id,
1507-
table_name: tenant_dbname_tbname.table_name.clone(),
1508-
};
1525+
let dbid_tbname = DBIdTableName {
1526+
db_id,
1527+
table_name: tenant_dbname_tbname.table_name.clone(),
1528+
};
15091529

1510-
let (tb_id_seq, table_id) = get_u64_value(self, &dbid_tbname).await?;
1511-
table_has_to_exist(tb_id_seq, tenant_dbname_tbname, "get_table")?;
1530+
let (tb_id_seq, table_id) = get_u64_value(self, &dbid_tbname).await?;
1531+
table_has_to_exist(tb_id_seq, tenant_dbname_tbname, "get_table")?;
1532+
1533+
table_id
1534+
}
1535+
};
15121536

15131537
let tbid = TableId { table_id };
15141538

@@ -1517,7 +1541,7 @@ impl<KV: KVApi> SchemaApi for KV {
15171541
table_has_to_exist(
15181542
tb_meta_seq,
15191543
tenant_dbname_tbname,
1520-
format!("get_table meta by: {}", tbid),
1544+
format!("get_table meta by: {}", tenant_dbname_tbname),
15211545
)?;
15221546

15231547
debug!(
@@ -1529,7 +1553,7 @@ impl<KV: KVApi> SchemaApi for KV {
15291553

15301554
let tb_info = TableInfo {
15311555
ident: TableIdent {
1532-
table_id,
1556+
table_id: tbid.table_id,
15331557
seq: tb_meta_seq,
15341558
},
15351559
desc: tenant_dbname_tbname.to_string(),
@@ -1639,64 +1663,24 @@ impl<KV: KVApi> SchemaApi for KV {
16391663
let tenant_dbname = &req.inner;
16401664

16411665
// Get db by name to ensure presence
1642-
1643-
let (db_id_seq, db_id) = get_u64_value(self, tenant_dbname).await?;
1644-
debug!(
1645-
db_id_seq,
1646-
db_id,
1647-
?tenant_dbname,
1648-
"get database for listing table"
1649-
);
1650-
1651-
db_has_to_exist(db_id_seq, tenant_dbname, "list_tables")?;
1652-
1653-
// List tables by tenant, db_id, table_name.
1654-
1655-
let dbid_tbname = DBIdTableName {
1656-
db_id,
1657-
// Use empty name to scan all tables
1658-
table_name: "".to_string(),
1666+
let res = get_db_or_err(
1667+
self,
1668+
tenant_dbname,
1669+
format!("list_tables: {}", &tenant_dbname),
1670+
)
1671+
.await;
1672+
1673+
let (_db_id_seq, db_id, _db_meta_seq, db_meta) = match res {
1674+
Ok(x) => x,
1675+
Err(e) => {
1676+
return Err(e);
1677+
}
16591678
};
16601679

1661-
let (dbid_tbnames, ids) = list_u64_value(self, &dbid_tbname).await?;
1662-
1663-
let mut tb_meta_keys = Vec::with_capacity(ids.len());
1664-
for (i, _name_key) in dbid_tbnames.iter().enumerate() {
1665-
let tbid = TableId { table_id: ids[i] };
1666-
1667-
tb_meta_keys.push(tbid.to_key());
1668-
}
1669-
1670-
// mget() corresponding table_metas
1671-
1672-
let seq_tb_metas = self.mget_kv(&tb_meta_keys).await?;
1673-
1674-
let mut tb_infos = Vec::with_capacity(ids.len());
1675-
1676-
for (i, seq_meta_opt) in seq_tb_metas.iter().enumerate() {
1677-
if let Some(seq_meta) = seq_meta_opt {
1678-
let tb_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
1679-
1680-
let tb_info = TableInfo {
1681-
ident: TableIdent {
1682-
table_id: ids[i],
1683-
seq: seq_meta.seq,
1684-
},
1685-
desc: format!(
1686-
"'{}'.'{}'",
1687-
tenant_dbname.db_name, dbid_tbnames[i].table_name
1688-
),
1689-
meta: tb_meta,
1690-
name: dbid_tbnames[i].table_name.clone(),
1691-
};
1692-
tb_infos.push(Arc::new(tb_info));
1693-
} else {
1694-
debug!(
1695-
k = display(&tb_meta_keys[i]),
1696-
"db_meta not found, maybe just deleted after listing names and before listing meta"
1697-
);
1698-
}
1699-
}
1680+
let tb_infos = match db_meta.from_share {
1681+
None => list_tables_from_unshare_db(self, db_id, tenant_dbname).await?,
1682+
Some(share) => list_tables_from_share_db(self, share, db_id, tenant_dbname).await?,
1683+
};
17001684

17011685
Ok(tb_infos)
17021686
}
@@ -2520,3 +2504,183 @@ async fn count_tables(kv_api: &impl KVApi, key: &CountTablesKey) -> Result<u64,
25202504
}
25212505
Ok(count)
25222506
}
2507+
2508+
async fn get_table_id_from_share_by_name(
2509+
kv_api: &impl KVApi,
2510+
share: ShareNameIdent,
2511+
db_id: u64,
2512+
table_name: &String,
2513+
) -> Result<u64, MetaError> {
2514+
let res = get_share_or_err(
2515+
kv_api,
2516+
&share,
2517+
format!("list_tables_from_share_db: {}", &share),
2518+
)
2519+
.await;
2520+
2521+
let (share_id_seq, _share_id, _share_meta_seq, share_meta) = match res {
2522+
Ok(x) => x,
2523+
Err(e) => {
2524+
return Err(e);
2525+
}
2526+
};
2527+
if share_id_seq == 0 {
2528+
return Err(MetaError::AppError(AppError::WrongShare(WrongShare::new(
2529+
share.to_string(),
2530+
))));
2531+
}
2532+
if !share_meta.share_from_db_ids.contains(&db_id) {
2533+
return Err(MetaError::AppError(AppError::ShareHasNoGrantedDatabase(
2534+
ShareHasNoGrantedDatabase::new(&share.tenant, &share.share_name),
2535+
)));
2536+
}
2537+
2538+
let mut ids = Vec::with_capacity(share_meta.entries.len());
2539+
for (_, entry) in share_meta.entries.iter() {
2540+
if let ShareGrantObject::Table(table_id) = entry.object {
2541+
ids.push(table_id);
2542+
}
2543+
}
2544+
2545+
let table_names = get_table_names_by_ids(kv_api, &ids).await?;
2546+
match table_names.binary_search(table_name) {
2547+
Ok(i) => Ok(ids[i]),
2548+
Err(_) => Err(MetaError::AppError(AppError::WrongShareObject(
2549+
WrongShareObject::new(table_name.to_string()),
2550+
))),
2551+
}
2552+
}
2553+
2554+
async fn get_table_names_by_ids(
2555+
kv_api: &impl KVApi,
2556+
ids: &[u64],
2557+
) -> Result<Vec<String>, MetaError> {
2558+
let mut table_names = vec![];
2559+
2560+
for id in ids.iter() {
2561+
let key = TableIdToName { table_id: *id };
2562+
let (_table_id_to_name_seq, table_name_opt): (_, Option<DBIdTableName>) =
2563+
get_struct_value(kv_api, &key).await?;
2564+
2565+
match table_name_opt {
2566+
Some(table_name) => table_names.push(table_name.table_name),
2567+
None => {
2568+
return Err(MetaError::AppError(AppError::UnknownTableId(
2569+
UnknownTableId::new(*id, "get_table_names_by_ids"),
2570+
)));
2571+
}
2572+
}
2573+
}
2574+
Ok(table_names)
2575+
}
2576+
2577+
async fn get_tableinfos_by_ids(
2578+
kv_api: &impl KVApi,
2579+
ids: &[u64],
2580+
tenant_dbname: &DatabaseNameIdent,
2581+
dbid_tbnames_opt: Option<Vec<DBIdTableName>>,
2582+
) -> Result<Vec<Arc<TableInfo>>, MetaError> {
2583+
let mut tb_meta_keys = Vec::with_capacity(ids.len());
2584+
for id in ids.iter() {
2585+
let tbid = TableId { table_id: *id };
2586+
2587+
tb_meta_keys.push(tbid.to_key());
2588+
}
2589+
2590+
// mget() corresponding table_metas
2591+
2592+
let seq_tb_metas = kv_api.mget_kv(&tb_meta_keys).await?;
2593+
2594+
let mut tb_infos = Vec::with_capacity(ids.len());
2595+
2596+
let tbnames = match dbid_tbnames_opt {
2597+
Some(dbid_tbnames) => Vec::<String>::from_iter(
2598+
dbid_tbnames
2599+
.into_iter()
2600+
.map(|dbid_tbname| dbid_tbname.table_name),
2601+
),
2602+
2603+
None => get_table_names_by_ids(kv_api, ids).await?,
2604+
};
2605+
2606+
for (i, seq_meta_opt) in seq_tb_metas.iter().enumerate() {
2607+
if let Some(seq_meta) = seq_meta_opt {
2608+
let tb_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
2609+
2610+
let tb_info = TableInfo {
2611+
ident: TableIdent {
2612+
table_id: ids[i],
2613+
seq: seq_meta.seq,
2614+
},
2615+
desc: format!("'{}'.'{}'", tenant_dbname.db_name, tbnames[i]),
2616+
meta: tb_meta,
2617+
name: tbnames[i].clone(),
2618+
};
2619+
tb_infos.push(Arc::new(tb_info));
2620+
} else {
2621+
debug!(
2622+
k = display(&tb_meta_keys[i]),
2623+
"db_meta not found, maybe just deleted after listing names and before listing meta"
2624+
);
2625+
}
2626+
}
2627+
2628+
Ok(tb_infos)
2629+
}
2630+
2631+
async fn list_tables_from_unshare_db(
2632+
kv_api: &impl KVApi,
2633+
db_id: u64,
2634+
tenant_dbname: &DatabaseNameIdent,
2635+
) -> Result<Vec<Arc<TableInfo>>, MetaError> {
2636+
// List tables by tenant, db_id, table_name.
2637+
2638+
let dbid_tbname = DBIdTableName {
2639+
db_id,
2640+
// Use empty name to scan all tables
2641+
table_name: "".to_string(),
2642+
};
2643+
2644+
let (dbid_tbnames, ids) = list_u64_value(kv_api, &dbid_tbname).await?;
2645+
2646+
get_tableinfos_by_ids(kv_api, &ids, tenant_dbname, Some(dbid_tbnames)).await
2647+
}
2648+
2649+
async fn list_tables_from_share_db(
2650+
kv_api: &impl KVApi,
2651+
share: ShareNameIdent,
2652+
db_id: u64,
2653+
tenant_dbname: &DatabaseNameIdent,
2654+
) -> Result<Vec<Arc<TableInfo>>, MetaError> {
2655+
let res = get_share_or_err(
2656+
kv_api,
2657+
&share,
2658+
format!("list_tables_from_share_db: {}", &share),
2659+
)
2660+
.await;
2661+
2662+
let (share_id_seq, _share_id, _share_meta_seq, share_meta) = match res {
2663+
Ok(x) => x,
2664+
Err(e) => {
2665+
return Err(e);
2666+
}
2667+
};
2668+
if share_id_seq == 0 {
2669+
return Err(MetaError::AppError(AppError::WrongShare(WrongShare::new(
2670+
share.to_string(),
2671+
))));
2672+
}
2673+
if !share_meta.share_from_db_ids.contains(&db_id) {
2674+
return Err(MetaError::AppError(AppError::ShareHasNoGrantedDatabase(
2675+
ShareHasNoGrantedDatabase::new(&share.tenant, &share.share_name),
2676+
)));
2677+
}
2678+
2679+
let mut ids = Vec::with_capacity(share_meta.entries.len());
2680+
for (_, entry) in share_meta.entries.iter() {
2681+
if let ShareGrantObject::Table(table_id) = entry.object {
2682+
ids.push(table_id);
2683+
}
2684+
}
2685+
get_tableinfos_by_ids(kv_api, &ids, tenant_dbname, None).await
2686+
}

0 commit comments

Comments
 (0)