diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs
index 7454de126..609c80b3a 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -53,6 +53,55 @@ impl MemoryCatalog {
warehouse_location,
}
}
+
+ async fn load_table_helper(
+ &self,
+ table_ident: &TableIdent,
+ metadata_location: &str,
+ ) -> Result<Table> {
+ let input_file = self.file_io.new_input(metadata_location)?;
+ let metadata_content = input_file.read().await?;
+ let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+ Table::builder()
+ .file_io(self.file_io.clone())
+ .metadata_location(metadata_location)
+ .metadata(metadata)
+ .identifier(table_ident.clone())
+ .build()
+ }
+
+ fn extract_version(metadata_location: &str) -> Result<i32> {
+ if !metadata_location.ends_with(".metadata.json") {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "path does not end with .metadata.json",
+ )
+ .with_context("metadata_location", metadata_location));
+ }
+
+ let trimmed = metadata_location.strip_suffix(".metadata.json").unwrap();
+ let parts: Vec<&str> = trimmed.rsplitn(2, "/metadata/").collect();
+ if parts.len() != 2 {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "missing `/metadata/` segment in path",
+ ));
+ }
+
+ let metadata_part = parts[0];
+ let _location = parts[1];
+ let mut split = metadata_part.splitn(2, '-');
+ let version_str = split.next().ok_or_else(|| {
+ Error::new(ErrorKind::DataInvalid, "missing version part before dash")
+ })?;
+ let version = version_str.parse::<i32>().map_err(|e| {
+ Error::new(ErrorKind::DataInvalid, "invalid version number")
+ .with_source(e)
+ .with_context("version_str", version_str.to_string())
+ })?;
+
+ Ok(version)
+ }
}
#[async_trait]
@@ -230,16 +279,7 @@ impl Catalog for MemoryCatalog {
let root_namespace_state = self.root_namespace_state.lock().await;
let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
- let input_file = self.file_io.new_input(metadata_location)?;
- let metadata_content = input_file.read().await?;
- let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
-
- Table::builder()
- .file_io(self.file_io.clone())
- .metadata_location(metadata_location.clone())
- .metadata(metadata)
- .identifier(table_ident.clone())
- .build()
+ self.load_table_helper(table_ident, metadata_location).await
}
/// Drop a table from the catalog.
@@ -277,11 +317,53 @@ impl Catalog for MemoryCatalog {
}
/// Update a table to the catalog.
- async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "MemoryCatalog does not currently support updating tables.",
- ))
+ async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
+ let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+ let table_ident = commit.identifier().clone();
+ let base_metadata_location =
+ root_namespace_state.get_existing_table_location(&table_ident)?;
+ let base_table = self
+ .load_table_helper(&table_ident, base_metadata_location)
+ .await?;
+ let base_metadata = base_table.metadata();
+
+ let requirements = commit.take_requirements();
+ for requirement in &requirements {
+ requirement.check(Some(base_metadata))?;
+ }
+
+ let mut metadata_builder = base_metadata
+ .clone()
+ .into_builder(Some(base_metadata_location.clone()));
+ for update in commit.take_updates() {
+ metadata_builder = update.clone().apply(metadata_builder)?;
+ }
+ if !metadata_builder.has_changes() {
+ // updates did not change the table metadata, no need to write a new version
+ return Ok(base_table);
+ }
+
+ let new_metadata = metadata_builder.build()?.metadata;
+ let new_metadata_location = format!(
+ "{}/metadata/{}-{}.metadata.json",
+ &base_metadata.location(),
+ Self::extract_version(base_metadata_location)? + 1,
+ Uuid::new_v4()
+ );
+ self.file_io
+ .new_output(&new_metadata_location)?
+ .write(serde_json::to_vec(&new_metadata)?.into())
+ .await?;
+ root_namespace_state
+ .update_existing_table_location(&table_ident, new_metadata_location.clone())?;
+
+ Table::builder()
+ .file_io(self.file_io.clone())
+ .metadata_location(new_metadata_location)
+ .metadata(new_metadata)
+ .identifier(table_ident.clone())
+ .build()
}
}
@@ -292,7 +374,10 @@ mod tests {
use std::iter::FromIterator;
use iceberg::io::FileIOBuilder;
- use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+ use iceberg::spec::{
+ FormatVersion, NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type,
+ };
+ use iceberg::transaction::Transaction;
use regex::Regex;
use tempfile::TempDir;
@@ -335,8 +420,17 @@ mod tests {
.unwrap()
}
- async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
- let _ = catalog
+ // Useful if you don't care about namespace or table name
+ async fn setup_simple_table<C: Catalog>(catalog: &C) -> Table {
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespace(catalog, &namespace_ident).await;
+ let table_name = "tbl1";
+ let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+ create_table(catalog, &table_ident).await
+ }
+
+ async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
+ catalog
.create_table(
&table_ident.namespace,
TableCreation::builder()
@@ -345,7 +439,7 @@ mod tests {
.build(),
)
.await
- .unwrap();
+ .unwrap()
}
async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
@@ -1697,4 +1791,139 @@ mod tests {
),
);
}
+
+ #[tokio::test]
+ async fn test_update_table_no_change() {
+ let catalog = new_memory_catalog();
+ let table = setup_simple_table(&catalog).await;
+
+ // tables are created in V2 format so the upgrade is a no-op
+ let updated_table = Transaction::new(&table)
+ .upgrade_table_version(FormatVersion::V2)
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+
+ assert_eq!(table.metadata_location(), updated_table.metadata_location());
+ assert!(table.metadata().metadata_log().is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_update_set_location() {
+ let catalog = new_memory_catalog();
+ let table = setup_simple_table(&catalog).await;
+
+ let new_location = format!("{}{}", table.metadata().location(), "updatedLocation");
+ let updated_table = Transaction::new(&table)
+ .set_location(new_location.clone())
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+ let metadata_log = updated_table.metadata().metadata_log();
+
+ assert_eq!(updated_table.metadata().location(), new_location);
+ assert_eq!(
+ metadata_log.first().unwrap().metadata_file,
+ table.metadata_location().unwrap()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_update_then_load() {
+ let catalog = new_memory_catalog();
+ let table = setup_simple_table(&catalog).await;
+
+ let new_location = format!("{}{}", table.metadata().location(), "updatedLocation");
+ Transaction::new(&table)
+ .set_location(new_location.clone())
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+
+ let updated_table = catalog.load_table(table.identifier()).await.unwrap();
+ let metadata_log = updated_table.metadata().metadata_log();
+ assert_eq!(updated_table.metadata().location(), new_location);
+ assert_eq!(
+ metadata_log.first().unwrap().metadata_file,
+ table.metadata_location().unwrap()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_multiple_updates_single_call() {
+ let catalog = new_memory_catalog();
+ let table = setup_simple_table(&catalog).await;
+ assert_eq!(
+ table.metadata().properties().clone(),
+ HashMap::<String, String>::new()
+ );
+
+ let new_location = format!("{}{}", table.metadata().location(), "updatedLocation");
+ let properties = HashMap::from([(String::from("a"), String::from("1"))]);
+ Transaction::new(&table)
+ .set_location(new_location.clone())
+ .unwrap()
+ .set_properties(properties.clone())
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+
+ let updated_table = catalog.load_table(table.identifier()).await.unwrap();
+ let metadata_log = updated_table.metadata().metadata_log();
+ assert_eq!(updated_table.metadata().location(), new_location);
+ assert_eq!(updated_table.metadata().properties().clone(), properties);
+ assert_eq!(
+ metadata_log.first().unwrap().metadata_file,
+ table.metadata_location().unwrap()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_multiple_updates_multiple_calls() {
+ let catalog = new_memory_catalog();
+ let table_v1 = setup_simple_table(&catalog).await;
+ assert_eq!(
+ table_v1.metadata().properties().clone(),
+ HashMap::<String, String>::new()
+ );
+
+ let v2_properties = HashMap::from([(String::from("a"), String::from("1"))]);
+ Transaction::new(&table_v1)
+ .set_properties(v2_properties.clone())
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+ let table_v2 = catalog.load_table(table_v1.identifier()).await.unwrap();
+ let metadata_log_v2 = table_v2.metadata().metadata_log();
+ assert_eq!(table_v2.metadata().properties().clone(), v2_properties);
+ assert_eq!(
+ metadata_log_v2.first().unwrap().metadata_file,
+ table_v1.metadata_location().unwrap()
+ );
+
+ let mut v3_properties = HashMap::from([(String::from("b"), String::from("2"))]);
+ Transaction::new(&table_v2)
+ .set_properties(v3_properties.clone())
+ .unwrap()
+ .commit(&catalog)
+ .await
+ .unwrap();
+ let table_v3 = catalog.load_table(table_v1.identifier()).await.unwrap();
+ let metadata_log_v3 = table_v3.metadata().metadata_log();
+ v3_properties.extend(v2_properties);
+ assert_eq!(table_v3.metadata().properties().clone(), v3_properties);
+ assert_eq!(
+ metadata_log_v3.first().unwrap().metadata_file,
+ table_v1.metadata_location().unwrap()
+ );
+ assert_eq!(
+ metadata_log_v3.get(1).unwrap().metadata_file,
+ table_v2.metadata_location().unwrap()
+ );
+ }
}
diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs
index e324e7a3d..c7c64ccee 100644
--- a/crates/catalog/memory/src/namespace_state.rs
+++ b/crates/catalog/memory/src/namespace_state.rs
@@ -262,6 +262,26 @@ impl NamespaceState {
}
}
+ // Updates the metadata location of the given table or throws an error if it doesn't exist
+ pub(crate) fn update_existing_table_location(
+ &mut self,
+ table_ident: &TableIdent,
+ new_metadata_location: String,
+ ) -> Result<()> {
+ let namespace = self.get_mut_namespace(table_ident.namespace())?;
+ if !namespace
+ .table_metadata_locations
+ .contains_key(table_ident.name())
+ {
+ return no_such_table_err(table_ident);
+ }
+ namespace
+ .table_metadata_locations
+ .insert(table_ident.name().to_string(), new_metadata_location);
+
+ Ok(())
+ }
+
// Inserts the given table or returns an error if it already exists
pub(crate) fn insert_new_table(
&mut self,
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs
index 1f3f89533..06545c065 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -194,6 +194,11 @@ impl TableMetadataBuilder {
)
}
+ /// Returns whether or not the builder is tracking any changes
+ pub fn has_changes(&self) -> bool {
+ !self.changes.is_empty()
+ }
+
/// Changes uuid of table metadata.
pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
if self.metadata.table_uuid != uuid {