Skip to content

Commit 9c210b9

Browse files
authored
Merge pull request #45 from risingwavelabs/wcy/azblob_storage
feat: support azblob storage
2 parents 14c3387 + 79ffb1e commit 9c210b9

File tree

6 files changed

+96
-2
lines changed

6 files changed

+96
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ keywords = ["iceberg"]
3030

3131
[features]
3232
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
33-
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
33+
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-azblob"]
3434

3535
storage-memory = ["opendal/services-memory"]
3636
storage-fs = ["opendal/services-fs"]
3737
storage-s3 = ["opendal/services-s3"]
3838
storage-gcs = ["opendal/services-gcs"]
39+
storage-azblob = ["opendal/services-azblob"]
3940

4041
async-std = ["dep:async-std"]
4142
tokio = ["dep:tokio"]

crates/iceberg/src/io/file_io.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use crate::{Error, ErrorKind, Result};
4141
/// | Memory | `storage-memory` | `memory` |
4242
/// | S3 | `storage-s3` | `s3`, `s3a`|
4343
/// | GCS | `storage-gcs` | `gs`, `gcs`|
44+
/// | AZBLOB | `storage-azblob` | `azblob` |
4445
#[derive(Clone, Debug)]
4546
pub struct FileIO {
4647
builder: FileIOBuilder,

crates/iceberg/src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ mod storage_gcs;
8989
#[cfg(feature = "storage-gcs")]
9090
pub use storage_gcs::*;
9191

92+
#[cfg(feature = "storage-azblob")]
93+
mod storage_azblob;
94+
#[cfg(feature = "storage-azblob")]
95+
pub use storage_azblob::*;
9296
pub(crate) fn is_truthy(value: &str) -> bool {
9397
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
9498
}

crates/iceberg/src/io/storage.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use opendal::layers::RetryLayer;
21+
#[cfg(feature = "storage-azblob")]
22+
use opendal::services::AzblobConfig;
2123
#[cfg(feature = "storage-gcs")]
2224
use opendal::services::GcsConfig;
2325
#[cfg(feature = "storage-s3")]
@@ -47,6 +49,9 @@ pub(crate) enum Storage {
4749
},
4850
#[cfg(feature = "storage-gcs")]
4951
Gcs { config: Arc<GcsConfig> },
52+
53+
#[cfg(feature = "storage-azblob")]
54+
Azblob { config: Arc<AzblobConfig> },
5055
}
5156

5257
impl Storage {
@@ -70,6 +75,10 @@ impl Storage {
7075
Scheme::Gcs => Ok(Self::Gcs {
7176
config: super::gcs_config_parse(props)?.into(),
7277
}),
78+
#[cfg(feature = "storage-azblob")]
79+
Scheme::Azblob => Ok(Self::Azblob {
80+
config: super::azblob_config_parse(props)?.into(),
81+
}),
7382
// Update doc on [`FileIO`] when adding new schemes.
7483
_ => Err(Error::new(
7584
ErrorKind::FeatureUnsupported,
@@ -147,10 +156,24 @@ impl Storage {
147156
))
148157
}
149158
}
159+
#[cfg(feature = "storage-azblob")]
160+
Storage::Azblob { config } => {
161+
let operator = super::azblob_config_build(config, path)?;
162+
let prefix = format!("azblob://{}/", operator.info().name());
163+
if path.starts_with(&prefix) {
164+
Ok((operator, &path[prefix.len()..]))
165+
} else {
166+
Err(Error::new(
167+
ErrorKind::DataInvalid,
168+
format!("Invalid azblob url: {}, should start with {}", path, prefix),
169+
))
170+
}
171+
}
150172
#[cfg(all(
151173
not(feature = "storage-s3"),
152174
not(feature = "storage-fs"),
153-
not(feature = "storage-gcs")
175+
not(feature = "storage-gcs"),
176+
not(feature = "storage-azblob")
154177
))]
155178
_ => Err(Error::new(
156179
ErrorKind::FeatureUnsupported,
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
//! Google Cloud Storage properties
18+
19+
use std::collections::HashMap;
20+
21+
use opendal::services::AzblobConfig;
22+
use opendal::Operator;
23+
use url::Url;
24+
25+
use crate::{Error, ErrorKind, Result};
26+
27+
/// Azure blob account name.
28+
pub const AZBLOB_ACCOUNT_NAME: &str = "azblob.account-name";
29+
/// Azure blob account key.
30+
pub const AZBLOB_ACCOUNT_KEY: &str = "azblob.account-key";
31+
/// Azure blob account endpoint.
32+
pub const AZBLOB_ENDPOINT: &str = "azblob.endpoint";
33+
34+
/// Parse iceberg properties to [`AzblobConfig`].
35+
pub(crate) fn azblob_config_parse(mut m: HashMap<String, String>) -> Result<AzblobConfig> {
36+
let mut cfg = AzblobConfig::default();
37+
38+
if let Some(account_name) = m.remove(AZBLOB_ACCOUNT_NAME) {
39+
cfg.account_name = Some(account_name);
40+
};
41+
if let Some(account_key) = m.remove(AZBLOB_ACCOUNT_KEY) {
42+
cfg.account_key = Some(account_key);
43+
};
44+
if let Some(endpoint) = m.remove(AZBLOB_ENDPOINT) {
45+
cfg.endpoint = Some(endpoint);
46+
};
47+
48+
Ok(cfg)
49+
}
50+
51+
/// Build a new OpenDAL [`Operator`] based on a provided [`AzblobConfig`].
52+
pub(crate) fn azblob_config_build(cfg: &AzblobConfig, path: &str) -> Result<Operator> {
53+
let url = Url::parse(path)?;
54+
let container = url.host_str().ok_or_else(|| {
55+
Error::new(
56+
ErrorKind::DataInvalid,
57+
format!("Invalid azblob url: {}, container is required", path),
58+
)
59+
})?;
60+
61+
let mut cfg = cfg.clone();
62+
cfg.container = container.to_string();
63+
Ok(Operator::from_config(cfg)?.finish())
64+
}

0 commit comments

Comments
 (0)