
feat(memory): Implement update_table for MemoryCatalog #1381

Open · wants to merge 3 commits into base: main

267 changes: 248 additions & 19 deletions crates/catalog/memory/src/catalog.rs
@@ -53,6 +53,55 @@ impl MemoryCatalog {
warehouse_location,
}
}

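/// Reads the table metadata file at `metadata_location` and builds a `Table`
/// handle backed by this catalog's `FileIO`.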
async fn load_table_helper(
&self,
table_ident: &TableIdent,
metadata_location: &str,
) -> Result<Table> {
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(table_ident.clone())
.build()
}

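/// Extracts the numeric version prefix from a metadata path of the form
/// `<table_location>/metadata/<version>-<uuid>.metadata.json`.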
fn extract_version(metadata_location: &str) -> Result<u32> {
if !metadata_location.ends_with(".metadata.json") {
return Err(Error::new(
ErrorKind::DataInvalid,
"path does not end with .metadata.json",
)
.with_context("metadata_location", metadata_location));
}

let trimmed = metadata_location.strip_suffix(".metadata.json").unwrap();
let parts: Vec<&str> = trimmed.rsplitn(2, "/metadata/").collect();
if parts.len() != 2 {
return Err(Error::new(
ErrorKind::DataInvalid,
"missing `/metadata/` segment in path",
));
}

let metadata_part = parts[0];
let _location = parts[1];
let mut split = metadata_part.splitn(2, '-');
let version_str = split.next().ok_or_else(|| {
Error::new(ErrorKind::DataInvalid, "missing version part before dash")
})?;
let version = version_str.parse::<u32>().map_err(|e| {
Error::new(ErrorKind::DataInvalid, "invalid version number")
.with_source(e)
.with_context("version_str", version_str.to_string())
})?;

Ok(version)
}
}

#[async_trait]
@@ -230,16 +279,7 @@ impl Catalog for MemoryCatalog {
let root_namespace_state = self.root_namespace_state.lock().await;

let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location.clone())
.metadata(metadata)
.identifier(table_ident.clone())
.build()
self.load_table_helper(table_ident, metadata_location).await
}

/// Drop a table from the catalog.
@@ -277,11 +317,53 @@ impl Catalog for MemoryCatalog {
}

/// Update a table in the catalog.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"MemoryCatalog does not currently support updating tables.",
))
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
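// Hold the namespace lock for the entire commit so that concurrent updates to the same table are serialized.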
let mut root_namespace_state = self.root_namespace_state.lock().await;

let table_ident = commit.identifier().clone();
let base_metadata_location =
root_namespace_state.get_existing_table_location(&table_ident)?;
let base_table = self
.load_table_helper(&table_ident, base_metadata_location)
.await?;
let base_metadata = base_table.metadata();

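// Check every commit requirement against the base metadata before applying any updates.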
let requirements = commit.take_requirements();
for requirement in &requirements {
requirement.check(Some(base_metadata))?;
}

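// Passing the current metadata location lets the builder record it in the new metadata's log.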
let mut metadata_builder = base_metadata
.clone()
.into_builder(Some(base_metadata_location.clone()));
for update in commit.take_updates() {
metadata_builder = update.clone().apply(metadata_builder)?;
}
if !metadata_builder.has_changes() {
// updates did not change the table metadata, no need to write a new version
return Ok(base_table);
}

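// Build the final metadata and write it to a new `<version + 1>-<uuid>.metadata.json` file under the table's metadata directory.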
let new_metadata = metadata_builder.build()?.metadata;
let new_metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&base_metadata.location(),
Self::extract_version(base_metadata_location)? + 1,
Uuid::new_v4()
);
self.file_io
.new_output(&new_metadata_location)?
.write(serde_json::to_vec(&new_metadata)?.into())
.await?;
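// Only repoint the catalog at the new metadata file once the write has succeeded.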
root_namespace_state
.update_existing_table_location(&table_ident, new_metadata_location.clone())?;

Table::builder()
.file_io(self.file_io.clone())
.metadata_location(new_metadata_location)
.metadata(new_metadata)
.identifier(table_ident.clone())
.build()
}
}

@@ -292,7 +374,10 @@ mod tests {
use std::iter::FromIterator;

use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::spec::{
FormatVersion, NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type,
};
use iceberg::transaction::Transaction;
use regex::Regex;
use tempfile::TempDir;

@@ -335,8 +420,17 @@ mod tests {
.unwrap()
}

async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
let _ = catalog
// Convenience helper for tests that don't care about the specific namespace or table name
async fn setup_simple_table<C: Catalog>(catalog: &C) -> Table {
let namespace_ident = NamespaceIdent::new("a".into());
create_namespace(catalog, &namespace_ident).await;
let table_name = "tbl1";
let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
create_table(catalog, &table_ident).await
}

async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
catalog
.create_table(
&table_ident.namespace,
TableCreation::builder()
@@ -345,7 +439,7 @@
.build(),
)
.await
.unwrap();
.unwrap()
}

async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
@@ -1697,4 +1791,139 @@ mod tests {
),
);
}

#[tokio::test]
async fn test_update_table_no_change() {
let catalog = new_memory_catalog();
let table = setup_simple_table(&catalog).await;

// tables are created in V2 format so the upgrade is a no-op
let updated_table = Transaction::new(&table)
.upgrade_table_version(FormatVersion::V2)
.unwrap()
.commit(&catalog)
.await
.unwrap();

assert_eq!(table.metadata_location(), updated_table.metadata_location());
assert!(table.metadata().metadata_log().is_empty());
}

#[tokio::test]
async fn test_update_set_location() {
let catalog = new_memory_catalog();
let table = setup_simple_table(&catalog).await;

let new_location = format!("{}{}", table.metadata().location(), "updatedLocation");
let updated_table = Transaction::new(&table)
.set_location(new_location.clone())
.unwrap()
.commit(&catalog)
.await
.unwrap();
let metadata_log = updated_table.metadata().metadata_log();

assert_eq!(updated_table.metadata().location(), new_location);
assert_eq!(
metadata_log.first().unwrap().metadata_file,
table.metadata_location().unwrap()
);
}

#[tokio::test]
async fn test_update_then_load() {
let catalog = new_memory_catalog();
let table = setup_simple_table(&catalog).await;

let new_location = format!("{}{}", table.metadata().location(), "updatedLocation");
Transaction::new(&table)
.set_location(new_location.clone())
.unwrap()
.commit(&catalog)
.await
.unwrap();

let updated_table = catalog.load_table(table.identifier()).await.unwrap();
let metadata_log = updated_table.metadata().metadata_log();
assert_eq!(updated_table.metadata().location(), new_location);
assert_eq!(
metadata_log.first().unwrap().metadata_file,
table.metadata_location().unwrap()
);
}

#[tokio::test]
async fn test_multiple_updates_single_call() {
let catalog = new_memory_catalog();
let table = setup_simple_table(&catalog).await;
assert_eq!(
table.metadata().properties().clone(),
HashMap::<String, String>::new()
);

let new_location = format!("{}{}", table.metadata().location(), "updatedLocation");
let properties = HashMap::from([(String::from("a"), String::from("1"))]);
Transaction::new(&table)
.set_location(new_location.clone())
.unwrap()
.set_properties(properties.clone())
.unwrap()
.commit(&catalog)
.await
.unwrap();

let updated_table = catalog.load_table(table.identifier()).await.unwrap();
let metadata_log = updated_table.metadata().metadata_log();
assert_eq!(updated_table.metadata().location(), new_location);
assert_eq!(updated_table.metadata().properties().clone(), properties);
assert_eq!(
metadata_log.first().unwrap().metadata_file,
table.metadata_location().unwrap()
);
}

#[tokio::test]
async fn test_multiple_updates_multiple_calls() {
let catalog = new_memory_catalog();
let table_v1 = setup_simple_table(&catalog).await;
assert_eq!(
table_v1.metadata().properties().clone(),
HashMap::<String, String>::new()
);

let v2_properties = HashMap::from([(String::from("a"), String::from("1"))]);
Transaction::new(&table_v1)
.set_properties(v2_properties.clone())
.unwrap()
.commit(&catalog)
.await
.unwrap();
let table_v2 = catalog.load_table(table_v1.identifier()).await.unwrap();
let metadata_log_v2 = table_v2.metadata().metadata_log();
assert_eq!(table_v2.metadata().properties().clone(), v2_properties);
assert_eq!(
metadata_log_v2.first().unwrap().metadata_file,
table_v1.metadata_location().unwrap()
);

let mut v3_properties = HashMap::from([(String::from("b"), String::from("2"))]);
Transaction::new(&table_v2)
.set_properties(v3_properties.clone())
.unwrap()
.commit(&catalog)
.await
.unwrap();
let table_v3 = catalog.load_table(table_v1.identifier()).await.unwrap();
let metadata_log_v3 = table_v3.metadata().metadata_log();
v3_properties.extend(v2_properties);
assert_eq!(table_v3.metadata().properties().clone(), v3_properties);
assert_eq!(
metadata_log_v3.first().unwrap().metadata_file,
table_v1.metadata_location().unwrap()
);
assert_eq!(
metadata_log_v3.get(1).unwrap().metadata_file,
table_v2.metadata_location().unwrap()
);
}
}
20 changes: 20 additions & 0 deletions crates/catalog/memory/src/namespace_state.rs
@@ -262,6 +262,26 @@ impl NamespaceState {
}
}

// Updates the metadata location of the given table, or returns an error if the table doesn't exist
pub(crate) fn update_existing_table_location(
&mut self,
table_ident: &TableIdent,
new_metadata_location: String,
) -> Result<()> {
let namespace = self.get_mut_namespace(table_ident.namespace())?;
if !namespace
.table_metadata_locations
.contains_key(table_ident.name())
{
return no_such_table_err(table_ident);
}
namespace
.table_metadata_locations
.insert(table_ident.name().to_string(), new_metadata_location);

Ok(())
}

// Inserts the given table or returns an error if it already exists
pub(crate) fn insert_new_table(
&mut self,
5 changes: 5 additions & 0 deletions crates/iceberg/src/spec/table_metadata_builder.rs
@@ -194,6 +194,11 @@ impl TableMetadataBuilder {
)
}

/// Returns whether the builder is tracking any changes.
pub fn has_changes(&self) -> bool {
!self.changes.is_empty()
}

/// Changes uuid of table metadata.
pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
if self.metadata.table_uuid != uuid {