diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 7454de126..609c80b3a 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -53,6 +53,55 @@ impl MemoryCatalog { warehouse_location, } } + + async fn load_table_helper( + &self, + table_ident: &TableIdent, + metadata_location: &str, + ) -> Result { + let input_file = self.file_io.new_input(metadata_location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident.clone()) + .build() + } + + fn extract_version(metadata_location: &str) -> Result { + if !metadata_location.ends_with(".metadata.json") { + return Err(Error::new( + ErrorKind::DataInvalid, + "path does not end with .metadata.json", + ) + .with_context("metadata_location", metadata_location)); + } + + let trimmed = metadata_location.strip_suffix(".metadata.json").unwrap(); + let parts: Vec<&str> = trimmed.rsplitn(2, "/metadata/").collect(); + if parts.len() != 2 { + return Err(Error::new( + ErrorKind::DataInvalid, + "missing `/metadata/` segment in path", + )); + } + + let metadata_part = parts[0]; + let _location = parts[1]; + let mut split = metadata_part.splitn(2, '-'); + let version_str = split.next().ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "missing version part before dash") + })?; + let version = version_str.parse::().map_err(|e| { + Error::new(ErrorKind::DataInvalid, "invalid version number") + .with_source(e) + .with_context("version_str", version_str.to_string()) + })?; + + Ok(version) + } } #[async_trait] @@ -230,16 +279,7 @@ impl Catalog for MemoryCatalog { let root_namespace_state = self.root_namespace_state.lock().await; let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; - let input_file = self.file_io.new_input(metadata_location)?; - let metadata_content = input_file.read().await?; - let metadata = serde_json::from_slice::(&metadata_content)?; - - Table::builder() - .file_io(self.file_io.clone()) - .metadata_location(metadata_location.clone()) - .metadata(metadata) - .identifier(table_ident.clone()) - .build() + self.load_table_helper(table_ident, metadata_location).await } /// Drop a table from the catalog. @@ -277,11 +317,53 @@ impl Catalog for MemoryCatalog { } /// Update a table to the catalog. - async fn update_table(&self, _commit: TableCommit) -> Result
{ - Err(Error::new( - ErrorKind::FeatureUnsupported, - "MemoryCatalog does not currently support updating tables.", - )) + async fn update_table(&self, mut commit: TableCommit) -> Result
{ + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let table_ident = commit.identifier().clone(); + let base_metadata_location = + root_namespace_state.get_existing_table_location(&table_ident)?; + let base_table = self + .load_table_helper(&table_ident, base_metadata_location) + .await?; + let base_metadata = base_table.metadata(); + + let requirements = commit.take_requirements(); + for requirement in &requirements { + requirement.check(Some(base_metadata))?; + } + + let mut metadata_builder = base_metadata + .clone() + .into_builder(Some(base_metadata_location.clone())); + for update in commit.take_updates() { + metadata_builder = update.clone().apply(metadata_builder)?; + } + if !metadata_builder.has_changes() { + // updates did not change the table metadata, no need to write a new version + return Ok(base_table); + } + + let new_metadata = metadata_builder.build()?.metadata; + let new_metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + &base_metadata.location(), + Self::extract_version(base_metadata_location)? + 1, + Uuid::new_v4() + ); + self.file_io + .new_output(&new_metadata_location)? + .write(serde_json::to_vec(&new_metadata)?.into()) + .await?; + root_namespace_state + .update_existing_table_location(&table_ident, new_metadata_location.clone())?; + + Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(new_metadata_location) + .metadata(new_metadata) + .identifier(table_ident.clone()) + .build() } } @@ -292,7 +374,10 @@ mod tests { use std::iter::FromIterator; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{ + FormatVersion, NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type, + }; + use iceberg::transaction::Transaction; use regex::Regex; use tempfile::TempDir; @@ -335,8 +420,17 @@ mod tests { .unwrap() } - async fn create_table(catalog: &C, table_ident: &TableIdent) { - let _ = catalog + // Useful if you don't care about namespace or table name + async fn setup_simple_table(catalog: &C) -> Table { + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(catalog, &namespace_ident).await; + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_table(catalog, &table_ident).await + } + + async fn create_table(catalog: &C, table_ident: &TableIdent) -> Table { + catalog .create_table( &table_ident.namespace, TableCreation::builder() @@ -345,7 +439,7 @@ mod tests { .build(), ) .await - .unwrap(); + .unwrap() } async fn create_tables(catalog: &C, table_idents: Vec<&TableIdent>) { @@ -1697,4 +1791,139 @@ mod tests { ), ); } + + #[tokio::test] + async fn test_update_table_no_change() { + let catalog = new_memory_catalog(); + let table = setup_simple_table(&catalog).await; + + // tables are created in V2 format so the upgrade is a no-op + let updated_table = Transaction::new(&table) + .upgrade_table_version(FormatVersion::V2) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + + assert_eq!(table.metadata_location(), updated_table.metadata_location()); + assert!(table.metadata().metadata_log().is_empty()); + } + + #[tokio::test] + async fn test_update_set_location() { + let catalog = new_memory_catalog(); + let table = setup_simple_table(&catalog).await; + + let new_location = format!("{}{}", table.metadata().location(), "updatedLocation"); + let updated_table = Transaction::new(&table) + .set_location(new_location.clone()) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + let metadata_log = updated_table.metadata().metadata_log(); + + assert_eq!(updated_table.metadata().location(), new_location); + assert_eq!( + metadata_log.first().unwrap().metadata_file, + table.metadata_location().unwrap() + ); + } + + #[tokio::test] + async fn test_update_then_load() { + let catalog = new_memory_catalog(); + let table = setup_simple_table(&catalog).await; + + let new_location = format!("{}{}", table.metadata().location(), "updatedLocation"); + Transaction::new(&table) + .set_location(new_location.clone()) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + + let updated_table = catalog.load_table(table.identifier()).await.unwrap(); + let metadata_log = updated_table.metadata().metadata_log(); + assert_eq!(updated_table.metadata().location(), new_location); + assert_eq!( + metadata_log.first().unwrap().metadata_file, + table.metadata_location().unwrap() + ); + } + + #[tokio::test] + async fn test_multiple_updates_single_call() { + let catalog = new_memory_catalog(); + let table = setup_simple_table(&catalog).await; + assert_eq!( + table.metadata().properties().clone(), + HashMap::::new() + ); + + let new_location = format!("{}{}", table.metadata().location(), "updatedLocation"); + let properties = HashMap::from([(String::from("a"), String::from("1"))]); + Transaction::new(&table) + .set_location(new_location.clone()) + .unwrap() + .set_properties(properties.clone()) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + + let updated_table = catalog.load_table(table.identifier()).await.unwrap(); + let metadata_log = updated_table.metadata().metadata_log(); + assert_eq!(updated_table.metadata().location(), new_location); + assert_eq!(updated_table.metadata().properties().clone(), properties); + assert_eq!( + metadata_log.first().unwrap().metadata_file, + table.metadata_location().unwrap() + ); + } + + #[tokio::test] + async fn test_multiple_updates_multiple_calls() { + let catalog = new_memory_catalog(); + let table_v1 = setup_simple_table(&catalog).await; + assert_eq!( + table_v1.metadata().properties().clone(), + HashMap::::new() + ); + + let v2_properties = HashMap::from([(String::from("a"), String::from("1"))]); + Transaction::new(&table_v1) + .set_properties(v2_properties.clone()) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + let table_v2 = catalog.load_table(table_v1.identifier()).await.unwrap(); + let metadata_log_v2 = table_v2.metadata().metadata_log(); + assert_eq!(table_v2.metadata().properties().clone(), v2_properties); + assert_eq!( + metadata_log_v2.first().unwrap().metadata_file, + table_v1.metadata_location().unwrap() + ); + + let mut v3_properties = HashMap::from([(String::from("b"), String::from("2"))]); + Transaction::new(&table_v2) + .set_properties(v3_properties.clone()) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + let table_v3 = catalog.load_table(table_v1.identifier()).await.unwrap(); + let metadata_log_v3 = table_v3.metadata().metadata_log(); + v3_properties.extend(v2_properties); + assert_eq!(table_v3.metadata().properties().clone(), v3_properties); + assert_eq!( + metadata_log_v3.first().unwrap().metadata_file, + table_v1.metadata_location().unwrap() + ); + assert_eq!( + metadata_log_v3.get(1).unwrap().metadata_file, + table_v2.metadata_location().unwrap() + ); + } } diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index e324e7a3d..c7c64ccee 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -262,6 +262,26 @@ impl NamespaceState { } } + // Updates the metadata location of the given table or throws an error if it doesn't exist + pub(crate) fn update_existing_table_location( + &mut self, + table_ident: &TableIdent, + new_metadata_location: String, + ) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + if !namespace + .table_metadata_locations + .contains_key(table_ident.name()) + { + return no_such_table_err(table_ident); + } + namespace + .table_metadata_locations + .insert(table_ident.name().to_string(), new_metadata_location); + + Ok(()) + } + // Inserts the given table or returns an error if it already exists pub(crate) fn insert_new_table( &mut self, diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 1f3f89533..06545c065 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -194,6 +194,11 @@ impl TableMetadataBuilder { ) } + /// Returns whether or not the builder is tracking any changes + pub fn has_changes(&self) -> bool { + !self.changes.is_empty() + } + /// Changes uuid of table metadata. pub fn assign_uuid(mut self, uuid: Uuid) -> Self { if self.metadata.table_uuid != uuid {