diff --git a/Cargo.toml b/Cargo.toml
index 43d035200..f865371d4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,3 +120,4 @@ uuid = { version = "1.16", features = ["v7"] }
 volo = "0.10.6"
 volo-thrift = "0.10.6"
 zstd = "0.13.2"
+hdfs-native = { version = "0.10" }
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 9f5d49012..95ffd1246 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -29,8 +29,8 @@ license = { workspace = true }
 repository = { workspace = true }
 
 [features]
-default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
-storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
+default = ["storage-memory", "storage-fs", "storage-s3", "storage-hdfs-native", "tokio"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-hdfs-native", "storage-gcs"]
 storage-azdls = ["opendal/services-azdls"]
 storage-fs = ["opendal/services-fs"]
@@ -38,7 +38,7 @@ storage-gcs = ["opendal/services-gcs"]
 storage-memory = ["opendal/services-memory"]
 storage-oss = ["opendal/services-oss"]
 storage-s3 = ["opendal/services-s3"]
-
+storage-hdfs-native = ["opendal/services-hdfs-native"]
 async-std = ["dep:async-std"]
 tokio = ["tokio/rt-multi-thread"]
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 389397eca..1d22ec17d 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -43,6 +43,8 @@ use crate::{Error, ErrorKind, Result};
 /// | GCS | `storage-gcs` | `gs`, `gcs` | `gs://<bucket>/path/to/file` |
 /// | OSS | `storage-oss` | `oss` | `oss://<bucket>/path/to/file` |
 /// | Azure Datalake | `storage-azdls` | `abfs`, `abfss`, `wasb`, `wasbs` | `abfs://<filesystem>@<account>.dfs.core.windows.net/path/to/file` or `wasb://<container>@<account>.blob.core.windows.net/path/to/file` |
+/// | HDFS | `storage-hdfs-native` | `hdfs` | `hdfs://<namenode>/path/to/file` |
+
 #[derive(Clone, Debug)]
 pub struct FileIO {
     builder: FileIOBuilder,
@@ -533,6 +535,9 @@ mod tests {
         let io = FileIO::from_path("s3://bucket/a").unwrap();
         assert_eq!("s3", io.scheme_str.unwrap().as_str());
 
+        let io = FileIO::from_path("hdfs://tmp/a").unwrap();
+        assert_eq!("hdfs", io.scheme_str.unwrap().as_str());
+
         let io = FileIO::from_path("tmp/||c");
         assert!(io.is_err());
     }
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 5eb596434..52974a297 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -98,6 +98,11 @@ pub use storage_oss::*;
 #[cfg(feature = "storage-s3")]
 pub use storage_s3::*;
 
+#[cfg(feature = "storage-hdfs-native")]
+mod storage_hdfs_native;
+#[cfg(feature = "storage-hdfs-native")]
+pub use storage_hdfs_native::*;
+
 pub(crate) fn is_truthy(value: &str) -> bool {
     ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
 }
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index a847977e5..8b7b207de 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -22,6 +22,8 @@ use opendal::layers::RetryLayer;
 use opendal::services::AzdlsConfig;
 #[cfg(feature = "storage-gcs")]
 use opendal::services::GcsConfig;
+#[cfg(feature = "storage-hdfs-native")]
+use opendal::services::HdfsNativeConfig;
 #[cfg(feature = "storage-oss")]
 use opendal::services::OssConfig;
 #[cfg(feature = "storage-s3")]
@@ -62,6 +64,8 @@ pub(crate) enum Storage {
         configured_scheme: AzureStorageScheme,
         config: Arc<AzdlsConfig>,
     },
+    #[cfg(feature = "storage-hdfs-native")]
+    HdfsNative { config: Arc<HdfsNativeConfig> },
 }
 
 impl Storage {
@@ -96,6 +100,10 @@ impl Storage {
                     configured_scheme: scheme,
                 })
             }
+            #[cfg(feature = "storage-hdfs-native")]
"storage-hdfs-native")] + Scheme::HdfsNative => Ok(Self::HdfsNative { + config: super::hdfs_native_config_parse(props)?.into(), + }), // Update doc on [`FileIO`] when adding new schemes. _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -192,12 +200,28 @@ impl Storage { configured_scheme, config, } => super::azdls_create_operator(path, config, configured_scheme), + #[cfg(feature = "storage-hdfs-native")] + Storage::HdfsNative { config } => { + let op = super::hdfs_native_config_build(config)?; + + // Check prefix of hdfs path. + let prefix = config.name_node.clone().unwrap_or_default(); + if path.starts_with(&prefix) { + Ok((op, &path[prefix.len()..])) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid hdfs url: {}, should start with {}", path, prefix), + )) + } + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), not(feature = "storage-gcs"), not(feature = "storage-oss"), not(feature = "storage-azdls"), + not(feature = "storage-hdfs-native") ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -221,6 +245,7 @@ impl Storage { "gs" | "gcs" => Ok(Scheme::Gcs), "oss" => Ok(Scheme::Oss), "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls), + "hdfs" => Ok(Scheme::HdfsNative), s => Ok(s.parse::()?), } } diff --git a/crates/iceberg/src/io/storage_hdfs_native.rs b/crates/iceberg/src/io/storage_hdfs_native.rs new file mode 100644 index 000000000..e0e4b0f2d --- /dev/null +++ b/crates/iceberg/src/io/storage_hdfs_native.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::HdfsNativeConfig; +use opendal::{Configurator, Operator}; + +use crate::Result; + +/// The configuration key for the default filesystem in core-site.xml. +/// This key is typically used to specify the HDFS namenode address. +pub const FS_DEFAULTFS: &str = "fs.defaultFS"; + +pub(crate) fn hdfs_native_config_parse(mut m: HashMap) -> Result { + let mut cfg = HdfsNativeConfig::default(); + cfg.root = Some("/".to_string()); + + if let Some(default_fs) = m.remove(FS_DEFAULTFS) { + cfg.name_node = Some(default_fs); + } + + Ok(cfg) +} + +/// Build new opendal operator from give path. +pub(crate) fn hdfs_native_config_build(cfg: &HdfsNativeConfig) -> Result { + let builder = cfg.clone().into_builder(); + + Ok(Operator::new(builder)?.finish()) +} diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 8396888c4..c849fc3fc 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -66,6 +66,14 @@ pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata"; /// Option to skip loading configuration from config file and the env. 
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index 8396888c4..c849fc3fc 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -66,6 +66,14 @@ pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";
 
 /// Option to skip loading configuration from config file and the env.
 pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable-config-load";
+
+/// Property key for the AWS region.
+pub const AWS_REGION_NAME: &str = "region_name";
+/// Property key for the AWS access key ID.
+pub const AWS_ACCESS_KEY_ID: &str = "aws_access_key_id";
+/// Property key for the AWS secret access key.
+pub const AWS_SECRET_ACCESS_KEY: &str = "aws_secret_access_key";
+
 /// Parse iceberg props to s3 config.
 pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
     let mut cfg = S3Config::default();
@@ -75,15 +83,24 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {