Skip to content

Commit eb81346

Browse files
committed
feat: add X-DATABEND-TENANT in sharing request
1 parent 228f426 commit eb81346

File tree

5 files changed

+19
-1
lines changed

5 files changed

+19
-1
lines changed

src/common/storage/src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,17 +500,20 @@ impl Debug for StorageRedisConfig {
500500
pub struct ShareTableConfig {
501501
pub share_endpoint_address: Option<String>,
502502
pub share_endpoint_token: RefreshableToken,
503+
pub requester_tenant_id: String,
503504
}
504505

505506
impl ShareTableConfig {
506507
pub fn init(
507508
share_endpoint_address: &str,
508509
token_file: &str,
510+
requester_tenant_id: String,
509511
default_token: String,
510512
) -> common_exception::Result<()> {
511513
GlobalInstance::set(Self::try_create(
512514
share_endpoint_address,
513515
token_file,
516+
requester_tenant_id,
514517
default_token,
515518
)?);
516519

@@ -520,6 +523,7 @@ impl ShareTableConfig {
520523
pub fn try_create(
521524
share_endpoint_address: &str,
522525
token_file: &str,
526+
requester_tenant_id: String,
523527
default_token: String,
524528
) -> common_exception::Result<ShareTableConfig> {
525529
let share_endpoint_address = if share_endpoint_address.is_empty() {
@@ -537,6 +541,7 @@ impl ShareTableConfig {
537541
Ok(ShareTableConfig {
538542
share_endpoint_address,
539543
share_endpoint_token,
544+
requester_tenant_id,
540545
})
541546
}
542547

@@ -548,6 +553,10 @@ impl ShareTableConfig {
548553
ShareTableConfig::instance().share_endpoint_token
549554
}
550555

556+
pub fn requester_tenant_id() -> String {
557+
ShareTableConfig::instance().requester_tenant_id
558+
}
559+
551560
pub fn instance() -> ShareTableConfig {
552561
GlobalInstance::get()
553562
}

src/query/service/src/global_services.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl GlobalServices {
7171
&config.query.share_endpoint_address,
7272
&config.query.share_endpoint_auth_token_file,
7373
config.query.tenant_id.clone(),
74+
config.query.tenant_id.clone(),
7475
)?;
7576

7677
CacheManager::init(&config.query)?;

src/query/sharing/src/layer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use crate::SharedSigner;
5656
pub fn create_share_table_operator(
5757
share_endpoint_address: Option<String>,
5858
share_endpoint_token: RefreshableToken,
59+
requester_tenant_id: String,
5960
share_tenant_id: &str,
6061
share_name: &str,
6162
table_name: &str,
@@ -68,6 +69,7 @@ pub fn create_share_table_operator(
6869
share_endpoint_address, share_tenant_id, share_name, table_name
6970
),
7071
share_endpoint_token,
72+
requester_tenant_id,
7173
);
7274
Operator::new(apply_wrapper(SharedAccessor {
7375
signer,

src/query/sharing/src/signer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use opendal::raw::HttpClient;
3333
use opendal::raw::Operation;
3434
use opendal::raw::PresignedRequest;
3535

36+
const TENANT_HEADER: &str = "X-DATABEND-TENANT";
37+
3638
/// SharedSigner is used to track presign request, and it's response.
3739
///
3840
/// There is an internal cache about presign request. Getting an expired
@@ -43,6 +45,7 @@ pub struct SharedSigner {
4345
cache: Cache<PresignRequest, PresignedRequest>,
4446
client: HttpClient,
4547
token: RefreshableToken,
48+
requester_tenant_id: String,
4649
}
4750

4851
impl Debug for SharedSigner {
@@ -55,7 +58,7 @@ impl Debug for SharedSigner {
5558

5659
impl SharedSigner {
5760
/// Create a new SharedSigner.
58-
pub fn new(endpoint: &str, token: RefreshableToken) -> Self {
61+
pub fn new(endpoint: &str, token: RefreshableToken, req_tenant_id: String) -> Self {
5962
let cache = Cache::builder()
6063
// Databend Cloud Presign will expire after 3600s (1 hour).
6164
// We will expire them 10 minutes before to avoid edge cases.
@@ -67,6 +70,7 @@ impl SharedSigner {
6770
cache,
6871
client: HttpClient::new(),
6972
token,
73+
requester_tenant_id: req_tenant_id,
7074
}
7175
}
7276

@@ -154,6 +158,7 @@ impl SharedSigner {
154158
.uri(&self.endpoint)
155159
.header(AUTHORIZATION, auth)
156160
.header(CONTENT_LENGTH, bs.len())
161+
.header(TENANT_HEADER, self.requester_tenant_id.clone())
157162
.body(AsyncBody::Bytes(bs))?;
158163
let resp = self.client.send_async(req).await?;
159164
let bs = resp.into_body().bytes().await?;

src/query/storages/fuse/fuse/src/fuse_table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ impl FuseTable {
9898
Some(ref from_share) => create_share_table_operator(
9999
ShareTableConfig::share_endpoint_address(),
100100
ShareTableConfig::share_endpoint_token(),
101+
ShareTableConfig::requester_tenant_id(),
101102
&from_share.tenant,
102103
&from_share.share_name,
103104
&table_info.name,

0 commit comments

Comments
 (0)