Skip to content

Commit c09bc28

Browse files
authored
refactor: optimize share table performance (#16218)
* refactor: reuse http client in share endpoint client
* refactor: add global share cache manager
* refactor: add global share cache manager
* refactor: add share table by db name API
* refactor: reuse http client in share endpoint client
* refactor: add global share cache manager
* refactor: add global share cache manager
* refactor: add global share cache manager
1 parent 3d294c7 commit c09bc28

File tree

7 files changed

+259
-82
lines changed

7 files changed

+259
-82
lines changed

src/query/service/src/catalogs/share/share_catalog.rs

Lines changed: 102 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ use databend_common_meta_app::schema::CreateVirtualColumnReq;
5151
use databend_common_meta_app::schema::DatabaseIdent;
5252
use databend_common_meta_app::schema::DatabaseInfo;
5353
use databend_common_meta_app::schema::DatabaseMeta;
54+
use databend_common_meta_app::schema::DatabaseType;
5455
use databend_common_meta_app::schema::DeleteLockRevReq;
5556
use databend_common_meta_app::schema::DropDatabaseReply;
5657
use databend_common_meta_app::schema::DropDatabaseReq;
@@ -165,6 +166,8 @@ pub struct ShareCatalog {
165166
info: Arc<CatalogInfo>,
166167

167168
option: ShareCatalogOption,
169+
170+
client: ShareEndpointClient,
168171
}
169172

170173
impl Debug for ShareCatalog {
@@ -190,8 +193,14 @@ impl ShareCatalog {
190193
},
191194
disable_table_info_refresh: false,
192195
};
196+
let client = ShareEndpointClient::new();
193197

194-
Ok(Self { info, option, ctx })
198+
Ok(Self {
199+
info,
200+
option,
201+
ctx,
202+
client,
203+
})
195204
}
196205

197206
async fn get_share_spec(&self) -> Result<ShareSpec> {
@@ -219,8 +228,8 @@ impl ShareCatalog {
219228

220229
// 2. check if ShareSpec exists using share endpoint
221230
let share_endpoint_meta = &reply.share_endpoint_meta_vec[0].1;
222-
let client = ShareEndpointClient::new();
223-
let share_spec = client
231+
let share_spec = self
232+
.client
224233
.get_share_spec_by_name(share_endpoint_meta, tenant, provider, share_name)
225234
.await?;
226235

@@ -390,72 +399,112 @@ impl Catalog for ShareCatalog {
390399
db_name: &str,
391400
table_name: &str,
392401
) -> Result<Arc<dyn Table>> {
393-
let share_spec = self.get_share_spec().await?;
394-
if let Some(use_database) = &share_spec.use_database {
395-
if use_database.name == db_name {
396-
let db_info = self.generate_share_database_info(use_database);
397-
let db = ShareDatabase::try_create(self.ctx.clone(), db_info)?;
398-
db.get_table(table_name).await
399-
} else {
400-
Err(ErrorCode::UnknownDatabase(format!(
401-
"cannot find database {} from share {}",
402-
db_name, self.option.share_name,
403-
)))
404-
}
402+
let share_option = &self.option;
403+
let share_name = &share_option.share_name;
404+
let share_endpoint = &share_option.share_endpoint;
405+
let provider = &share_option.provider;
406+
let tenant = &self.info.name_ident.tenant;
407+
408+
// 1. get share endpoint
409+
let meta_api = UserApiProvider::instance().get_meta_store_client();
410+
let req = GetShareEndpointReq {
411+
tenant: Tenant {
412+
tenant: tenant.to_owned(),
413+
},
414+
endpoint: Some(share_endpoint.clone()),
415+
};
416+
let reply = meta_api.get_share_endpoint(req).await?;
417+
if reply.share_endpoint_meta_vec.is_empty() {
418+
return Err(ErrorCode::UnknownShareEndpoint(format!(
419+
"UnknownShareEndpoint {:?}",
420+
share_endpoint
421+
)));
422+
}
423+
424+
// 2. check if ShareSpec exists using share endpoint
425+
let share_endpoint_meta = reply.share_endpoint_meta_vec[0].1.clone();
426+
let mut table_info = self
427+
.client
428+
.get_share_table(
429+
&share_endpoint_meta,
430+
tenant,
431+
provider,
432+
share_name,
433+
db_name,
434+
table_name,
435+
)
436+
.await?;
437+
438+
let db_type = table_info.db_type.clone();
439+
if let DatabaseType::ShareDB(params) = db_type {
440+
let mut params = params;
441+
params.share_endpoint_url = share_endpoint_meta.url.clone();
442+
params.share_endpoint_credential = share_endpoint_meta.credential.clone().unwrap();
443+
table_info.db_type = DatabaseType::ShareDB(params);
444+
445+
self.ctx.storage_factory.get_table(&table_info)
405446
} else {
406-
Err(ErrorCode::ShareHasNoGrantedDatabase(format!(
407-
"share {}.{} has no granted database",
408-
self.option.provider, self.option.share_name,
409-
)))
447+
unreachable!()
410448
}
411449
}
412450

413451
#[async_backtrace::framed]
414452
async fn list_tables(&self, _tenant: &Tenant, db_name: &str) -> Result<Vec<Arc<dyn Table>>> {
415-
let share_spec = self.get_share_spec().await?;
416-
if let Some(use_database) = &share_spec.use_database {
417-
if use_database.name == db_name {
418-
let db_info = self.generate_share_database_info(use_database);
419-
let db = ShareDatabase::try_create(self.ctx.clone(), db_info)?;
420-
db.list_tables().await
453+
let share_option = &self.option;
454+
let share_name = &share_option.share_name;
455+
let share_endpoint = &share_option.share_endpoint;
456+
let provider = &share_option.provider;
457+
let tenant = &self.info.name_ident.tenant;
458+
459+
// 1. get share endpoint
460+
let meta_api = UserApiProvider::instance().get_meta_store_client();
461+
let req = GetShareEndpointReq {
462+
tenant: Tenant {
463+
tenant: tenant.to_owned(),
464+
},
465+
endpoint: Some(share_endpoint.clone()),
466+
};
467+
let reply = meta_api.get_share_endpoint(req).await?;
468+
if reply.share_endpoint_meta_vec.is_empty() {
469+
return Err(ErrorCode::UnknownShareEndpoint(format!(
470+
"UnknownShareEndpoint {:?}",
471+
share_endpoint
472+
)));
473+
}
474+
475+
// 2. check if ShareSpec exists using share endpoint
476+
let share_endpoint_meta = reply.share_endpoint_meta_vec[0].1.clone();
477+
let table_info_map = self
478+
.client
479+
.get_share_tables(&share_endpoint_meta, tenant, provider, share_name, db_name)
480+
.await?;
481+
482+
let mut table_info_vec = vec![];
483+
for info in table_info_map.values() {
484+
let mut table_info = info.clone();
485+
if let DatabaseType::ShareDB(params) = &table_info.db_type {
486+
let mut params = params.clone();
487+
params.share_endpoint_url = share_endpoint_meta.url.clone();
488+
params.share_endpoint_credential = share_endpoint_meta.credential.clone().unwrap();
489+
table_info.db_type = DatabaseType::ShareDB(params);
490+
491+
table_info_vec.push(self.ctx.storage_factory.get_table(&table_info)?);
421492
} else {
422-
Err(ErrorCode::UnknownDatabase(format!(
423-
"cannot find database {} from share {}",
424-
db_name, self.option.share_name,
425-
)))
493+
unreachable!()
426494
}
427-
} else {
428-
Err(ErrorCode::ShareHasNoGrantedDatabase(format!(
429-
"share {}.{} has no granted database",
430-
self.option.provider, self.option.share_name,
431-
)))
432495
}
496+
Ok(table_info_vec)
433497
}
434498

435499
#[async_backtrace::framed]
436500
async fn list_tables_history(
437501
&self,
438502
_tenant: &Tenant,
439-
db_name: &str,
503+
_db_name: &str,
440504
) -> Result<Vec<Arc<dyn Table>>> {
441-
let share_spec = self.get_share_spec().await?;
442-
if let Some(use_database) = &share_spec.use_database {
443-
if use_database.name == db_name {
444-
let db_info = self.generate_share_database_info(use_database);
445-
let db = ShareDatabase::try_create(self.ctx.clone(), db_info)?;
446-
db.list_tables_history().await
447-
} else {
448-
Err(ErrorCode::UnknownDatabase(format!(
449-
"cannot find database {} from share {}",
450-
db_name, self.option.share_name,
451-
)))
452-
}
453-
} else {
454-
Err(ErrorCode::ShareHasNoGrantedDatabase(format!(
455-
"share {}.{} has no granted database",
456-
self.option.provider, self.option.share_name,
457-
)))
458-
}
505+
Err(ErrorCode::PermissionDenied(
506+
"Permission denied, cannot list table history from a shared database".to_string(),
507+
))
459508
}
460509

461510
#[async_backtrace::framed]

src/query/service/src/databases/share/share_database.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -172,7 +172,7 @@ impl ShareDatabase {
172172

173173
let client = ShareEndpointClient::new();
174174
let table_info_map = client
175-
.get_share_tables(
175+
.get_share_tables_by_db_id(
176176
&share_endpoint_meta,
177177
self.get_tenant().tenant_name(),
178178
from_share.tenant_name(),

src/query/service/src/global_services.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ use databend_common_config::InnerConfig;
2626
use databend_common_exception::Result;
2727
use databend_common_meta_app::schema::CatalogType;
2828
use databend_common_sharing::ShareEndpointManager;
29+
use databend_common_sharing::SharePresignedCacheManager;
2930
use databend_common_storage::DataOperator;
3031
use databend_common_storage::ShareTableConfig;
3132
use databend_common_storages_hive::HiveCreator;
@@ -113,6 +114,7 @@ impl GlobalServices {
113114
SessionManager::init(config)?;
114115
LockManager::init()?;
115116
AuthMgr::init(config)?;
117+
SharePresignedCacheManager::init()?;
116118

117119
// Init user manager.
118120
// Builtin users and udfs are created here.

src/query/sharing/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,3 +27,6 @@ pub use signer::SharedSigner;
2727

2828
mod share_endpoint;
2929
pub use share_endpoint::ShareEndpointManager;
30+
31+
mod share_presigned_cache_manager;
32+
pub use share_presigned_cache_manager::SharePresignedCacheManager;

src/query/sharing/src/share_endpoint_client.rs

Lines changed: 81 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -31,12 +31,17 @@ use ring::hmac;
3131

3232
use crate::signer::HMAC_AUTH_METHOD;
3333

34-
pub struct ShareEndpointClient {}
34+
#[derive(Clone)]
35+
pub struct ShareEndpointClient {
36+
client: reqwest::Client,
37+
}
3538

3639
impl ShareEndpointClient {
3740
#[allow(clippy::new_without_default)]
3841
pub fn new() -> Self {
39-
Self {}
42+
Self {
43+
client: reqwest::Client::new(),
44+
}
4045
}
4146

4247
pub fn generate_auth_headers(
@@ -80,8 +85,7 @@ impl ShareEndpointClient {
8085
HeaderMap::new()
8186
};
8287

83-
let client = reqwest::Client::new();
84-
let resp = client.get(&uri).headers(headers).send().await;
88+
let resp = self.client.get(&uri).headers(headers).send().await;
8589

8690
match resp {
8791
Ok(resp) => {
@@ -117,8 +121,44 @@ impl ShareEndpointClient {
117121
} else {
118122
HeaderMap::new()
119123
};
120-
let client = reqwest::Client::new();
121-
let resp = client.get(&uri).headers(headers).send().await;
124+
let resp = self.client.get(&uri).headers(headers).send().await;
125+
126+
match resp {
127+
Ok(resp) => {
128+
let body = resp.text().await?;
129+
let ret: TableInfo = serde_json::from_str(&body)?;
130+
Ok(ret)
131+
}
132+
Err(err) => {
133+
error!("get_share_spec_by_name fail: {:?}", err);
134+
Err(err.into())
135+
}
136+
}
137+
}
138+
139+
#[async_backtrace::framed]
140+
pub async fn get_share_table(
141+
&self,
142+
share_endpoint_meta: &ShareEndpointMeta,
143+
from_tenant: &str,
144+
to_tenant: &str,
145+
share_name: &str,
146+
db_name: &str,
147+
table_name: &str,
148+
) -> Result<TableInfo> {
149+
let path = format!(
150+
"/{}/{}/{}/{}/v2/share_table",
151+
to_tenant, share_name, db_name, table_name
152+
);
153+
// skip path first `/` char
154+
let uri = format!("{}{}", share_endpoint_meta.url, &path[1..]);
155+
let headers = if let Some(credential) = &share_endpoint_meta.credential {
156+
Self::generate_auth_headers(&path, credential, from_tenant)
157+
} else {
158+
HeaderMap::new()
159+
};
160+
161+
let resp = self.client.get(&uri).headers(headers).send().await;
122162

123163
match resp {
124164
Ok(resp) => {
@@ -135,6 +175,39 @@ impl ShareEndpointClient {
135175

136176
#[async_backtrace::framed]
137177
pub async fn get_share_tables(
178+
&self,
179+
share_endpoint_meta: &ShareEndpointMeta,
180+
from_tenant: &str,
181+
to_tenant: &str,
182+
share_name: &str,
183+
db_name: &str,
184+
) -> Result<BTreeMap<String, TableInfo>> {
185+
let path = format!("/{}/{}/{}/v2/share_tables", to_tenant, share_name, db_name);
186+
// skip path first `/` char
187+
let uri = format!("{}{}", share_endpoint_meta.url, &path[1..]);
188+
let headers = if let Some(credential) = &share_endpoint_meta.credential {
189+
Self::generate_auth_headers(&path, credential, from_tenant)
190+
} else {
191+
HeaderMap::new()
192+
};
193+
194+
let resp = self.client.get(&uri).headers(headers).send().await;
195+
196+
match resp {
197+
Ok(resp) => {
198+
let body = resp.text().await?;
199+
let ret: BTreeMap<String, TableInfo> = serde_json::from_str(&body)?;
200+
Ok(ret)
201+
}
202+
Err(err) => {
203+
error!("get_share_tables fail: {:?}", err);
204+
Err(err.into())
205+
}
206+
}
207+
}
208+
209+
#[async_backtrace::framed]
210+
pub async fn get_share_tables_by_db_id(
138211
&self,
139212
share_endpoint_meta: &ShareEndpointMeta,
140213
from_tenant: &str,
@@ -153,8 +226,7 @@ impl ShareEndpointClient {
153226
} else {
154227
HeaderMap::new()
155228
};
156-
let client = reqwest::Client::new();
157-
let resp = client.get(&uri).headers(headers).send().await;
229+
let resp = self.client.get(&uri).headers(headers).send().await;
158230

159231
match resp {
160232
Ok(resp) => {
@@ -190,8 +262,7 @@ impl ShareEndpointClient {
190262
&share_params.share_endpoint_credential,
191263
from_tenant,
192264
);
193-
let client = reqwest::Client::new();
194-
let resp = client.get(&uri).headers(headers).send().await;
265+
let resp = self.client.get(&uri).headers(headers).send().await;
195266

196267
match resp {
197268
Ok(resp) => {

0 commit comments

Comments
 (0)