Skip to content

Commit a045127

Browse files
committed
save
1 parent 69eb9d6 commit a045127

File tree

18 files changed

+876
-3
lines changed

18 files changed

+876
-3
lines changed

Cargo.lock

Lines changed: 27 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ members = [
3030
"src/query/catalog",
3131
"src/query/datablocks",
3232
"src/query/sharing",
33+
"src/query/sharing-endpoint",
3334
"src/query/datavalues",
3435
"src/query/expression",
3536
"src/query/formats",
@@ -75,6 +76,7 @@ members = [
7576
"src/meta/protos",
7677
# databend-meta
7778
"src/meta/service",
79+
7880
]
7981

8082
[workspace.dependencies]

src/binaries/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ common-metrics = { path = "../common/metrics" }
4444
common-tracing = { path = "../common/tracing" }
4545
databend-meta = { path = "../meta/service" }
4646
databend-query = { path = "../query/service" }
47+
sharing-endpoint = { path = "../query/sharing-endpoint" }
4748

4849
# Crates.io dependencies
4950
anyhow = { workspace = true }
@@ -57,6 +58,7 @@ tokio-stream = "0.1.10"
5758
tonic = "0.8.1"
5859
tracing = "0.1.36"
5960
url = "2.3.1"
61+
poem = { version = "1", features = ["rustls", "multipart", "compression"] }
6062

6163
[[bin]]
6264
name = "databend-meta"
@@ -81,3 +83,9 @@ name = "databend-query"
8183
path = "query/main.rs"
8284
doctest = false
8385
test = false
86+
87+
[[bin]]
88+
name = "open-sharing"
89+
path = "opensharing/main.rs"
90+
doctest = false
91+
test = false

src/binaries/opensharing/main.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use poem::{EndpointExt, Route, Server};
16+
use common_base::base::tokio;
17+
use poem::listener::TcpListener;
18+
use sharing_endpoint::handlers::presign_files;
19+
use sharing_endpoint::middlewares::SharingAuth;
20+
use sharing_endpoint::configs::Config;
21+
use sharing_endpoint::services::SharingServices;
22+
#[tokio::main]
23+
async fn main() ->Result<(), std::io::Error> {
24+
let config = Config::load().expect("cfgs");
25+
println!("config: {:?}", config);
26+
SharingServices::init(config).await.expect("failed to init sharing service");
27+
let app = Route::new().at("/tenant/:tenant_id/:share_name/table/:table_name/presign", poem::post(presign_files)).with(SharingAuth);
28+
29+
// TODO(zhihanz): remove the hard coded port into a config
30+
Server::new(TcpListener::bind("127.0.0.1:33003"))
31+
.run(app)
32+
.await
33+
}

src/query/config/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,8 @@ pub use inner::CatalogHiveConfig;
3232
pub use inner::Config;
3333
pub use inner::QueryConfig;
3434
pub use inner::ThriftProtocol;
35+
36+
pub use outer_v0::StorageConfig;
37+
3538
pub use version::DATABEND_COMMIT_VERSION;
3639
pub use version::QUERY_SEMVER;

src/query/sharing-endpoint/Cargo.toml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[package]
2+
name = "sharing-endpoint"
3+
version = { workspace = true }
4+
authors = { workspace = true }
5+
license = { workspace = true }
6+
publish = { workspace = true }
7+
edition = { workspace = true }
8+
9+
[lib]
10+
doctest = false
11+
12+
[dependencies]
13+
common-exception = { path = "../../common/exception" }
14+
common-base = { path = "../../common/base" }
15+
common-storage = { path = "../../common/storage" }
16+
common-config = { path = "../../query/config" }
17+
uuid = { version = "1.1.2", features = ["serde", "v4"] }
18+
once_cell = "1.15.0"
19+
20+
time = { version = "0.3", features = ["serde"] }
21+
22+
anyhow = { workspace = true }
23+
serde = { workspace = true }
24+
serde_json = { workspace = true }
25+
thiserror = { version = "1" }
26+
opendal = "0.22.2"
27+
poem = { version = "1", features = ["rustls", "multipart", "compression"] }
28+
clap = { workspace = true }
29+
serfig = "0.0.3"
30+
base64 = "0.13.0"
31+
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use std::sync::Arc;
2+
use time::Duration;
3+
use opendal::Operator;
4+
use crate::configs::Config;
5+
use once_cell::sync::OnceCell;
6+
use common_base::base::Singleton;
7+
use common_exception::Result;
8+
use common_storage::{init_operator, StorageParams};
9+
use crate::models;
10+
use crate::models::{PresignFileResponse, SharedTableResponse};
11+
12+
static SHARING_ACCESSOR: OnceCell<Singleton<Arc<SharingAccessor>>> = OnceCell::new();
13+
14+
#[derive(Clone)]
15+
pub struct SharingAccessor {
16+
op : Arc<Operator>,
17+
config: Config,
18+
}
19+
20+
// file would have two kind of path:
21+
// 1. with root. e.g. /root1/root2/root3/db1/tb1/file1
22+
// 2. without root e.g. db1/tb1/file1
23+
// after it would be converted to file1
24+
// and then it would use the location in table spec to form the final path
25+
// {localtion}/file1
26+
pub fn truncate_root(root: String, loc: String) -> String {
27+
let root = root.trim_matches('/');
28+
let loc = loc.trim_matches('/');
29+
return if loc.starts_with(root) {
30+
let o1 = loc.strip_prefix(root).unwrap();
31+
32+
let updated = o1.trim_matches('/');
33+
let arr = updated.split('/').collect::<Vec<&str>>();
34+
if arr.len() > 2 {
35+
return arr[2..].join("/");
36+
}
37+
updated.to_string()
38+
} else {
39+
let arr = loc.split('/').collect::<Vec<&str>>();
40+
if arr.len() > 2 {
41+
return arr[2..].join("/");
42+
}
43+
loc.to_string()
44+
}
45+
}
46+
47+
impl SharingAccessor {
48+
pub async fn init(cfg: &Config, v: Singleton<Arc<SharingAccessor>>) -> Result<()> {
49+
let op = init_operator(&cfg.storage.params)?;
50+
v.init(Arc::new(SharingAccessor {
51+
op: Arc::new(op),
52+
config: cfg.clone(),
53+
}))?;
54+
55+
SHARING_ACCESSOR.set(v).ok();
56+
Ok(())
57+
}
58+
pub fn instance() -> Arc<SharingAccessor> {
59+
match SHARING_ACCESSOR.get() {
60+
None => panic!("Sharing Accessor is not init"),
61+
Some(sharing_accessor) => sharing_accessor.get(),
62+
}
63+
}
64+
65+
fn get_root(&self) -> String {
66+
match self.config.storage.params {
67+
StorageParams::S3(ref s3) => s3.root.trim_matches('/').to_string(),
68+
_ => "".to_string(),
69+
}
70+
}
71+
fn get_path(&self) -> String {
72+
format!("{}/{}", self.get_root(), self.config.tenant)
73+
}
74+
75+
fn get_share_location(&self) -> String {
76+
format!("{}/_share_config/share_specs.json", self.config.tenant)
77+
}
78+
79+
pub async fn get_shared_table(&self, input: &models::LambdaInput) -> Result<Option<SharedTableResponse>> {
80+
let sharing_accessor = Self::instance();
81+
let path = sharing_accessor.get_share_location();
82+
println!("path: {}", path);
83+
// let path = "t1/_share_config/share_specs.json";
84+
let data = sharing_accessor.op.object(&*path).read().await?;
85+
let share_specs: models::SharingConfig = serde_json::from_slice(data.as_slice())?;
86+
return share_specs.get_tables(input);
87+
}
88+
89+
pub async fn presign_file(&self, table: &SharedTableResponse, input: &models::RequestFile) -> Result<PresignFileResponse> {
90+
let loc_prefix = table.location.trim_matches('/');
91+
println!("loc_prefix: {}", loc_prefix.clone());
92+
println!("input: {}", input.file_name);
93+
let file_path = truncate_root(self.get_root(), input.file_name.clone());
94+
let loc_prefix = loc_prefix.strip_prefix(self.get_root().as_str()).unwrap();
95+
let obj_path = format!("{}/{}", loc_prefix, file_path);
96+
let op = self.op.clone();
97+
if input.method == "HEAD" {
98+
let s = op.object(obj_path.as_str()).presign_stat(Duration::hours(1))?;
99+
return Ok( PresignFileResponse::new(&s, input.file_name.clone()));
100+
}
101+
102+
let s = op.object(obj_path.as_str()).presign_read(Duration::hours(1))?;
103+
return Ok( PresignFileResponse::new(&s, input.file_name.clone()));
104+
}
105+
106+
pub async fn get_presigned_files(input: &models::LambdaInput) -> Result<Vec<PresignFileResponse>> {
107+
108+
let accessor = Self::instance();
109+
let table = accessor.get_shared_table(input).await?;
110+
return match table {
111+
Some(t) => {
112+
let mut presigned_files = vec![];
113+
for f in input.request_files.iter() {
114+
let presigned_file = accessor.presign_file(&t, f).await?;
115+
presigned_files.push(presigned_file);
116+
}
117+
Ok(presigned_files)
118+
}
119+
None => {
120+
Ok(vec![])
121+
}
122+
}
123+
}
124+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use serde::Serialize;
16+
use serde::Deserialize;
17+
use common_storage::StorageConfig;
18+
use common_exception::Result;
19+
use super::outer_v0::Config as OuterV0Config;
20+
21+
22+
#[derive(Clone, Debug, PartialEq, Eq)]
23+
pub struct Config {
24+
pub tenant : String,
25+
pub storage: StorageConfig,
26+
}
27+
28+
impl Config {
29+
30+
/// As requires by [RFC: Config Backward Compatibility](https://github.com/datafuselabs/databend/pull/5324), we will load user's config via wrapper [`ConfigV0`] and then convert from [`ConfigV0`] to [`Config`].
31+
///
32+
/// In the future, we could have `ConfigV1` and `ConfigV2`.
33+
pub fn load() -> Result<Self> {
34+
let cfg: Self = OuterV0Config::load(true)?.try_into()?;
35+
Ok(cfg)
36+
}
37+
38+
/// # NOTE
39+
///
40+
/// This function is served for tests only.
41+
pub fn load_for_test() -> Result<Self> {
42+
let cfg: Self = OuterV0Config::load(false)?.try_into()?;
43+
Ok(cfg)
44+
}
45+
46+
47+
/// Transform config into the outer style.
48+
///
49+
/// This function should only be used for end-users.
50+
pub fn into_outer(self) -> OuterV0Config {
51+
OuterV0Config::from(self)
52+
}
53+
}
54+
55+
56+
impl Default for Config {
57+
fn default() -> Self {
58+
Self {
59+
tenant: "".to_string(),
60+
storage: StorageConfig::default(),
61+
}
62+
}
63+
}
64+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright 2021 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod inner;
16+
mod outer_v0;
17+
18+
pub use inner::Config;

0 commit comments

Comments
 (0)