Add HDFS FileIO for Hive #1450

Open · wants to merge 28 commits into main

Commits (28)
e2b7a6d
feat:Add hadoop catalog mode
awol2005ex Apr 30, 2025
6a08fd3
feat:Add hadoop catalog mode(load s3 table meta)
awol2005ex May 6, 2025
891b07d
feat:Add hadoop catalog mode(s3 list_namepaces)
awol2005ex May 8, 2025
c38a229
feat:Add hadoop catalog mode(s3 list_namepaces)
awol2005ex May 8, 2025
2ee8079
feat:Add hadoop catalog mode(s3 create_namepace)
awol2005ex May 8, 2025
8942623
feat:Add hadoop catalog mode(s3 get_namepace)
awol2005ex May 8, 2025
4e91545
feat:Add hadoop catalog mode(s3 namespace_exists)
awol2005ex May 8, 2025
4f9cb2a
feat:Add hadoop catalog mode(s3 drop_namepace)
awol2005ex May 8, 2025
a5b0bad
feat:Add hadoop catalog mode(s3 list_tables)
awol2005ex May 8, 2025
45ab097
feat:Add hadoop catalog mode(s3 create table ,drop table)
awol2005ex May 9, 2025
e633e9a
feat:Add hadoop catalog mode(s3 table_exists)
awol2005ex May 9, 2025
bf4fd96
Merge remote-tracking branch 'origin/main' into awol2005ex-hadoop-cat…
awol2005ex May 9, 2025
5c1309c
feat:Add hadoop catalog mode(add hdfs-native io)
awol2005ex May 9, 2025
660ec6e
feat:Add hadoop catalog mode(hdfs_native list_namespaces)
awol2005ex May 9, 2025
813947f
feat:Add hadoop catalog mode(hdfs_native create_namespace)
awol2005ex May 9, 2025
45e1bb7
feat:Add hadoop catalog mode(hdfs_native get_namespace)
awol2005ex May 9, 2025
1d4cda4
feat:Add hadoop catalog mode(hdfs_native namespace_exists)
awol2005ex May 9, 2025
b13a01d
feat:Add hadoop catalog mode(hdfs_native drop_namespace)
awol2005ex May 9, 2025
899228c
feat:Add hadoop catalog mode(hdfs_native list_tables)
awol2005ex May 9, 2025
20540b9
feat:Add hadoop catalog mode(hdfs_native create_table)
awol2005ex May 9, 2025
794216f
feat:Add hadoop catalog mode(hdfs_native load_table)
awol2005ex May 9, 2025
f368bc4
feat:Add hadoop catalog mode(hdfs_native drop_table)
awol2005ex May 9, 2025
29b4f3c
feat:Add hadoop catalog mode(hdfs_native table_exists)
awol2005ex May 9, 2025
6ee7136
feat:Add hadoop catalog mode(hdfs_native create table)
awol2005ex May 12, 2025
a9ea6ea
feat:Add hadoop catalog mode(hdfs_native load table)
awol2005ex May 12, 2025
f71095e
Merge remote-tracking branch 'origin/main' into awol2005ex-hadoop-cat…
awol2005ex May 12, 2025
274426c
Merge remote-tracking branch 'github/main' into awol2005ex-hadoop-cat…
awol2005ex Jun 18, 2025
44b16ba
merge master
awol2005ex Jun 18, 2025
1 change: 1 addition & 0 deletions Cargo.toml
@@ -120,3 +120,4 @@ uuid = { version = "1.16", features = ["v7"] }
volo = "0.10.6"
volo-thrift = "0.10.6"
zstd = "0.13.2"
hdfs-native = { version = "0.10" }
6 changes: 3 additions & 3 deletions crates/iceberg/Cargo.toml
@@ -29,16 +29,16 @@ license = { workspace = true }
repository = { workspace = true }

[features]
-default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
-storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
+default = ["storage-memory", "storage-fs", "storage-s3", "storage-hdfs-native", "tokio"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-hdfs-native", "storage-gcs"]

storage-azdls = ["opendal/services-azdls"]
storage-fs = ["opendal/services-fs"]
storage-gcs = ["opendal/services-gcs"]
storage-memory = ["opendal/services-memory"]
storage-oss = ["opendal/services-oss"]
storage-s3 = ["opendal/services-s3"]

storage-hdfs-native = ["opendal/services-hdfs-native"]
async-std = ["dep:async-std"]
tokio = ["tokio/rt-multi-thread"]

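For downstream users, a sketch of opting in to the new backend from a consuming crate's Cargo.toml; the version pin is hypothetical, and note the feature is also in this crate's default set per the change above:

```toml
# Illustrative consumer Cargo.toml; the version number is a placeholder.
# storage-hdfs-native is also enabled by default in this PR.
[dependencies]
iceberg = { version = "0.5", default-features = false, features = [
    "storage-hdfs-native",
    "tokio",
] }
```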
5 changes: 5 additions & 0 deletions crates/iceberg/src/io/file_io.rs
@@ -43,6 +43,8 @@ use crate::{Error, ErrorKind, Result};
/// | GCS | `storage-gcs` | `gs`, `gcs` | `gs://<bucket>/path/to/file` |
/// | OSS | `storage-oss` | `oss` | `oss://<bucket>/path/to/file` |
/// | Azure Datalake | `storage-azdls` | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
/// | HDFS | `storage-hdfs-native` | `hdfs` | `hdfs://<namenode>:<port>/path/to/file` |
#[derive(Clone, Debug)]
pub struct FileIO {
builder: FileIOBuilder,
@@ -533,6 +535,9 @@ mod tests {
let io = FileIO::from_path("s3://bucket/a").unwrap();
assert_eq!("s3", io.scheme_str.unwrap().as_str());

let io = FileIO::from_path("hdfs://tmp/a").unwrap();
assert_eq!("hdfs", io.scheme_str.unwrap().as_str());

let io = FileIO::from_path("tmp/||c");
assert!(io.is_err());
}
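For orientation, a minimal sketch of exercising the new scheme through the public API; the namenode address and table path are placeholders, and `fs.defaultFS` is the key consumed by `hdfs_native_config_parse` further down:

```rust
use iceberg::io::FileIO;

fn main() -> iceberg::Result<()> {
    // Placeholder namenode and path; fs.defaultFS feeds the HdfsNative config.
    let file_io = FileIO::from_path("hdfs://namenode:8020/warehouse/db/t/metadata.json")?
        .with_prop("fs.defaultFS", "hdfs://namenode:8020")
        .build()?;

    // Reads then go through the opendal hdfs-native service.
    let _input = file_io.new_input("hdfs://namenode:8020/warehouse/db/t/metadata.json")?;
    Ok(())
}
```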
5 changes: 5 additions & 0 deletions crates/iceberg/src/io/mod.rs
@@ -98,6 +98,11 @@ pub use storage_oss::*;
#[cfg(feature = "storage-s3")]
pub use storage_s3::*;

#[cfg(feature = "storage-hdfs-native")]
mod storage_hdfs_native;
#[cfg(feature = "storage-hdfs-native")]
pub use storage_hdfs_native::*;

pub(crate) fn is_truthy(value: &str) -> bool {
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
}
25 changes: 25 additions & 0 deletions crates/iceberg/src/io/storage.rs
@@ -22,6 +22,8 @@ use opendal::layers::RetryLayer;
use opendal::services::AzdlsConfig;
#[cfg(feature = "storage-gcs")]
use opendal::services::GcsConfig;
#[cfg(feature = "storage-hdfs-native")]
use opendal::services::HdfsNativeConfig;
#[cfg(feature = "storage-oss")]
use opendal::services::OssConfig;
#[cfg(feature = "storage-s3")]
@@ -62,6 +64,8 @@ pub(crate) enum Storage {
configured_scheme: AzureStorageScheme,
config: Arc<AzdlsConfig>,
},
#[cfg(feature = "storage-hdfs-native")]
HdfsNative { config: Arc<HdfsNativeConfig> },
}

impl Storage {
@@ -96,6 +100,10 @@ impl Storage {
configured_scheme: scheme,
})
}
#[cfg(feature = "storage-hdfs-native")]
Scheme::HdfsNative => Ok(Self::HdfsNative {
config: super::hdfs_native_config_parse(props)?.into(),
}),
// Update doc on [`FileIO`] when adding new schemes.
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
@@ -192,12 +200,28 @@ impl Storage {
configured_scheme,
config,
} => super::azdls_create_operator(path, config, configured_scheme),
#[cfg(feature = "storage-hdfs-native")]
Storage::HdfsNative { config } => {
let op = super::hdfs_native_config_build(config)?;

// Check that the path starts with the configured name node prefix.
let prefix = config.name_node.clone().unwrap_or_default();
if path.starts_with(&prefix) {
Ok((op, &path[prefix.len()..]))
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid hdfs url: {}, should start with {}", path, prefix),
))
}
}
#[cfg(all(
not(feature = "storage-s3"),
not(feature = "storage-fs"),
not(feature = "storage-gcs"),
not(feature = "storage-oss"),
not(feature = "storage-azdls"),
not(feature = "storage-hdfs-native")
))]
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
@@ -221,6 +245,7 @@ impl Storage {
"gs" | "gcs" => Ok(Scheme::Gcs),
"oss" => Ok(Scheme::Oss),
"abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
"hdfs" => Ok(Scheme::HdfsNative),
s => Ok(s.parse::<Scheme>()?),
}
}
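To make the path handling in the `HdfsNative` arm concrete (the values below are hypothetical), the operator receives the path relative to the configured name node:

```rust
// Hypothetical values mirroring the prefix check in create_operator.
#[test]
fn hdfs_prefix_is_stripped() {
    let prefix = "hdfs://namenode:8020"; // config.name_node
    let path = "hdfs://namenode:8020/warehouse/db/t/metadata.json";

    assert!(path.starts_with(prefix));
    // Everything after the prefix is what the opendal operator receives.
    assert_eq!(&path[prefix.len()..], "/warehouse/db/t/metadata.json");
}
```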
45 changes: 45 additions & 0 deletions crates/iceberg/src/io/storage_hdfs_native.rs
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use opendal::services::HdfsNativeConfig;
use opendal::{Configurator, Operator};

use crate::Result;

/// The configuration key for the default filesystem in core-site.xml.
/// This key is typically used to specify the HDFS namenode address.
pub const FS_DEFAULTFS: &str = "fs.defaultFS";

pub(crate) fn hdfs_native_config_parse(mut m: HashMap<String, String>) -> Result<HdfsNativeConfig> {
let mut cfg = HdfsNativeConfig::default();
cfg.root = Some("/".to_string());

if let Some(default_fs) = m.remove(FS_DEFAULTFS) {
cfg.name_node = Some(default_fs);
}

Ok(cfg)
}

/// Build a new opendal operator from the given config.
pub(crate) fn hdfs_native_config_build(cfg: &HdfsNativeConfig) -> Result<Operator> {
let builder = cfg.clone().into_builder();

Ok(Operator::new(builder)?.finish())
}
19 changes: 19 additions & 0 deletions crates/iceberg/src/io/storage_s3.rs
@@ -66,6 +66,14 @@ pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";
/// Option to skip loading configuration from config file and the env.
pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load";

/// Property for the AWS region.
pub const AWS_REGION_NAME: &str = "region_name";
/// Property for the AWS access key ID.
pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
/// Property for the AWS secret access key.
pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";

/// Parse iceberg props to s3 config.
pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
let mut cfg = S3Config::default();
@@ -75,15 +83,24 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config>
if let Some(access_key_id) = m.remove(S3_ACCESS_KEY_ID) {
cfg.access_key_id = Some(access_key_id);
};
if let Some(access_key_id) = m.remove(AWS_ACCESS_KEY_ID) {
cfg.access_key_id = Some(access_key_id);
};
if let Some(secret_access_key) = m.remove(S3_SECRET_ACCESS_KEY) {
cfg.secret_access_key = Some(secret_access_key);
};
if let Some(secret_access_key) = m.remove(AWS_SECRET_ACCESS_KEY) {
cfg.secret_access_key = Some(secret_access_key);
};
if let Some(session_token) = m.remove(S3_SESSION_TOKEN) {
cfg.session_token = Some(session_token);
};
if let Some(region) = m.remove(S3_REGION) {
cfg.region = Some(region);
};
if let Some(region) = m.remove(AWS_REGION_NAME) {
cfg.region = Some(region);
};
if let Some(region) = m.remove(CLIENT_REGION) {
cfg.region = Some(region);
};
@@ -147,6 +164,8 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config>
}
};

Ok(cfg)
}

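A sketch of what the new aliases accept; the values are dummies, and the existing `s3.*` keys keep working because both spellings are checked:

```rust
use std::collections::HashMap;

// The aws_* aliases and the existing s3.* keys fill the same S3Config fields.
#[test]
fn s3_config_accepts_aws_style_keys() -> crate::Result<()> {
    let props = HashMap::from([
        ("aws_access_key_id".to_string(), "my-key".to_string()),
        ("aws_secret_access_key".to_string(), "my-secret".to_string()),
        ("region_name".to_string(), "us-east-1".to_string()),
    ]);

    let cfg = s3_config_parse(props)?;
    assert_eq!(cfg.access_key_id.as_deref(), Some("my-key"));
    assert_eq!(cfg.region.as_deref(), Some("us-east-1"));
    Ok(())
}
```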