Skip to content

Commit b5b8aa8

Browse files
authored
feat(storage-azdls): Add Azure Datalake Storage support (#1368)
## Which issue does this PR close? - Closes #1360. ## What changes are included in this PR? This PR adds an integration for the Azure Datalake storage service. At it's core, it adds parsing logic for configuration properties. The finished config struct is simply passed down to OpenDAL. In addition it adds logic to parse fully qualified file URIs, and matches it against expected (previously configured) values. It also creates a new `Storage::Azdls` enum variant based on OpenDAL's existing `Scheme::Azdls` enum variant. It then fits the parsing logic into the existing framework to build the storage integration from an `io::FileIOBuilder`. ### Note on WASB support Other Iceberg ADLS integrations ([pyiceberg + Java](https://github.com/apache/iceberg-go/pull/313/files#r2021460617)) also support the `wasb://` and `wasbs://` schemes. WASB refers to a client-side implementation of hierarchical namespaces on top of Blob Storage. ADLS(v2) on the other hand is a service offered by Azure, also built on top of Blob Storage. IIUC we can accept both schemes because objects written to Blob Storage via `wasb://` will also be accessible via `adfs://` (which operates on the same Blob Storage). Even though the URIs slightly differ in format when they refer to the same object, we can largely reuse existing logic. ```diff -wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path> +adfs[s]://<filesystemname>@<accountname>.dfs.core.windows.net/<path> ``` ## Are these changes tested? ### Unit I added minor unit tests to validate the configuration property parsing logic. ### Integration I decided **not** to add integration tests because 1. ADLS is not S3-compatible which means that we can't reuse our Minio setup 2. the Azure-specific alternative to local testing - Azurite - doesn't support ADLS ### End-to-end I have yet to test it in a functioning environment. --------- Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
1 parent 43c9fe2 commit b5b8aa8

File tree

8 files changed

+636
-63
lines changed

8 files changed

+636
-63
lines changed

Cargo.lock

Lines changed: 8 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ motore-macros = "0.4.3"
8787
murmur3 = "0.5.2"
8888
num-bigint = "0.4.6"
8989
once_cell = "1.20"
90-
opendal = "0.53.0"
90+
opendal = "0.53.3"
9191
ordered-float = "4"
9292
parquet = "55"
9393
pilota = "0.11.2"

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ repository = { workspace = true }
3232
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
3333
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
3434

35+
storage-azdls = ["opendal/services-azdls"]
3536
storage-fs = ["opendal/services-fs"]
3637
storage-gcs = ["opendal/services-gcs"]
3738
storage-memory = ["opendal/services-memory"]

crates/iceberg/README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,18 @@ async fn main() -> Result<()> {
6262

6363
Iceberg Rust provides various storage backends through feature flags. Here are the currently supported storage backends:
6464

65-
| Storage Backend | Feature Flag | Status | Description |
66-
|----------------|--------------|--------|-------------|
67-
| Memory | `storage-memory` | ✅ Stable | In-memory storage for testing and development |
68-
| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage |
69-
| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage |
70-
| Google Cloud Storage | `storage-gcs` | ✅ Stable | Google Cloud Storage |
71-
| Alibaba Cloud OSS | `storage-oss` | 🧪 Experimental | Alibaba Cloud Object Storage Service |
65+
| Storage Backend | Feature Flag | Status | Description |
66+
| -------------------- | ---------------- | -------------- | --------------------------------------------- |
67+
| Memory | `storage-memory` | ✅ Stable | In-memory storage for testing and development |
68+
| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage |
69+
| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage |
70+
| Google Cloud Storage | `storage-gcs` | ✅ Stable | Google Cloud Storage |
71+
| Alibaba Cloud OSS | `storage-oss` | 🧪 Experimental | Alibaba Cloud Object Storage Service |
72+
| Azure Datalake | `storage-azdls` | 🧪 Experimental | Azure Datalake Storage v2 |
7273

7374
You can enable all stable storage backends at once using the `storage-all` feature flag.
7475

75-
> Note that `storage-oss` is currently experimental and not included in `storage-all`.
76+
> Note that `storage-oss` and `storage-azdls` are currently experimental and not included in `storage-all`.
7677
7778
Example usage in `Cargo.toml`:
7879

crates/iceberg/src/io/file_io.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ use crate::{Error, ErrorKind, Result};
3535
///
3636
/// Supported storages:
3737
///
38-
/// | Storage | Feature Flag | Schemes |
39-
/// |--------------------|-------------------|------------|
40-
/// | Local file system | `storage-fs` | `file` |
41-
/// | Memory | `storage-memory` | `memory` |
42-
/// | S3 | `storage-s3` | `s3`, `s3a`|
43-
/// | GCS | `storage-gcs` | `gs`, `gcs`|
38+
/// | Storage | Feature Flag | Expected Path Format | Schemes |
39+
/// |--------------------|-------------------|----------------------------------| ------------------------------|
40+
/// | Local file system | `storage-fs` | `file` | `file://path/to/file` |
41+
/// | Memory | `storage-memory` | `memory` | `memory://path/to/file` |
42+
/// | S3 | `storage-s3` | `s3`, `s3a` | `s3://<bucket>/path/to/file` |
43+
/// | GCS | `storage-gcs` | `gs`, `gcs` | `gs://<bucket>/path/to/file` |
44+
/// | OSS | `storage-oss` | `oss` | `oss://<bucket>/path/to/file` |
45+
/// | Azure Datalake | `storage-azdls` | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
4446
#[derive(Clone, Debug)]
4547
pub struct FileIO {
4648
builder: FileIOBuilder,

crates/iceberg/src/io/mod.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,32 +67,36 @@
6767
//! - `new_output`: Create output file for writing.
6868
6969
mod file_io;
70+
mod storage;
71+
7072
pub use file_io::*;
73+
pub(crate) mod object_cache;
7174

72-
mod storage;
75+
#[cfg(feature = "storage-azdls")]
76+
mod storage_azdls;
77+
#[cfg(feature = "storage-fs")]
78+
mod storage_fs;
79+
#[cfg(feature = "storage-gcs")]
80+
mod storage_gcs;
7381
#[cfg(feature = "storage-memory")]
7482
mod storage_memory;
75-
#[cfg(feature = "storage-memory")]
76-
use storage_memory::*;
83+
#[cfg(feature = "storage-oss")]
84+
mod storage_oss;
7785
#[cfg(feature = "storage-s3")]
7886
mod storage_s3;
79-
#[cfg(feature = "storage-s3")]
80-
pub use storage_s3::*;
81-
pub(crate) mod object_cache;
82-
#[cfg(feature = "storage-fs")]
83-
mod storage_fs;
8487

88+
#[cfg(feature = "storage-azdls")]
89+
pub use storage_azdls::*;
8590
#[cfg(feature = "storage-fs")]
8691
use storage_fs::*;
8792
#[cfg(feature = "storage-gcs")]
88-
mod storage_gcs;
89-
#[cfg(feature = "storage-gcs")]
9093
pub use storage_gcs::*;
91-
92-
#[cfg(feature = "storage-oss")]
93-
mod storage_oss;
94+
#[cfg(feature = "storage-memory")]
95+
use storage_memory::*;
9496
#[cfg(feature = "storage-oss")]
9597
pub use storage_oss::*;
98+
#[cfg(feature = "storage-s3")]
99+
pub use storage_s3::*;
96100

97101
pub(crate) fn is_truthy(value: &str) -> bool {
98102
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())

crates/iceberg/src/io/storage.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use opendal::layers::RetryLayer;
21+
#[cfg(feature = "storage-azdls")]
22+
use opendal::services::AzdlsConfig;
2123
#[cfg(feature = "storage-gcs")]
2224
use opendal::services::GcsConfig;
2325
#[cfg(feature = "storage-oss")]
@@ -26,6 +28,8 @@ use opendal::services::OssConfig;
2628
use opendal::services::S3Config;
2729
use opendal::{Operator, Scheme};
2830

31+
#[cfg(feature = "storage-azdls")]
32+
use super::AzureStorageScheme;
2933
use super::FileIOBuilder;
3034
use crate::{Error, ErrorKind};
3135

@@ -36,17 +40,28 @@ pub(crate) enum Storage {
3640
Memory(Operator),
3741
#[cfg(feature = "storage-fs")]
3842
LocalFs,
43+
/// Expects paths of the form `s3[a]://<bucket>/<path>`.
3944
#[cfg(feature = "storage-s3")]
4045
S3 {
4146
/// s3 storage could have `s3://` and `s3a://`.
4247
/// Storing the scheme string here to return the correct path.
43-
scheme_str: String,
48+
configured_scheme: String,
4449
config: Arc<S3Config>,
4550
},
46-
#[cfg(feature = "storage-oss")]
47-
Oss { config: Arc<OssConfig> },
4851
#[cfg(feature = "storage-gcs")]
4952
Gcs { config: Arc<GcsConfig> },
53+
#[cfg(feature = "storage-oss")]
54+
Oss { config: Arc<OssConfig> },
55+
/// Expects paths of the form
56+
/// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
57+
/// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
58+
#[cfg(feature = "storage-azdls")]
59+
Azdls {
60+
/// Because Azdls accepts multiple possible schemes, we store the full
61+
/// passed scheme here to later validate schemes passed via paths.
62+
configured_scheme: AzureStorageScheme,
63+
config: Arc<AzdlsConfig>,
64+
},
5065
}
5166

5267
impl Storage {
@@ -62,7 +77,7 @@ impl Storage {
6277
Scheme::Fs => Ok(Self::LocalFs),
6378
#[cfg(feature = "storage-s3")]
6479
Scheme::S3 => Ok(Self::S3 {
65-
scheme_str,
80+
configured_scheme: scheme_str,
6681
config: super::s3_config_parse(props)?.into(),
6782
}),
6883
#[cfg(feature = "storage-gcs")]
@@ -73,6 +88,14 @@ impl Storage {
7388
Scheme::Oss => Ok(Self::Oss {
7489
config: super::oss_config_parse(props)?.into(),
7590
}),
91+
#[cfg(feature = "storage-azdls")]
92+
Scheme::Azdls => {
93+
let scheme = scheme_str.parse::<AzureStorageScheme>()?;
94+
Ok(Self::Azdls {
95+
config: super::azdls_config_parse(props)?.into(),
96+
configured_scheme: scheme,
97+
})
98+
}
7699
// Update doc on [`FileIO`] when adding new schemes.
77100
_ => Err(Error::new(
78101
ErrorKind::FeatureUnsupported,
@@ -118,12 +141,15 @@ impl Storage {
118141
}
119142
}
120143
#[cfg(feature = "storage-s3")]
121-
Storage::S3 { scheme_str, config } => {
144+
Storage::S3 {
145+
configured_scheme,
146+
config,
147+
} => {
122148
let op = super::s3_config_build(config, path)?;
123149
let op_info = op.info();
124150

125151
// Check prefix of s3 path.
126-
let prefix = format!("{}://{}/", scheme_str, op_info.name());
152+
let prefix = format!("{}://{}/", configured_scheme, op_info.name());
127153
if path.starts_with(&prefix) {
128154
Ok((op, &path[prefix.len()..]))
129155
} else {
@@ -133,7 +159,6 @@ impl Storage {
133159
))
134160
}
135161
}
136-
137162
#[cfg(feature = "storage-gcs")]
138163
Storage::Gcs { config } => {
139164
let operator = super::gcs_config_build(config, path)?;
@@ -162,11 +187,17 @@ impl Storage {
162187
))
163188
}
164189
}
190+
#[cfg(feature = "storage-azdls")]
191+
Storage::Azdls {
192+
configured_scheme,
193+
config,
194+
} => super::azdls_create_operator(path, config, configured_scheme),
165195
#[cfg(all(
166196
not(feature = "storage-s3"),
167197
not(feature = "storage-fs"),
168198
not(feature = "storage-gcs"),
169-
not(feature = "storage-oss")
199+
not(feature = "storage-oss"),
200+
not(feature = "storage-azdls"),
170201
))]
171202
_ => Err(Error::new(
172203
ErrorKind::FeatureUnsupported,
@@ -189,6 +220,7 @@ impl Storage {
189220
"s3" | "s3a" => Ok(Scheme::S3),
190221
"gs" | "gcs" => Ok(Scheme::Gcs),
191222
"oss" => Ok(Scheme::Oss),
223+
"abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
192224
s => Ok(s.parse::<Scheme>()?),
193225
}
194226
}

0 commit comments

Comments
 (0)