From f162400af948628ab6f25ddf6b148ed87662328e Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 21 Apr 2025 18:34:12 +0800 Subject: [PATCH 1/7] Experiment implementation for catalog builder --- crates/catalog/rest/src/catalog.rs | 88 +++++++++++++++++++++++++++++- crates/iceberg/src/catalog/mod.rs | 17 ++++++ crates/iceberg/src/lib.rs | 4 +- 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7bf54d4d8b..a324ef3b00 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -18,14 +18,15 @@ //! This module contains the iceberg REST catalog implementation. use std::collections::HashMap; +use std::future::Future; use std::str::FromStr; use async_trait::async_trait; use iceberg::io::FileIO; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, + TableCreation, TableIdent, }; use itertools::Itertools; use reqwest::header::{ @@ -51,6 +52,8 @@ const PATH_V1: &str = "v1"; /// Rest catalog configuration. #[derive(Clone, Debug, TypedBuilder)] pub struct RestCatalogConfig { + #[builder(default, setter(strip_option))] + name: Option, uri: String, #[builder(default, setter(strip_option(fallback = warehouse_opt)))] @@ -223,6 +226,74 @@ impl RestCatalogConfig { } } +/// Builder for [`RestCatalog`]. +#[derive(Debug)] +pub struct RestCatalogBuilder(RestCatalogConfig); + +impl Default for RestCatalogBuilder { + fn default() -> Self { + Self(RestCatalogConfig { + name: None, + uri: "".to_string(), + warehouse: None, + props: HashMap::new(), + client: None, + }) + } +} + +impl CatalogBuilder for RestCatalogBuilder { + type C = RestCatalog; + + fn name(mut self, name: impl Into) -> Self { + self.0.name = Some(name.into()); + self + } + + fn uri(mut self, uri: impl Into) -> Self { + self.0.uri = uri.into(); + self + } + + fn warehouse(mut self, warehouse: impl Into) -> Self { + self.0.warehouse = Some(warehouse.into()); + self + } + + fn with_prop(mut self, key: impl Into, value: impl Into) -> Self { + self.0.props.insert(key.into(), value.into()); + self + } + + fn build(self) -> impl Future> { + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else if self.0.uri.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog uri is required", + )) + } else { + Ok(RestCatalog::new(self.0)) + } + }; + + std::future::ready(result) + } +} + +impl RestCatalogBuilder { + /// Configures the catalog with a custom HTTP client. + pub fn with_client(mut self, client: Client) -> Self { + self.0.client = Some(client); + self + } +} + #[derive(Debug)] struct RestContext { client: HttpClient, @@ -2257,4 +2328,17 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_create_rest_catalog() { + let catalog = RestCatalogBuilder::default() + .name("test") + .uri("http://localhost:8080") + .with_client(Client::new()) + .with_prop("a", "b") + .build() + .await; + + assert!(catalog.is_ok()); + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index e57152abc0..d385bffe67 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::future::Future; use std::mem::take; use std::ops::Deref; @@ -96,6 +97,22 @@ pub trait Catalog: Debug + Sync + Send { async fn update_table(&self, commit: TableCommit) -> Result; } +/// Common interface for all catalog builders. +pub trait CatalogBuilder: Default + Debug + Send + Sync { + /// The catalog type that this builder creates. + type C: Catalog; + /// Configure name of the catalog. + fn name(self, name: impl Into) -> Self; + /// Configure uri of the catalog. + fn uri(self, uri: impl Into) -> Self; + /// Configure warehouse location of the catalog. + fn warehouse(self, warehouse: impl Into) -> Self; + /// Configure properties of the catalog. + fn with_prop(self, key: impl Into, value: impl Into) -> Self; + /// Create the catalog + fn build(self) -> impl Future>; +} + /// NamespaceIdent represents the identifier of a namespace in the catalog. /// /// The namespace identifier is a list of strings, where each string is a diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 556ff3e02f..9c8e9c12fb 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -63,8 +63,8 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; pub use catalog::{ - Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, - TableUpdate, ViewCreation, + Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, + TableRequirement, TableUpdate, ViewCreation, }; pub mod table; From f6c5937a186184909c7689d65288c8bcd74794c2 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 25 Apr 2025 11:06:58 +0800 Subject: [PATCH 2/7] Loader example --- Cargo.lock | 8 +++++ crates/catalog/loader/Cargo.toml | 16 ++++++++++ crates/catalog/loader/src/lib.rs | 52 ++++++++++++++++++++++++++++++++ crates/iceberg/src/lib.rs | 5 +-- 4 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 crates/catalog/loader/Cargo.toml create mode 100644 crates/catalog/loader/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4411365b0d..5d935fd4c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3313,6 +3313,14 @@ dependencies = [ "volo-thrift", ] +[[package]] +name = "iceberg-catalog-loader" +version = "0.4.0" +dependencies = [ + "iceberg", + "iceberg-catalog-rest", +] + [[package]] name = "iceberg-catalog-memory" version = "0.4.0" diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml new file mode 100644 index 0000000000..058f569207 --- /dev/null +++ b/crates/catalog/loader/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "iceberg-catalog-loader" +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Catalog Loader API" +keywords = ["iceberg", "catalog"] +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +iceberg = { workspace = true } +iceberg-catalog-rest = {workspace = true} diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs new file mode 100644 index 0000000000..6fa58c3a61 --- /dev/null +++ b/crates/catalog/loader/src/lib.rs @@ -0,0 +1,52 @@ +use std::sync::Arc; +use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; +use iceberg_catalog_rest::RestCatalogBuilder; + + +pub enum CatalogBuilderDef { + Rest(RestCatalogBuilder), +} + +pub fn load(r#type: &str) -> Result { + match r#type { + "rest" => Ok(CatalogBuilderDef::Rest(RestCatalogBuilder::default())), + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported catalog type: {}", r#type), + )), + } +} + +impl CatalogBuilderDef { + pub fn name(self, name: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.name(name)), + } + } + + pub fn uri(self, uri: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.uri(uri)), + } + } + + pub fn warehouse(self, warehouse: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.warehouse(warehouse)), + } + } + + pub fn with_prop(self, key: impl Into, value: impl Into) -> Self { + match self { + CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.with_prop(key, value)), + } + } + + pub async fn build(self) -> Result> { + match self { + CatalogBuilderDef::Rest(builder) => builder.build() + .await + .map(|c| Arc::new(c) as Arc), + } + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 9c8e9c12fb..f94b2b113e 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -62,10 +62,7 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; -pub use catalog::{ - Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, - TableRequirement, TableUpdate, ViewCreation, -}; +pub use catalog::*; pub mod table; From a569f7d0ac0a4bb1299d7826ef378f4179693361 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 25 Apr 2025 11:08:05 +0800 Subject: [PATCH 3/7] format --- crates/catalog/loader/src/lib.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 6fa58c3a61..559c06f92d 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,8 +1,8 @@ use std::sync::Arc; + use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; - pub enum CatalogBuilderDef { Rest(RestCatalogBuilder), } @@ -23,28 +23,33 @@ impl CatalogBuilderDef { CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.name(name)), } } - + pub fn uri(self, uri: impl Into) -> Self { match self { CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.uri(uri)), } } - + pub fn warehouse(self, warehouse: impl Into) -> Self { match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.warehouse(warehouse)), + CatalogBuilderDef::Rest(builder) => { + CatalogBuilderDef::Rest(builder.warehouse(warehouse)) + } } } - + pub fn with_prop(self, key: impl Into, value: impl Into) -> Self { match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.with_prop(key, value)), + CatalogBuilderDef::Rest(builder) => { + CatalogBuilderDef::Rest(builder.with_prop(key, value)) + } } } - + pub async fn build(self) -> Result> { match self { - CatalogBuilderDef::Rest(builder) => builder.build() + CatalogBuilderDef::Rest(builder) => builder + .build() .await .map(|c| Arc::new(c) as Arc), } From 2d494d51f09c4ba341ac7ac85eabb1f7f941543a Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 30 Apr 2025 14:53:50 +0800 Subject: [PATCH 4/7] Keep both --- crates/catalog/loader/src/lib.rs | 66 +++++++++++++----------------- crates/catalog/rest/src/catalog.rs | 18 ++++---- crates/iceberg/src/catalog/mod.rs | 12 +++--- 3 files changed, 43 insertions(+), 53 deletions(-) diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 559c06f92d..434a4fb370 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,57 +1,47 @@ +use std::future::Future; use std::sync::Arc; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; -pub enum CatalogBuilderDef { - Rest(RestCatalogBuilder), +pub trait BoxedCatalogBuilder { + fn name(&mut self, name: String); + fn uri(&mut self, uri: String); + fn warehouse(&mut self, warehouse: String); + fn with_prop(&mut self, key: String, value: String); + + fn build(self: Box) -> Box>>>; } -pub fn load(r#type: &str) -> Result { - match r#type { - "rest" => Ok(CatalogBuilderDef::Rest(RestCatalogBuilder::default())), - _ => Err(Error::new( - ErrorKind::FeatureUnsupported, - format!("Unsupported catalog type: {}", r#type), - )), +impl BoxedCatalogBuilder for T { + fn name(&mut self, name: String) { + self.name(name); } -} -impl CatalogBuilderDef { - pub fn name(self, name: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.name(name)), - } + fn uri(&mut self, uri: String) { + self.uri(uri); } - pub fn uri(self, uri: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => CatalogBuilderDef::Rest(builder.uri(uri)), - } + fn warehouse(&mut self, warehouse: String) { + self.warehouse(warehouse); } - pub fn warehouse(self, warehouse: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => { - CatalogBuilderDef::Rest(builder.warehouse(warehouse)) - } - } + fn with_prop(&mut self, key: String, value: String) { + self.with_prop(key, value); } - pub fn with_prop(self, key: impl Into, value: impl Into) -> Self { - match self { - CatalogBuilderDef::Rest(builder) => { - CatalogBuilderDef::Rest(builder.with_prop(key, value)) - } - } + fn build(self: Box) -> Box>>> { + let builder = *self; + Box::new(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) } +} - pub async fn build(self) -> Result> { - match self { - CatalogBuilderDef::Rest(builder) => builder - .build() - .await - .map(|c| Arc::new(c) as Arc), - } +pub fn load(r#type: &str) -> Result> { + match r#type { + "rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box), + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("Unsupported catalog type: {}", r#type), + )), } } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index a324ef3b00..bd58e7d6ea 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -245,22 +245,22 @@ impl Default for RestCatalogBuilder { impl CatalogBuilder for RestCatalogBuilder { type C = RestCatalog; - fn name(mut self, name: impl Into) -> Self { + fn name(&mut self, name: impl Into) -> &mut Self { self.0.name = Some(name.into()); self } - fn uri(mut self, uri: impl Into) -> Self { + fn uri(&mut self, uri: impl Into) -> &mut Self { self.0.uri = uri.into(); self } - fn warehouse(mut self, warehouse: impl Into) -> Self { + fn warehouse(&mut self, warehouse: impl Into) -> &mut Self { self.0.warehouse = Some(warehouse.into()); self } - fn with_prop(mut self, key: impl Into, value: impl Into) -> Self { + fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self { self.0.props.insert(key.into(), value.into()); self } @@ -288,7 +288,7 @@ impl CatalogBuilder for RestCatalogBuilder { impl RestCatalogBuilder { /// Configures the catalog with a custom HTTP client. - pub fn with_client(mut self, client: Client) -> Self { + pub fn with_client(&mut self, client: Client) -> &mut Self { self.0.client = Some(client); self } @@ -2331,13 +2331,13 @@ mod tests { #[tokio::test] async fn test_create_rest_catalog() { - let catalog = RestCatalogBuilder::default() + let mut builder = RestCatalogBuilder::default(); + builder .name("test") .uri("http://localhost:8080") .with_client(Client::new()) - .with_prop("a", "b") - .build() - .await; + .with_prop("a", "b"); + let catalog = builder.build().await; assert!(catalog.is_ok()); } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index d385bffe67..147eacbdbc 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -101,14 +101,14 @@ pub trait Catalog: Debug + Sync + Send { pub trait CatalogBuilder: Default + Debug + Send + Sync { /// The catalog type that this builder creates. type C: Catalog; - /// Configure name of the catalog. - fn name(self, name: impl Into) -> Self; + /// Configure the name of the catalog. + fn name(&mut self, name: impl Into) -> &mut Self; /// Configure uri of the catalog. - fn uri(self, uri: impl Into) -> Self; - /// Configure warehouse location of the catalog. - fn warehouse(self, warehouse: impl Into) -> Self; + fn uri(&mut self, uri: impl Into) -> &mut Self; + /// Configure the warehouse location of the catalog. + fn warehouse(&mut self, warehouse: impl Into) -> &mut Self; /// Configure properties of the catalog. - fn with_prop(self, key: impl Into, value: impl Into) -> Self; + fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self; /// Create the catalog fn build(self) -> impl Future>; } From f4a2efea385230cd330026705316e16331a1c612 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Wed, 30 Apr 2025 15:10:55 +0800 Subject: [PATCH 5/7] Fix clippy --- Cargo.lock | 1 + crates/catalog/loader/Cargo.toml | 1 + crates/catalog/loader/src/lib.rs | 23 ++++++++++++++++++++--- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d935fd4c0..c4458eb09b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3319,6 +3319,7 @@ version = "0.4.0" dependencies = [ "iceberg", "iceberg-catalog-rest", + "tokio", ] [[package]] diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index 058f569207..f1d38c90a9 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -14,3 +14,4 @@ repository = { workspace = true } [dependencies] iceberg = { workspace = true } iceberg-catalog-rest = {workspace = true} +tokio = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 434a4fb370..336d7defd2 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,16 +1,19 @@ use std::future::Future; +use std::pin::Pin; use std::sync::Arc; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; +type BoxedCatalogBuilderFuture = Pin>>>>; + pub trait BoxedCatalogBuilder { fn name(&mut self, name: String); fn uri(&mut self, uri: String); fn warehouse(&mut self, warehouse: String); fn with_prop(&mut self, key: String, value: String); - fn build(self: Box) -> Box>>>; + fn build(self: Box) -> BoxedCatalogBuilderFuture; } impl BoxedCatalogBuilder for T { @@ -30,9 +33,9 @@ impl BoxedCatalogBuilder for T { self.with_prop(key, value); } - fn build(self: Box) -> Box>>> { + fn build(self: Box) -> BoxedCatalogBuilderFuture { let builder = *self; - Box::new(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) + Box::pin(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) } } @@ -45,3 +48,17 @@ pub fn load(r#type: &str) -> Result> { )), } } + +#[cfg(test)] +mod tests { + use crate::load; + + #[tokio::test] + async fn test_load() { + let mut catalog = load("rest").unwrap(); + catalog.name("rest".to_string()); + catalog.with_prop("key".to_string(), "value".to_string()); + + catalog.build().await.unwrap(); + } +} From 59ca3159ce086d84bbfe20023c0728a32361ac6f Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 26 May 2025 17:07:33 +0800 Subject: [PATCH 6/7] Update deom --- crates/catalog/loader/src/lib.rs | 36 +++++----------------- crates/catalog/rest/src/catalog.rs | 48 ++++++++++++++---------------- crates/iceberg/src/catalog/mod.rs | 12 ++------ 3 files changed, 33 insertions(+), 63 deletions(-) diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 336d7defd2..04bbcce9d9 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -8,34 +9,13 @@ use iceberg_catalog_rest::RestCatalogBuilder; type BoxedCatalogBuilderFuture = Pin>>>>; pub trait BoxedCatalogBuilder { - fn name(&mut self, name: String); - fn uri(&mut self, uri: String); - fn warehouse(&mut self, warehouse: String); - fn with_prop(&mut self, key: String, value: String); - - fn build(self: Box) -> BoxedCatalogBuilderFuture; + fn load(self: Box, name: String, props: HashMap) -> BoxedCatalogBuilderFuture; } impl BoxedCatalogBuilder for T { - fn name(&mut self, name: String) { - self.name(name); - } - - fn uri(&mut self, uri: String) { - self.uri(uri); - } - - fn warehouse(&mut self, warehouse: String) { - self.warehouse(warehouse); - } - - fn with_prop(&mut self, key: String, value: String) { - self.with_prop(key, value); - } - - fn build(self: Box) -> BoxedCatalogBuilderFuture { + fn load(self: Box, name: String, props: HashMap) -> BoxedCatalogBuilderFuture { let builder = *self; - Box::pin(async move { Ok(Arc::new(builder.build().await.unwrap()) as Arc) }) + Box::pin(async move { Ok(Arc::new(builder.load(name, props).await.unwrap()) as Arc) }) } } @@ -51,14 +31,14 @@ pub fn load(r#type: &str) -> Result> { #[cfg(test)] mod tests { + use std::collections::HashMap; use crate::load; #[tokio::test] async fn test_load() { let mut catalog = load("rest").unwrap(); - catalog.name("rest".to_string()); - catalog.with_prop("key".to_string(), "value".to_string()); - - catalog.build().await.unwrap(); + catalog.load("rest".to_string(), HashMap::from( + [("key".to_string(), "value".to_string())] + )).await.unwrap(); } } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index bd58e7d6ea..74b49c4483 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -45,6 +45,8 @@ use crate::types::{ RenameTableRequest, }; +const REST_CATALOG_PROP_URI: &str = "uri"; +const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); const PATH_V1: &str = "v1"; @@ -245,27 +247,22 @@ impl Default for RestCatalogBuilder { impl CatalogBuilder for RestCatalogBuilder { type C = RestCatalog; - fn name(&mut self, name: impl Into) -> &mut Self { + fn load(mut self, name: impl Into, props: HashMap) -> impl Future> { self.0.name = Some(name.into()); - self - } - - fn uri(&mut self, uri: impl Into) -> &mut Self { - self.0.uri = uri.into(); - self - } - - fn warehouse(&mut self, warehouse: impl Into) -> &mut Self { - self.0.warehouse = Some(warehouse.into()); - self - } - - fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self { - self.0.props.insert(key.into(), value.into()); - self - } + if props.contains_key(REST_CATALOG_PROP_URI) { + self.0.uri = props + .get(REST_CATALOG_PROP_URI) + .cloned() + .unwrap_or_default(); + } + + if props.contains_key(REST_CATALOG_PROP_WAREHOUSE) { + self.0.warehouse = props + .get(REST_CATALOG_PROP_WAREHOUSE) + .cloned() + .map(|s| s.into()); + } - fn build(self) -> impl Future> { let result = { if self.0.name.is_none() { Err(Error::new( @@ -283,6 +280,7 @@ impl CatalogBuilder for RestCatalogBuilder { }; std::future::ready(result) + } } @@ -2332,12 +2330,12 @@ mod tests { #[tokio::test] async fn test_create_rest_catalog() { let mut builder = RestCatalogBuilder::default(); - builder - .name("test") - .uri("http://localhost:8080") - .with_client(Client::new()) - .with_prop("a", "b"); - let catalog = builder.build().await; + builder.with_client(Client::new()); + + let catalog = builder.load("test", HashMap::from([ + ("uri".to_string(), "http://localhost:8080".to_string()), + ("a".to_string(), "b".to_string()), + ])).await; assert!(catalog.is_ok()); } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 147eacbdbc..b35c760f8a 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -101,16 +101,8 @@ pub trait Catalog: Debug + Sync + Send { pub trait CatalogBuilder: Default + Debug + Send + Sync { /// The catalog type that this builder creates. type C: Catalog; - /// Configure the name of the catalog. - fn name(&mut self, name: impl Into) -> &mut Self; - /// Configure uri of the catalog. - fn uri(&mut self, uri: impl Into) -> &mut Self; - /// Configure the warehouse location of the catalog. - fn warehouse(&mut self, warehouse: impl Into) -> &mut Self; - /// Configure properties of the catalog. - fn with_prop(&mut self, key: impl Into, value: impl Into) -> &mut Self; - /// Create the catalog - fn build(self) -> impl Future>; + /// Create a new catalog instance. + fn load(self, name: impl Into, props: HashMap) -> impl Future>; } /// NamespaceIdent represents the identifier of a namespace in the catalog. From 1e3eeb4cabb858b0b32a3c09154dac1dcb6c8279 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 29 May 2025 17:06:20 +0800 Subject: [PATCH 7/7] More concise design --- Cargo.lock | 1 + crates/catalog/loader/Cargo.toml | 1 + crates/catalog/loader/src/lib.rs | 14 +++++++------- crates/iceberg/src/catalog/mod.rs | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4458eb09b..6f3974b232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3317,6 +3317,7 @@ dependencies = [ name = "iceberg-catalog-loader" version = "0.4.0" dependencies = [ + "async-trait", "iceberg", "iceberg-catalog-rest", "tokio", diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index f1d38c90a9..75df84e87f 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -15,3 +15,4 @@ repository = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = {workspace = true} tokio = { workspace = true } +async-trait = {workspace = true} diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index 04bbcce9d9..3f5bb43a82 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -1,21 +1,21 @@ +use async_trait::async_trait; use std::collections::HashMap; use std::future::Future; -use std::pin::Pin; use std::sync::Arc; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_rest::RestCatalogBuilder; -type BoxedCatalogBuilderFuture = Pin>>>>; - +#[async_trait] pub trait BoxedCatalogBuilder { - fn load(self: Box, name: String, props: HashMap) -> BoxedCatalogBuilderFuture; + async fn load(self: Box, name: String, props: HashMap) -> Result>; } +#[async_trait] impl BoxedCatalogBuilder for T { - fn load(self: Box, name: String, props: HashMap) -> BoxedCatalogBuilderFuture { + async fn load(self: Box, name: String, props: HashMap) -> Result> { let builder = *self; - Box::pin(async move { Ok(Arc::new(builder.load(name, props).await.unwrap()) as Arc) }) + Ok(Arc::new(builder.load(name, props).await.unwrap()) as Arc) } } @@ -36,7 +36,7 @@ mod tests { #[tokio::test] async fn test_load() { - let mut catalog = load("rest").unwrap(); + let catalog = load("rest").unwrap(); catalog.load("rest".to_string(), HashMap::from( [("key".to_string(), "value".to_string())] )).await.unwrap(); diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b35c760f8a..ad6a81dbc1 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -102,7 +102,7 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync { /// The catalog type that this builder creates. type C: Catalog; /// Create a new catalog instance. - fn load(self, name: impl Into, props: HashMap) -> impl Future>; + fn load(self, name: impl Into, props: HashMap) -> impl Future> + Send; } /// NamespaceIdent represents the identifier of a namespace in the catalog.