diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index 3c8792464..c66fe6e43 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -37,8 +37,10 @@ typed-builder = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
+arrow-array = { version = "55" }
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
itertools = { workspace = true }
+parquet = "55"
regex = "1.10.5"
sqlx = { version = "0.8.1", features = [
"tls-rustls",
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 78767c5b5..6067f095f 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
- Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
- TableIdent,
+ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers};
use sqlx::{Any, AnyPool, Row, Transaction};
@@ -769,11 +768,91 @@ impl Catalog for SqlCatalog {
Ok(())
}
- async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Updating a table is not supported yet",
- ))
+ async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
+ let table_ident = commit.identifier().clone();
+ if !self.table_exists(&table_ident).await? {
+ return no_such_table_err(&table_ident);
+ }
+ let current = self.load_table(&table_ident).await?;
+
+ // Extract requirements and updates
+ let requirements = commit.take_requirements();
+ let updates = commit.take_updates();
+
+ // Take each requirement and check against metadata.
+ for requirement in requirements {
+ requirement.check(Some(current.metadata()))?;
+ }
+
+ // Take existing metadata and apply each update.
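+ // Start from a copy of the current metadata, remembering the file location it was loaded from.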
+ let mut updater = TableMetadataBuilder::new_from_metadata(
+ current.metadata().clone(),
+ current.metadata_location().map(|s| s.to_string()),
+ );
+ for update in updates {
+ updater = update.apply(updater)?;
+ }
+ let updated_metadata_build = updater.build()?;
+ let updated_metadata = updated_metadata_build.metadata;
+
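+ // If the updates left the metadata unchanged, skip the write and return the table as-is.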
+ if updated_metadata == *current.metadata() {
+ println!("No-op commit");
+ return Ok(current);
+ }
+
+ // Generate a new metadata file location
+ let location = current.metadata().location();
+ let timestamp_ms = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap()
+ .as_millis();
+ let new_metadata_location = format!(
+ "{}/metadata/{}-{}.metadata.json",
+ location,
+ timestamp_ms,
+ Uuid::new_v4()
+ );
+
+ // Write the new metadata to the file
+ let file = self.fileio.new_output(&new_metadata_location)?;
+ file.write(serde_json::to_vec(&updated_metadata)?.into())
+ .await?;
+
+ // Update the catalog table with the new metadata location
+ let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?;
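+ // Match rows marked as table records, as well as rows where the record type is unset (legacy entries).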
+ self.execute(
+ &format!(
+ "UPDATE {CATALOG_TABLE_NAME}
+ SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?,
+ {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ ),
+ vec![
+ Some(&new_metadata_location),
+ current.metadata_location(),
+ Some(&self.name),
+ Some(table_ident.name()),
+ Some(&table_ident.namespace().join(".")),
+ ],
+ Some(&mut tx),
+ )
+ .await?;
+
+ tx.commit().await.map_err(from_sqlx_error)?;
+
+ // Return the updated table
+ Ok(Table::builder()
+ .file_io(self.fileio.clone())
+ .identifier(table_ident.clone())
+ .metadata_location(new_metadata_location)
+ .metadata(updated_metadata)
+ .build()?)
}
}
@@ -781,15 +860,26 @@ impl Catalog for SqlCatalog {
mod tests {
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
+ use std::sync::Arc;
+ use arrow_array::{Int32Array, RecordBatch};
use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
+ use iceberg::transaction::Transaction;
+ use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+ use iceberg::writer::file_writer::ParquetWriterBuilder;
+ use iceberg::writer::file_writer::location_generator::{
+ DefaultFileNameGenerator, DefaultLocationGenerator,
+ };
+ use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
+ use parquet::file::properties::WriterProperties;
use regex::Regex;
use sqlx::migrate::MigrateDatabase;
use tempfile::TempDir;
+ use uuid::Uuid;
use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
@@ -866,6 +956,47 @@ mod tests {
}
}
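+ /// Helper that writes `num_rows` rows into a single Parquet data file for `table` and returns the resulting data files.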
+ async fn add_rows(
+ table: &Table,
+ num_rows: u32,
+ ) -> Result<Vec<iceberg::spec::DataFile>, Box<dyn std::error::Error>> {
+ let writer_properties = WriterProperties::builder().build();
+ let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+ let file_name_generator = DefaultFileNameGenerator::new(
+ "datafile".to_string(),
+ None,
+ iceberg::spec::DataFileFormat::Parquet,
+ );
+ let parquet_writer_builder = ParquetWriterBuilder::new(
+ writer_properties,
+ table.metadata().current_schema().clone(),
+ table.file_io().clone(),
+ location_generator,
+ file_name_generator,
+ );
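+ // The test table is unpartitioned: no partition value, partition spec id 0.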
+ let file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+ let mut data_file_writer = file_writer_builder.build().await.unwrap();
+
+ // Generate num_rows.
+ let schema = simple_table_schema();
+ let mut foo_array = Vec::<i32>::with_capacity(num_rows as usize);
+ for i in 0..num_rows {
+ foo_array.push(i as i32);
+ }
+
+ // Package into a single batch and write to Parquet file.
+ let batch = RecordBatch::try_new(
+ Arc::new(iceberg::arrow::schema_to_arrow_schema(&schema).unwrap()),
+ vec![Arc::new(Int32Array::from(foo_array))],
+ )
+ .unwrap();
+ data_file_writer.write(batch).await?;
+
+ let data_files = data_file_writer.close().await?;
+ assert_eq!(data_files.len(), 1);
+ Ok(data_files)
+ }
+
fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
assert_eq!(table.identifier(), expected_table_ident);
@@ -1779,4 +1910,38 @@ mod tests {
"Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
);
}
+
+ #[tokio::test]
+ async fn test_update_table_append() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+ let namespace_ident = NamespaceIdent::new("a".into());
+ create_namespaces(&catalog, &vec![&namespace_ident]).await;
+ let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
+ table_ident.clone()
+ ],);
+
+ let table = &catalog.load_table(&table_ident).await.unwrap();
+
+ // Generate some rows to add to table.
+ let data_files = add_rows(table, 10).await.unwrap();
+
+ // Append to the table in a transaction.
+ let mut transaction = Transaction::new(table);
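+ // fast_append takes an explicit commit UUID and (here empty) key metadata.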
+ let mut fast_append = transaction
+ .fast_append(Some(Uuid::new_v4()), vec![])
+ .expect("Created");
+ fast_append.add_data_files(data_files).expect("Added data files");
+ transaction = fast_append.apply().await.expect("Appended");
+ transaction.commit(&catalog).await.expect("Committed");
+
+ let table_v2 = &catalog.load_table(&table_ident).await.unwrap();
+ assert_eq!(
+ table.metadata().snapshots().len() + 1,
+ table_v2.metadata().snapshots().len()
+ );
+ }
}