From ebeae82dacb9efbf3ef71afaa358752af0f65695 Mon Sep 17 00:00:00 2001
From: Gokul Soundararajan
Date: Fri, 16 May 2025 12:54:14 -0700
Subject: [PATCH 1/3] Add support for update_table() function in Sql Catalog

- Takes in a TableCommit object and checks its requirements against the
  current table metadata.
- Generates a new metadata file based on the updates provided.
- Writes the new metadata file and commits the new location to the SQL
  database in a single transaction.
- Added a test for a simple append.
- More tests needed:
  - Upgrade table version
  - Update sort order
  - Alter schema
  - Update partition spec
---
 crates/catalog/sql/Cargo.toml     |   2 +
 crates/catalog/sql/src/catalog.rs | 179 ++++++++++++++++++++++++++++--
 2 files changed, 174 insertions(+), 7 deletions(-)

diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index 3c8792464..c66fe6e43 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -37,8 +37,10 @@ typed-builder = { workspace = true }
 uuid = { workspace = true, features = ["v4"] }
 
 [dev-dependencies]
+arrow-array = { version = "55" }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 itertools = { workspace = true }
+parquet = "55"
 regex = "1.10.5"
 sqlx = { version = "0.8.1", features = [
     "tls-rustls",
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 78767c5b5..8dc4a0404 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
-    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
-    TableIdent,
+    Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
 };
 use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
 use sqlx::{Any, AnyPool, Row, Transaction};
@@ -769,11 +768,91 @@ impl Catalog for SqlCatalog {
         Ok(())
     }
 
-    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Updating a table is not supported yet",
-        ))
+    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
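+        // Commit protocol: check every requirement against the current
+        // metadata, apply the updates to build the new metadata, write it
+        // to a new metadata file, then point the catalog row at that file
+        // inside a single SQL transaction.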
+        let table_ident = commit.identifier().clone();
+        if !self.table_exists(&table_ident).await? {
+            return no_such_table_err(&table_ident);
+        }
+        let current = self.load_table(&table_ident).await?;
+
+        // Extract the requirements and updates from the commit.
+        let requirements = commit.take_requirements();
+        let updates = commit.take_updates();
+
+        // Check each requirement against the current metadata.
+        for requirement in requirements {
+            requirement.check(Some(current.metadata()))?;
+        }
+
+        // Start from the existing metadata and apply each update in order.
+        let mut updater = TableMetadataBuilder::new_from_metadata(
+            current.metadata().clone(),
+            current.metadata_location().map(|s| s.to_string()),
+        );
+        for update in updates {
+            updater = update.apply(updater)?;
+        }
+        let updated_metadata = updater.build()?.metadata;
+
+        // Nothing changed: skip the write and return the table as-is.
+        if updated_metadata == *current.metadata() {
+            return Ok(current);
+        }
+
+        // Generate a new metadata file location.
+        let location = current.metadata().location();
+        let timestamp_ms = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .expect("system time is before the Unix epoch")
+            .as_millis();
+        let new_metadata_location = format!(
+            "{}/metadata/{}-{}.metadata.json",
+            location,
+            timestamp_ms,
+            Uuid::new_v4()
+        );
+
+        // Write the new metadata to the file.
+        let file = self.fileio.new_output(&new_metadata_location)?;
+        file.write(serde_json::to_vec(&updated_metadata)?.into())
+            .await?;
+
+        // Update the catalog table with the new metadata location.
+        let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
+        self.execute(
+            &format!(
+                "UPDATE {CATALOG_TABLE_NAME}
+                SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?,
+                {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
+                WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+                AND {CATALOG_FIELD_TABLE_NAME} = ?
+                AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+                AND (
+                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                )"
+            ),
+            vec![
+                Some(&new_metadata_location),
+                current.metadata_location(),
+                Some(&self.name),
+                Some(table_ident.name()),
+                Some(&table_ident.namespace().join(".")),
+            ],
+            Some(&mut tx),
+        )
+        .await?;
+
+        tx.commit().await.map_err(from_sqlx_error)?;
+
+        // Return the updated table, as a subsequent load_table() would see it.
+        Ok(Table::builder()
+            .file_io(self.fileio.clone())
+            .identifier(table_ident.clone())
+            .metadata_location(new_metadata_location)
+            .metadata(updated_metadata)
+            .build()?)
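+        // NOTE: the UPDATE above matches the catalog row by table identity
+        // only; it does not require the stored metadata location to still
+        // equal the location this commit started from, so two concurrent
+        // commits can both succeed, with the later one winning. Adding a
+        // compare-and-swap on the metadata location would make this strict.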
+    }
 }
@@ -781,15 +860,26 @@ mod tests {
     use std::collections::{HashMap, HashSet};
     use std::hash::Hash;
+    use std::sync::Arc;
 
+    use arrow_array::{Int32Array, RecordBatch};
     use iceberg::io::FileIOBuilder;
     use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
     use iceberg::table::Table;
+    use iceberg::transaction::Transaction;
+    use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use iceberg::writer::file_writer::location_generator::{
+        DefaultFileNameGenerator, DefaultLocationGenerator,
+    };
+    use iceberg::writer::file_writer::ParquetWriterBuilder;
+    use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
     use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
     use itertools::Itertools;
+    use parquet::file::properties::WriterProperties;
     use regex::Regex;
     use sqlx::migrate::MigrateDatabase;
     use tempfile::TempDir;
+    use uuid::Uuid;
 
     use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
     use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
@@ -866,6 +956,47 @@
         }
     }
 
+    /// Writes `num_rows` rows into a single Parquet data file for `table`
+    /// and returns the resulting data files.
+    async fn add_rows(
+        table: &Table,
+        num_rows: u32,
+    ) -> Result<Vec<iceberg::spec::DataFile>, Box<dyn std::error::Error>> {
+        let writer_properties = WriterProperties::builder().build();
+        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+        let file_name_generator = DefaultFileNameGenerator::new(
+            "datafile".to_string(),
+            None,
+            iceberg::spec::DataFileFormat::Parquet,
+        );
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            writer_properties,
+            table.metadata().current_schema().clone(),
+            table.file_io().clone(),
+            location_generator,
+            file_name_generator,
+        );
+        let file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+        let mut data_file_writer = file_writer_builder.build().await.unwrap();
+
+        // Generate num_rows rows of test data.
+        let schema = simple_table_schema();
+        let mut foo_array = Vec::<i32>::with_capacity(num_rows as usize);
+        for i in 0..num_rows {
+            foo_array.push(i as i32);
+        }
+
+        // Package the rows into a single batch and write it to a Parquet file.
+        let batch = RecordBatch::try_new(
+            Arc::new(iceberg::arrow::schema_to_arrow_schema(&schema).unwrap()),
+            vec![Arc::new(Int32Array::from(foo_array))],
+        )
+        .unwrap();
+        data_file_writer.write(batch).await?;
+
+        let data_files = data_file_writer.close().await?;
+        assert_eq!(data_files.len(), 1);
+        Ok(data_files)
+    }
+
     fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
         assert_eq!(table.identifier(), expected_table_ident);
@@ -1779,4 +1910,38 @@
             "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
         );
     }
+
+    #[tokio::test]
+    async fn test_update_table_append() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespaces(&catalog, &vec![&namespace_ident]).await;
+        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
+        create_table(&catalog, &table_ident).await;
+
+        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
+            table_ident.clone()
+        ]);
+
+        let table = &catalog.load_table(&table_ident).await.unwrap();
+
+        // Generate some rows to add to the table.
+        let data_files = add_rows(&table, 10).await.unwrap();
+
+        // Append to the table in a transaction.
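+        // A fast append registers the new data files in a fresh snapshot
+        // without rewriting existing manifests; committing the transaction
+        // exercises the SqlCatalog::update_table() path added above.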
+        let mut transaction = Transaction::new(&table);
+        let mut fast_append = transaction
+            .fast_append(Some(Uuid::new_v4()), vec![])
+            .expect("Created");
+        fast_append.add_data_files(data_files).expect("Added data files");
+        transaction = fast_append.apply().await.expect("Appended");
+        transaction.commit(&catalog).await.expect("Committed");
+
+        // The commit should have produced exactly one new snapshot.
+        let table_v2 = &catalog.load_table(&table_ident).await.unwrap();
+        assert_eq!(
+            table.metadata().snapshots().len() + 1,
+            table_v2.metadata().snapshots().len()
+        );
+    }
 }

From 7374c8beac466f7e8c1f1660ce589d60aadaac6d Mon Sep 17 00:00:00 2001
From: Gokul Soundararajan
Date: Tue, 20 May 2025 09:29:32 -0700
Subject: [PATCH 2/3] fix format

Address CI error for formatting
---
 crates/catalog/sql/src/catalog.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 8dc4a0404..5bbd07555 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -868,10 +868,10 @@ mod tests {
     use iceberg::table::Table;
     use iceberg::transaction::Transaction;
     use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use iceberg::writer::file_writer::ParquetWriterBuilder;
     use iceberg::writer::file_writer::location_generator::{
         DefaultFileNameGenerator, DefaultLocationGenerator,
     };
-    use iceberg::writer::file_writer::ParquetWriterBuilder;
     use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
     use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
     use itertools::Itertools;

From 36cedf27bcd67f88a32a5c74b171cd7090097e45 Mon Sep 17 00:00:00 2001
From: Gokul Soundararajan
Date: Tue, 20 May 2025 09:29:32 -0700
Subject: [PATCH 3/3] fix format/clippy errors

Address CI error for formatting
Address clippy error
---
 crates/catalog/sql/src/catalog.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 5bbd07555..6067f095f 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -1927,10 +1927,10 @@ mod tests {
         let table = &catalog.load_table(&table_ident).await.unwrap();
 
         // Generate some rows to add to the table.
-        let data_files = add_rows(&table, 10).await.unwrap();
+        let data_files = add_rows(table, 10).await.unwrap();
 
         // Append to the table in a transaction.
         // A fast append registers the new data files in a fresh snapshot
         // without rewriting existing manifests; committing the transaction
         // exercises the SqlCatalog::update_table() path added above.
-        let mut transaction = Transaction::new(&table);
+        let mut transaction = Transaction::new(table);
         let mut fast_append = transaction
             .fast_append(Some(Uuid::new_v4()), vec![])
             .expect("Created");
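
Usage sketch: the new update_table() is not normally called directly; it is
driven through the Transaction API, exactly as test_update_table_append does.
A minimal sketch, assuming an async context that returns a Result, with
`catalog`, `table_ident`, and `data_files` prepared the same way as in that
test:

    use iceberg::transaction::Transaction;
    use uuid::Uuid;

    // Load the current table state, stage a fast append, and commit it.
    // Transaction::commit() checks the commit requirements and applies the
    // updates by calling Catalog::update_table() on the SqlCatalog.
    let table = catalog.load_table(&table_ident).await?;
    let mut txn = Transaction::new(&table);
    let mut append = txn.fast_append(Some(Uuid::new_v4()), vec![])?;
    append.add_data_files(data_files)?;
    txn = append.apply().await?;
    let updated = txn.commit(&catalog).await?;
    // `updated` now points at the freshly written metadata file, the same
    // table a subsequent load_table() would return.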