diff --git a/Cargo.lock b/Cargo.lock index 4411365b0..6f3974b23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3313,6 +3313,16 @@ dependencies = [ "volo-thrift", ] +[[package]] +name = "iceberg-catalog-loader" +version = "0.4.0" +dependencies = [ + "async-trait", + "iceberg", + "iceberg-catalog-rest", + "tokio", +] + [[package]] name = "iceberg-catalog-memory" version = "0.4.0" diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml new file mode 100644 index 000000000..75df84e87 --- /dev/null +++ b/crates/catalog/loader/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "iceberg-catalog-loader" +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Catalog Loader API" +keywords = ["iceberg", "catalog"] +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +iceberg = { workspace = true } +iceberg-catalog-rest = {workspace = true} +tokio = { workspace = true } +async-trait = {workspace = true} diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs new file mode 100644 index 000000000..3f5bb43a8 --- /dev/null +++ b/crates/catalog/loader/src/lib.rs @@ -0,0 +1,44 @@ +use async_trait::async_trait; +use std::collections::HashMap; +use std::future::Future; +use std::sync::Arc; + +use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; +use iceberg_catalog_rest::RestCatalogBuilder; + +#[async_trait] +pub trait BoxedCatalogBuilder { + async fn load(self: Box, name: String, props: HashMap) -> Result>; +} + +#[async_trait] +impl BoxedCatalogBuilder for T { + async fn load(self: Box, name: String, props: HashMap) -> Result> { + let builder = *self; + Ok(Arc::new(builder.load(name, props).await.unwrap()) as Arc) + } +} + +pub fn load(r#type: &str) -> Result> { + match r#type { + "rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box), + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported catalog type: {}", r#type), + )), + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use crate::load; + + #[tokio::test] + async fn test_load() { + let catalog = load("rest").unwrap(); + catalog.load("rest".to_string(), HashMap::from( + [("key".to_string(), "value".to_string())] + )).await.unwrap(); + } +} diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7bf54d4d8..74b49c448 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -18,14 +18,15 @@ //! This module contains the iceberg REST catalog implementation. use std::collections::HashMap; +use std::future::Future; use std::str::FromStr; use async_trait::async_trait; use iceberg::io::FileIO; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, + TableCreation, TableIdent, }; use itertools::Itertools; use reqwest::header::{ @@ -44,6 +45,8 @@ use crate::types::{ RenameTableRequest, }; +const REST_CATALOG_PROP_URI: &str = "uri"; +const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); const PATH_V1: &str = "v1"; @@ -51,6 +54,8 @@ const PATH_V1: &str = "v1"; /// Rest catalog configuration. #[derive(Clone, Debug, TypedBuilder)] pub struct RestCatalogConfig { + #[builder(default, setter(strip_option))] + name: Option, uri: String, #[builder(default, setter(strip_option(fallback = warehouse_opt)))] @@ -223,6 +228,70 @@ impl RestCatalogConfig { } } +/// Builder for [`RestCatalog`]. +#[derive(Debug)] +pub struct RestCatalogBuilder(RestCatalogConfig); + +impl Default for RestCatalogBuilder { + fn default() -> Self { + Self(RestCatalogConfig { + name: None, + uri: "".to_string(), + warehouse: None, + props: HashMap::new(), + client: None, + }) + } +} + +impl CatalogBuilder for RestCatalogBuilder { + type C = RestCatalog; + + fn load(mut self, name: impl Into, props: HashMap) -> impl Future> { + self.0.name = Some(name.into()); + if props.contains_key(REST_CATALOG_PROP_URI) { + self.0.uri = props + .get(REST_CATALOG_PROP_URI) + .cloned() + .unwrap_or_default(); + } + + if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) { + self.0.warehouse = props + .get(REST_CATALOG_PROP_WAREHOUSE) + .cloned() + .map(|s| s.into()); + } + + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else if self.0.uri.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog uri is required", + )) + } else { + Ok(RestCatalog::new(self.0)) + } + }; + + std::future::ready(result) + + } +} + +impl RestCatalogBuilder { + /// Configures the catalog with a custom HTTP client. + pub fn with_client(&mut self, client: Client) -> &mut Self { + self.0.client = Some(client); + self + } +} + #[derive(Debug)] struct RestContext { client: HttpClient, @@ -2257,4 +2326,17 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_create_rest_catalog() { + let mut builder = RestCatalogBuilder::default(); + builder.with_client(Client::new()); + + let catalog = builder.load("test", HashMap::from([ + ("uri".to_string(), "http://localhost:8080".to_string()), + ("a".to_string(), "b".to_string()), + ])).await; + + assert!(catalog.is_ok()); + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index e57152abc..ad6a81dbc 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::future::Future; use std::mem::take; use std::ops::Deref; @@ -96,6 +97,14 @@ pub trait Catalog: Debug + Sync + Send { async fn update_table(&self, commit: TableCommit) -> Result; } +/// Common interface for all catalog builders. +pub trait CatalogBuilder: Default + Debug + Send + Sync { + /// The catalog type that this builder creates. + type C: Catalog; + /// Create a new catalog instance. + fn load(self, name: impl Into, props: HashMap) -> impl Future> + Send; +} + /// NamespaceIdent represents the identifier of a namespace in the catalog. /// /// The namespace identifier is a list of strings, where each string is a diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 556ff3e02..f94b2b113 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -62,10 +62,7 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; -pub use catalog::{ - Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, - TableUpdate, ViewCreation, -}; +pub use catalog::*; pub mod table;