diff --git a/Cargo.lock b/Cargo.lock index 28dd83775..06f3782b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1029,9 +1029,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.3.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" dependencies = [ "fastrand", "gloo-timers", @@ -3490,6 +3490,7 @@ dependencies = [ "arrow-string", "async-std", "async-trait", + "backon", "base64 0.22.1", "bimap", "bytes", @@ -3510,6 +3511,7 @@ dependencies = [ "parquet", "pretty_assertions", "rand 0.8.5", + "regex", "reqwest", "roaring", "rust_decimal", diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 7454de126..370749dcf 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -283,6 +283,44 @@ impl Catalog for MemoryCatalog { "MemoryCatalog does not currently support updating tables.", )) } + + // async fn commit_table(&self, base: &Table, current: Table) -> Result { + // if base.metadata() == current.metadata() { + // // no change + // return Ok(current); + // } + // + // let mut root_namespace_state = self.root_namespace_state.lock().await; + // // TODO: caller needs to retry on the error below + // let _ = root_namespace_state + // .check_metadata_location(base.identifier(), base.metadata_location())?; + // + // let next_metadata_version = if let Some(base_metadata_location) = base.metadata_location() { + // self.parse_metadata_version(base_metadata_location) + 1 + // } else { + // 0 + // }; + // + // // write metadata + // let metadata_location = format!( + // "{}/metadata/{}-{}.metadata.json", + // current.metadata().location(), + // next_metadata_version, + // Uuid::new_v4() + // ); + // + // // TODO instead of using current.metadata(), build a new metadata with some properties like last_updated_ms updated + // self.file_io + // .new_output(&metadata_location)? + // .write(serde_json::to_vec(current.metadata())?.into()) + // .await?; + // + // root_namespace_state + // .update_existing_table_location(current.identifier(), current.metadata_location())?; + // + // // TODO same here, need to update the metadata location + // Ok(current) + // } } #[cfg(test)] diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index e324e7a3d..547c6bcd0 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -262,6 +262,45 @@ impl NamespaceState { } } + pub(crate) fn check_metadata_location( + &self, + table_ident: &TableIdent, + metadata_location: Option<&str>, + ) -> Result<()> { + let namespace = self.get_namespace(table_ident.namespace())?; + + if namespace + .table_metadata_locations + .get(table_ident.name()) + .map(|s| s.as_str()) + != metadata_location + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Metadata location does not match for table: {table_ident}!"), + )); + } + + Ok(()) + } + + pub(crate) fn update_existing_table_location( + &mut self, + table_ident: &TableIdent, + new_metadata_location: Option<&str>, + ) -> Result<()> { + if new_metadata_location.is_none() { + return Ok(()); + } + + let namespace = self.get_mut_namespace(table_ident.namespace())?; + namespace + .table_metadata_locations + .entry(table_ident.name().to_string()) + .insert_entry(new_metadata_location.unwrap().to_string()); + Ok(()) + } + // Inserts the given table or returns an error if it already exists pub(crate) fn insert_new_table( &mut self, diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 851819069..5d70b5fff 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -2125,10 +2125,10 @@ mod tests { .unwrap() }; - let table = Transaction::new(&table1) + let table = Transaction::new(table1) .upgrade_table_version(FormatVersion::V2) .unwrap() - .commit(&catalog) + .commit(Arc::new(&catalog)) .await .unwrap(); @@ -2250,10 +2250,10 @@ mod tests { .unwrap() }; - let table_result = Transaction::new(&table1) + let table_result = Transaction::new(table1) .upgrade_table_version(FormatVersion::V2) .unwrap() - .commit(&catalog) + .commit(Arc::new(&catalog)) .await; assert!(table_result.is_err()); diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index ab7ea3d62..ddf574d5d 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use ctor::{ctor, dtor}; use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; @@ -347,10 +347,10 @@ async fn test_update_table() { ); // Update table by committing transaction - let table2 = Transaction::new(&table) + let table2 = Transaction::new(table) .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())])) .unwrap() - .commit(&catalog) + .commit(Arc::new(&catalog)) .await .unwrap(); diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 3324a4e9e..cd0fedc70 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -55,6 +55,7 @@ arrow-select = { workspace = true } arrow-string = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } async-trait = { workspace = true } +backon = { version = "1.5.1"} base64 = { workspace = true } bimap = { workspace = true } bytes = { workspace = true } @@ -86,6 +87,7 @@ typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } zstd = { workspace = true } +regex = "1.11.1" [dev-dependencies] ctor = { workspace = true } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 3457f8361..7c30d29aa 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::mem::take; use std::ops::Deref; +use std::sync::Arc; use _serde::deserialize_snapshot; use async_trait::async_trait; @@ -300,6 +301,27 @@ impl TableCommit { pub fn take_updates(&mut self) -> Vec { take(&mut self.updates) } + + /// Apply updates to a table + pub fn apply(&mut self, mut table: Table) -> Result
{ + // 1. check requirements + let requirements = self.take_requirements(); + for requirement in requirements { + requirement.check(Some(table.metadata()))?; + } + + // 2. Apply updates to metadata builder + let mut metadata_builder = table.metadata().clone().into_builder(None); + + let updates = self.take_updates(); + for update in updates { + metadata_builder = update.apply(metadata_builder)?; + } + + table.with_metadata(Arc::new(metadata_builder.build()?.metadata)); + + Ok(table) + } } /// TableRequirement represents a requirement for a table in the catalog. diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 37529ee6f..07fd26413 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -39,7 +39,7 @@ pub enum ErrorKind { /// Iceberg data is invalid. /// /// This error is returned when we try to read a table from iceberg but - /// failed to parse it's metadata or data file correctly. + /// failed to parse its metadata or data file correctly. /// /// The table could be invalid or corrupted. DataInvalid, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index fcd510920..68aac252c 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use _serde::TableMetadataEnum; use chrono::{DateTime, Utc}; +use regex::Regex; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; @@ -40,6 +41,7 @@ use crate::error::{Result, timestamp_ms_to_utc}; use crate::{Error, ErrorKind}; static MAIN_BRANCH: &str = "main"; +const TABLE_METADATA_FILE_NAME_REGEX: &str = r"(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json"; pub(crate) static ONE_MINUTE_MS: i64 = 60_000; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; @@ -432,6 +434,30 @@ impl TableMetadata { self.encryption_keys.get(key_id) } + /// Parse metadata version and uuid from metadata filename + fn parse_metadata_filename(metadata_location: &str) -> Result<(i32, Uuid)> { + if let Some(metadata_file_name) = metadata_location.split('/').last() { + let re = Regex::new(TABLE_METADATA_FILE_NAME_REGEX) + .expect("Failed to parse regex for metadata file!"); + if let Some(caps) = re.captures(metadata_file_name) { + let metadata_version_str = &caps[1]; + let uuid_str = &caps[2]; + + let metadata_version = metadata_version_str + .parse() + .expect(format!("Invalid metadata version: {metadata_version_str}").as_str()); + let uuid = Uuid::parse_str(uuid_str)?; + + return Ok((metadata_version, uuid)); + } + } + + Err(Error::new( + ErrorKind::Unexpected, + format!("Unrecognizable metadata location: {metadata_location}"), + )) + } + /// Normalize this partition spec. /// /// This is an internal method diff --git a/crates/iceberg/src/transaction/action/mod.rs b/crates/iceberg/src/transaction/action/mod.rs new file mode 100644 index 000000000..cda0c21a7 --- /dev/null +++ b/crates/iceberg/src/transaction/action/mod.rs @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; +use std::mem::take; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::TableUpdate::UpgradeFormatVersion; +use crate::spec::FormatVersion; +use crate::table::Table; +use crate::transaction::Transaction; +use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate}; + +/// TODO doc +pub type BoxedTransactionAction = Arc; + +/// TODO doc +#[async_trait] +pub trait TransactionAction: Sync + Send { + /// Commit the changes and apply the changes to the transaction + async fn commit(self: Arc, table: &Table) -> Result; +} + +/// TODO doc +pub trait ApplyTransactionAction { + /// TODO doc + fn apply(self, tx: Transaction) -> Result; +} + +impl ApplyTransactionAction for T { + fn apply(self, mut tx: Transaction) -> Result + where Self: Sized { + tx.actions.push(Arc::new(self)); + Ok(tx) + } +} + +/// TODO doc +pub struct ActionCommit { + updates: Vec, + requirements: Vec, +} + +/// TODO doc +impl ActionCommit { + /// TODO doc + pub fn new(updates: Vec, requirements: Vec) -> Self { + Self { + updates, + requirements, + } + } + + /// TODO doc + pub fn take_updates(&mut self) -> Vec { + take(&mut self.updates) + } + + /// TODO doc + pub fn take_requirements(&mut self) -> Vec { + take(&mut self.requirements) + } +} + +/// TODO doc +pub struct UpdateLocationAction { + location: Option, +} + +impl UpdateLocationAction { + /// TODO doc + pub fn new() -> Self { + UpdateLocationAction { location: None } + } + + /// TODO doc + pub fn set_location(mut self, location: String) -> Self { + self.location = Some(location); + self + } +} + +#[async_trait] +impl TransactionAction for UpdateLocationAction { + async fn commit(self: Arc, _table: &Table) -> Result { + let updates: Vec; + let requirements: Vec; + if let Some(location) = self.location.clone() { + updates = vec![TableUpdate::SetLocation { location }]; + requirements = vec![]; + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Location is not set for UpdateLocationAction!", + )); + } + + Ok(ActionCommit::new(updates, requirements)) + } +} + +/// TODO doc +pub struct UpgradeFormatVersionAction { + format_version: Option, +} + +impl UpgradeFormatVersionAction { + /// TODO doc + pub fn new() -> Self { + UpgradeFormatVersionAction { + format_version: None, + } + } + + /// TODO doc + pub fn set_format_version(mut self, format_version: FormatVersion) -> Self { + self.format_version = Some(format_version); + self + } +} + +#[async_trait] +impl TransactionAction for UpgradeFormatVersionAction { + async fn commit(self: Arc, table: &Table) -> Result { + let current_version = table.metadata().format_version(); + let updates: Vec; + let requirements: Vec; + + if let Some(format_version) = self.format_version { + match current_version.cmp(&format_version) { + Ordering::Greater => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade table version from {} to {}", + current_version, format_version + ), + )); + } + Ordering::Less => { + updates = vec![UpgradeFormatVersion { format_version }]; + requirements = vec![]; + } + Ordering::Equal => { + // do nothing + updates = vec![]; + requirements = vec![]; + } + } + } else { + // error + return Err(Error::new( + ErrorKind::DataInvalid, + "FormatVersion is not set for UpgradeFormatVersionAction!", + )); + } + + Ok(ActionCommit::new(updates, requirements)) + } +} + +/// TODO doc +pub struct UpdatePropertiesAction { + updates: HashMap, + removals: HashSet, +} + +impl UpdatePropertiesAction { + /// TODO doc + pub fn new() -> Self { + UpdatePropertiesAction { + updates: HashMap::default(), + removals: HashSet::default(), + } + } + + /// TODO doc + pub fn set(mut self, key: String, value: String) -> Self { + assert!(!self.removals.contains(&key)); + self.updates.insert(key, value); + self + } + + /// TODO doc + pub fn remove(mut self, key: String) -> Self { + assert!(!self.updates.contains_key(&key)); + self.removals.insert(key); + self + } +} + +#[async_trait] +impl TransactionAction for UpdatePropertiesAction { + async fn commit(self: Arc, _table: &Table) -> Result { + let updates: Vec = vec![ + TableUpdate::SetProperties { + updates: self.updates.clone(), + }, + TableUpdate::RemoveProperties { + removals: self.removals.clone().into_iter().collect::>(), + }, + ]; + let requirements: Vec = vec![]; + + Ok(ActionCommit::new(updates, requirements)) + } +} diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index d3b3cb2e3..1a930d9ca 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -16,14 +16,17 @@ // under the License. use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use arrow_array::StringArray; +use async_trait::async_trait; use futures::TryStreamExt; use uuid::Uuid; use crate::error::Result; use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; -use crate::transaction::Transaction; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation, }; @@ -31,15 +34,17 @@ use crate::writer::file_writer::ParquetWriter; use crate::{Error, ErrorKind}; /// FastAppendAction is a transaction action for fast append data files to the table. -pub struct FastAppendAction<'a> { - snapshot_produce_action: SnapshotProduceAction<'a>, +pub struct FastAppendAction { + snapshot_produce_action: SnapshotProduceAction, check_duplicate: bool, + // below are properties used to create SnapshotProduceAction when commit + snapshot_properties: HashMap, + pub added_data_files: Vec, } -impl<'a> FastAppendAction<'a> { +impl FastAppendAction { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - tx: Transaction<'a>, snapshot_id: i64, commit_uuid: Uuid, key_metadata: Vec, @@ -47,13 +52,14 @@ impl<'a> FastAppendAction<'a> { ) -> Result { Ok(Self { snapshot_produce_action: SnapshotProduceAction::new( - tx, snapshot_id, key_metadata, commit_uuid, - snapshot_properties, + snapshot_properties.clone(), )?, check_duplicate: true, + snapshot_properties, + added_data_files: vec![], }) } @@ -68,7 +74,17 @@ impl<'a> FastAppendAction<'a> { &mut self, data_files: impl IntoIterator, ) -> Result<&mut Self> { - self.snapshot_produce_action.add_data_files(data_files)?; + let data_files: Vec = data_files.into_iter().collect(); + for data_file in &data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + } + self.added_data_files.extend(data_files); + Ok(self) } @@ -77,8 +93,7 @@ impl<'a> FastAppendAction<'a> { &mut self, snapshot_properties: HashMap, ) -> Result<&mut Self> { - self.snapshot_produce_action - .set_snapshot_properties(snapshot_properties)?; + self.snapshot_properties = snapshot_properties; Ok(self) } @@ -89,54 +104,44 @@ impl<'a> FastAppendAction<'a> { /// Specifically, schema compatibility checks and support for adding to partitioned tables /// have not yet been implemented. #[allow(dead_code)] - async fn add_parquet_files(mut self, file_path: Vec) -> Result> { - if !self - .snapshot_produce_action - .tx - .current_table - .metadata() - .default_spec - .is_unpartitioned() - { + async fn add_parquet_files(mut self, table: &Table, file_path: Vec) -> Result<()> { + if !table.metadata().default_spec.is_unpartitioned() { return Err(Error::new( ErrorKind::FeatureUnsupported, "Appending to partitioned tables is not supported", )); } - let table_metadata = self.snapshot_produce_action.tx.current_table.metadata(); + let table_metadata = table.metadata(); - let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.current_table.file_io(), - file_path, - table_metadata, - ) - .await?; + let data_files = + ParquetWriter::parquet_files_to_data_files(table.file_io(), file_path, table_metadata) + .await?; self.add_data_files(data_files)?; - self.apply().await + Ok(()) } +} + +#[async_trait] +impl TransactionAction for FastAppendAction { + async fn commit(self: Arc, table: &Table) -> Result { + let mut snapshot_produce_action = self.snapshot_produce_action.clone(); + + let snapshot_produce_action = snapshot_produce_action + .add_data_files(table, self.added_data_files.clone())? + .set_snapshot_properties(self.snapshot_properties.clone())?; - /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { // Checks duplicate files if self.check_duplicate { - let new_files: HashSet<&str> = self - .snapshot_produce_action + let new_files: HashSet<&str> = snapshot_produce_action .added_data_files .iter() .map(|df| df.file_path.as_str()) .collect(); - let mut manifest_stream = self - .snapshot_produce_action - .tx - .current_table - .inspect() - .manifests() - .scan() - .await?; + let mut manifest_stream = table.inspect().manifests().scan().await?; let mut referenced_files = Vec::new(); while let Some(batch) = manifest_stream.try_next().await? { @@ -170,8 +175,8 @@ impl<'a> FastAppendAction<'a> { } } - self.snapshot_produce_action - .apply(FastAppendOperation, DefaultManifestProcess) + snapshot_produce_action + .apply(table, FastAppendOperation, DefaultManifestProcess) .await } } @@ -185,29 +190,22 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn delete_entries( &self, - _snapshot_produce: &SnapshotProduceAction<'_>, + _snapshot_produce: &SnapshotProduceAction, ) -> Result> { Ok(vec![]) } async fn existing_manifest( &self, - snapshot_produce: &SnapshotProduceAction<'_>, + table: &Table, + _snapshot_produce: &SnapshotProduceAction, ) -> Result> { - let Some(snapshot) = snapshot_produce - .tx - .current_table - .metadata() - .current_snapshot() - else { + let Some(snapshot) = table.metadata().current_snapshot() else { return Ok(vec![]); }; let manifest_list = snapshot - .load_manifest_list( - snapshot_produce.tx.current_table.file_io(), - &snapshot_produce.tx.current_table.metadata_ref(), - ) + .load_manifest_list(table.file_io(), &table.metadata_ref()) .await?; Ok(manifest_list @@ -222,28 +220,30 @@ impl SnapshotProduceOperation for FastAppendOperation { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; use crate::scan::tests::TableTestFixture; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, }; use crate::transaction::Transaction; + use crate::transaction::action::TransactionAction; use crate::transaction::tests::make_v2_minimal_table; use crate::{TableRequirement, TableUpdate}; #[tokio::test] async fn test_empty_data_append_action() { let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut action = tx.fast_append(None, vec![]).unwrap(); action.add_data_files(vec![]).unwrap(); - assert!(action.apply().await.is_err()); + assert!(Arc::new(action).commit(&table).await.is_err()); } #[tokio::test] async fn test_set_snapshot_properties() { let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table.clone()); let mut action = tx.fast_append(None, vec![]).unwrap(); let mut snapshot_properties = HashMap::new(); @@ -260,7 +260,7 @@ mod tests { .build() .unwrap(); action.add_data_files(vec![data_file]).unwrap(); - let tx = action.apply().await.unwrap(); + Arc::new(action).commit(&table).await.unwrap(); // Check customized properties is contained in snapshot summary properties. let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { @@ -281,7 +281,7 @@ mod tests { #[tokio::test] async fn test_fast_append_action() { let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table.clone()); let mut action = tx.fast_append(None, vec![]).unwrap(); // check add data file with incompatible partition value @@ -308,7 +308,7 @@ mod tests { .build() .unwrap(); action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply().await.unwrap(); + Arc::new(action).commit(&tx.current_table).await.unwrap(); // check updates and requirements assert!( @@ -367,7 +367,7 @@ mod tests { async fn test_add_existing_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(&fixture.table); + let mut tx = Transaction::new(fixture.table.clone()); let file_paths = vec![ format!("{}/1.parquet", &fixture.table_location), @@ -378,14 +378,14 @@ mod tests { let fast_append_action = tx.fast_append(None, vec![]).unwrap(); // Attempt to add the existing Parquet files with fast append. - let new_tx = fast_append_action - .add_parquet_files(file_paths.clone()) + fast_append_action + .add_parquet_files(&tx.base_table, file_paths.clone()) .await .expect("Adding existing Parquet files should succeed"); let mut found_add_snapshot = false; let mut found_set_snapshot_ref = false; - for update in new_tx.updates.iter() { + for update in tx.updates.iter() { match update { TableUpdate::AddSnapshot { .. } => { found_add_snapshot = true; @@ -404,7 +404,7 @@ mod tests { assert!(found_add_snapshot); assert!(found_set_snapshot_ref); - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] { + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { snapshot } else { panic!("Expected the first update to be an AddSnapshot update"); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ba79d60bb..6a666d51a 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -17,79 +17,83 @@ //! This module contains transaction api. +/// TODO doc +pub mod action; mod append; mod snapshot; mod sort_order; -use std::cmp::Ordering; use std::collections::HashMap; use std::mem::discriminant; use std::sync::Arc; +use std::time::Duration; +use backon::{BackoffBuilder, ExponentialBuilder, RetryableWithContext}; use uuid::Uuid; -use crate::TableUpdate::UpgradeFormatVersion; use crate::error::Result; -use crate::spec::FormatVersion; use crate::table::Table; +use crate::transaction::action::{ + ApplyTransactionAction, BoxedTransactionAction, UpdateLocationAction, UpdatePropertiesAction, + UpgradeFormatVersionAction, +}; use crate::transaction::append::FastAppendAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. -pub struct Transaction<'a> { - base_table: &'a Table, - current_table: Table, - updates: Vec, - requirements: Vec, +#[derive(Clone)] +pub struct Transaction { + base_table: Table, + actions: Vec, } -impl<'a> Transaction<'a> { +impl Transaction { /// Creates a new transaction. - pub fn new(table: &'a Table) -> Self { + pub fn new(table: Table) -> Self { Self { - base_table: table, - current_table: table.clone(), - updates: vec![], - requirements: vec![], + base_table: table.clone(), + actions: vec![], } } - fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { - let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); + fn update_table_metadata(&self, table: &mut Table, updates: &[TableUpdate]) -> Result<()> { + let mut metadata_builder = table.metadata().clone().into_builder(None); for update in updates { metadata_builder = update.clone().apply(metadata_builder)?; } - self.current_table - .with_metadata(Arc::new(metadata_builder.build()?.metadata)); + table.with_metadata(Arc::new(metadata_builder.build()?.metadata)); Ok(()) } - fn apply( - &mut self, + /// TODO documentation + pub fn apply( + &self, + table: &mut Table, updates: Vec, requirements: Vec, + existing_updates: &mut Vec, + existing_requirements: &mut Vec, ) -> Result<()> { for requirement in &requirements { - requirement.check(Some(self.current_table.metadata()))?; + requirement.check(Some(table.metadata()))?; } - self.update_table_metadata(&updates)?; + self.update_table_metadata(table, &updates)?; - self.updates.extend(updates); + existing_updates.extend(updates); // For the requirements, it does not make sense to add a requirement more than once // For example, you cannot assert that the current schema has two different IDs for new_requirement in requirements { - if self - .requirements + if existing_requirements .iter() .map(discriminant) .all(|d| d != discriminant(&new_requirement)) { - self.requirements.push(new_requirement); + existing_requirements.push(new_requirement); } } @@ -100,32 +104,13 @@ impl<'a> Transaction<'a> { } /// Sets table to a new version. - pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.current_table.metadata().format_version(); - match current_version.cmp(&format_version) { - Ordering::Greater => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot downgrade table version from {} to {}", - current_version, format_version - ), - )); - } - Ordering::Less => { - self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; - } - Ordering::Equal => { - // Do nothing. - } - } - Ok(self) + pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction { + UpgradeFormatVersionAction::new() } /// Update table's property. - pub fn set_properties(mut self, props: HashMap) -> Result { - self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; - Ok(self) + pub fn update_properties(&self) -> UpdatePropertiesAction { + UpdatePropertiesAction::new() } fn generate_unique_snapshot_id(&self) -> i64 { @@ -140,7 +125,7 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .current_table + .base_table .metadata() .snapshots() .any(|s| s.snapshot_id() == snapshot_id) @@ -152,13 +137,12 @@ impl<'a> Transaction<'a> { /// Creates a fast append action. pub fn fast_append( - self, + &mut self, commit_uuid: Option, key_metadata: Vec, - ) -> Result> { + ) -> Result { let snapshot_id = self.generate_unique_snapshot_id(); FastAppendAction::new( - self, snapshot_id, commit_uuid.unwrap_or_else(Uuid::now_v7), key_metadata, @@ -167,34 +151,83 @@ impl<'a> Transaction<'a> { } /// Creates replace sort order action. - pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { + pub fn replace_sort_order(self) -> ReplaceSortOrderAction { ReplaceSortOrderAction { - tx: self, sort_fields: vec![], } } - /// Remove properties in table. - pub fn remove_properties(mut self, keys: Vec) -> Result { - self.apply( - vec![TableUpdate::RemoveProperties { removals: keys }], - vec![], - )?; - Ok(self) - } - /// Set the location of table - pub fn set_location(mut self, location: String) -> Result { - self.apply(vec![TableUpdate::SetLocation { location }], vec![])?; - Ok(self) + pub fn update_location(&mut self) -> UpdateLocationAction { + UpdateLocationAction::new() } /// Commit transaction. - pub async fn commit(self, catalog: &dyn Catalog) -> Result
{ + pub async fn commit(&mut self, catalog: Arc<&dyn Catalog>) -> Result
{ + if self.actions.is_empty() { + // nothing to commit + return Ok(self.base_table.clone()); + } + + let tx = self.clone(); + (|mut tx: Transaction| async { + let result = tx.do_commit(catalog.clone()).await; + (tx, result) + }) + .retry( + ExponentialBuilder::new() + // TODO retry strategy should be configurable + .with_min_delay(Duration::from_millis(100)) + .with_max_delay(Duration::from_millis(60 * 1000)) + .with_total_delay(Some(Duration::from_millis(30 * 60 * 1000))) + .with_max_times(4) + .with_factor(2.0) + .build(), + ) + .context(tx) + .sleep(tokio::time::sleep) + // todo use a specific commit failure + .when(|e| e.kind() == ErrorKind::DataInvalid) + .await + .1 + } + + async fn do_commit(&mut self, catalog: Arc<&dyn Catalog>) -> Result
{ + let base_table_identifier = self.base_table.identifier().to_owned(); + + let refreshed = catalog + .load_table(&base_table_identifier.clone()) + .await + .expect(format!("Failed to refresh table {}", base_table_identifier).as_str()); + + let mut existing_updates: Vec = vec![]; + let mut existing_requirements: Vec = vec![]; + + if self.base_table.metadata() != refreshed.metadata() + || self.base_table.metadata_location() != refreshed.metadata_location() + { + // current base is stale, use refreshed as base and re-apply transaction actions + self.base_table = refreshed.clone(); + } + + let mut current_table = self.base_table.clone(); + + for action in self.actions.clone() { + let mut action_commit = action.commit(¤t_table).await?; + // apply changes to current_table + self.apply( + &mut current_table, + action_commit.take_updates(), + action_commit.take_requirements(), + &mut existing_updates, + &mut existing_requirements, + )?; + } + let table_commit = TableCommit::builder() - .ident(self.base_table.identifier().clone()) - .updates(self.updates) - .requirements(self.requirements) + .ident(base_table_identifier.clone()) + .updates(existing_updates) + .requirements(existing_requirements) .build(); catalog.update_table(table_commit).await @@ -206,11 +239,13 @@ mod tests { use std::collections::HashMap; use std::fs::File; use std::io::BufReader; + use std::sync::Arc; use crate::io::FileIOBuilder; use crate::spec::{FormatVersion, TableMetadata}; use crate::table::Table; use crate::transaction::Transaction; + use crate::transaction::action::TransactionAction; use crate::{TableIdent, TableUpdate}; fn make_v1_table() -> Table { @@ -273,7 +308,7 @@ mod tests { #[test] fn test_upgrade_table_version_v1_to_v2() { let table = make_v1_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); assert_eq!( @@ -287,7 +322,7 @@ mod tests { #[test] fn test_upgrade_table_version_v2_to_v2() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); assert!( @@ -303,7 +338,7 @@ mod tests { #[test] fn test_downgrade_table_version() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.upgrade_table_version(FormatVersion::V1); assert!(tx.is_err(), "Downgrade table version should fail!"); @@ -312,7 +347,7 @@ mod tests { #[test] fn test_set_table_property() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) .unwrap(); @@ -328,7 +363,7 @@ mod tests { #[test] fn test_remove_property() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx .remove_properties(vec!["a".to_string(), "b".to_string()]) .unwrap(); @@ -341,13 +376,16 @@ mod tests { ); } - #[test] - fn test_set_location() { + #[tokio::test] + async fn test_set_location() { let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .set_location(String::from("s3://bucket/prefix/new_table")) - .unwrap(); + let mut tx = Transaction::new(table); + let update_location_action = tx + .update_location() + .unwrap() + .set_location(String::from("s3://bucket/prefix/new_table")); + + let _res = Arc::new(update_location_action).commit(&mut tx).await; assert_eq!( vec![TableUpdate::SetLocation { @@ -360,7 +398,7 @@ mod tests { #[tokio::test] async fn test_transaction_apply_upgrade() { let table = make_v1_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); // Upgrade v1 to v1, do nothing. let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); // Upgrade v1 to v2, success. diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 012fb52bb..a9217835b 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -29,7 +29,8 @@ use crate::spec::{ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, }; -use crate::transaction::Transaction; +use crate::table::Table; +use crate::transaction::action::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; @@ -43,6 +44,7 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { ) -> impl Future>> + Send; fn existing_manifest( &self, + table: &Table, snapshot_produce: &SnapshotProduceAction, ) -> impl Future>> + Send; } @@ -59,8 +61,8 @@ pub(crate) trait ManifestProcess: Send + Sync { fn process_manifests(&self, manifests: Vec) -> Vec; } -pub(crate) struct SnapshotProduceAction<'a> { - pub tx: Transaction<'a>, +#[derive(Clone)] +pub(crate) struct SnapshotProduceAction { snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, @@ -72,16 +74,14 @@ pub(crate) struct SnapshotProduceAction<'a> { manifest_counter: RangeFrom, } -impl<'a> SnapshotProduceAction<'a> { +impl SnapshotProduceAction { pub(crate) fn new( - tx: Transaction<'a>, snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, snapshot_properties: HashMap, ) -> Result { Ok(Self { - tx, snapshot_id, commit_uuid, snapshot_properties, @@ -122,6 +122,7 @@ impl<'a> SnapshotProduceAction<'a> { Ok(()) } + // todo move this fast_append /// Set snapshot summary properties. pub fn set_snapshot_properties( &mut self, @@ -131,9 +132,11 @@ impl<'a> SnapshotProduceAction<'a> { Ok(self) } + // TODO moving this to fast append /// Add data files to the snapshot. pub fn add_data_files( &mut self, + table: &Table, data_files: impl IntoIterator, ) -> Result<&mut Self> { let data_files: Vec = data_files.into_iter().collect(); @@ -145,9 +148,7 @@ impl<'a> SnapshotProduceAction<'a> { )); } // Check if the data file partition spec id matches the table default partition spec id. - if self.tx.current_table.metadata().default_partition_spec_id() - != data_file.partition_spec_id - { + if table.metadata().default_partition_spec_id() != data_file.partition_spec_id { return Err(Error::new( ErrorKind::DataInvalid, "Data file partition spec id does not match table default partition spec id", @@ -155,32 +156,29 @@ impl<'a> SnapshotProduceAction<'a> { } Self::validate_partition_value( data_file.partition(), - self.tx.current_table.metadata().default_partition_type(), + table.metadata().default_partition_type(), )?; } self.added_data_files.extend(data_files); Ok(self) } - fn new_manifest_output(&mut self) -> Result { + fn new_manifest_output(&mut self, table: &Table) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - self.tx.current_table.metadata().location(), + table.metadata().location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx - .current_table - .file_io() - .new_output(new_manifest_path) + table.file_io().new_output(new_manifest_path) } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); - if added_data_files.is_empty() { + async fn write_added_manifest(&mut self, table: &Table) -> Result { + // let added_data_files = std::mem::take(&mut self.added_data_files); + if self.added_data_files.is_empty() { return Err(Error::new( ErrorKind::PreconditionFailed, "No added data files found when write a manifest file", @@ -188,8 +186,8 @@ impl<'a> SnapshotProduceAction<'a> { } let snapshot_id = self.snapshot_id; - let format_version = self.tx.current_table.metadata().format_version(); - let manifest_entries = added_data_files.into_iter().map(|data_file| { + let format_version = table.metadata().format_version(); + let manifest_entries = self.added_data_files.clone().into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); @@ -203,18 +201,13 @@ impl<'a> SnapshotProduceAction<'a> { }); let mut writer = { let builder = ManifestWriterBuilder::new( - self.new_manifest_output()?, + self.new_manifest_output(table)?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.current_table.metadata().current_schema().clone(), - self.tx - .current_table - .metadata() - .default_partition_spec() - .as_ref() - .clone(), + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), ); - if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { + if table.metadata().format_version() == FormatVersion::V1 { builder.build_v1() } else { builder.build_v2_data() @@ -228,11 +221,14 @@ impl<'a> SnapshotProduceAction<'a> { async fn manifest_file( &mut self, + table: &Table, snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; - let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + let added_manifest = self.write_added_manifest(table).await?; + let existing_manifests = snapshot_produce_operation + .existing_manifest(table, self) + .await?; // # TODO // Support process delete entries. @@ -245,10 +241,11 @@ impl<'a> SnapshotProduceAction<'a> { // Returns a `Summary` of the current snapshot fn summary( &self, + table: &Table, snapshot_produce_operation: &OP, ) -> Result { let mut summary_collector = SnapshotSummaryCollector::default(); - let table_metadata = self.tx.current_table.metadata_ref(); + let table_metadata = table.metadata_ref(); let partition_summary_limit = if let Some(limit) = table_metadata .properties() @@ -293,10 +290,10 @@ impl<'a> SnapshotProduceAction<'a> { ) } - fn generate_manifest_list_file_path(&self, attempt: i64) -> String { + fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.current_table.metadata().location(), + table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -307,41 +304,36 @@ impl<'a> SnapshotProduceAction<'a> { /// Finished building the action and apply it to the transaction. pub async fn apply( - mut self, + &mut self, + table: &Table, snapshot_produce_operation: OP, process: MP, - ) -> Result> { + ) -> Result { let new_manifests = self - .manifest_file(&snapshot_produce_operation, &process) + .manifest_file(table, &snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); + let next_seq_num = table.metadata().next_sequence_number(); let summary = self - .summary(&snapshot_produce_operation) + .summary(table, &snapshot_produce_operation) .map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") .with_source(err) }) .unwrap(); - let manifest_list_path = self.generate_manifest_list_file_path(0); + let manifest_list_path = self.generate_manifest_list_file_path(table, 0); - let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() { + let mut manifest_list_writer = match table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( - self.tx - .current_table - .file_io() - .new_output(manifest_list_path.clone())?, + table.file_io().new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.current_table.metadata().current_snapshot_id(), + table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( - self.tx - .current_table - .file_io() - .new_output(manifest_list_path.clone())?, + table.file_io().new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.current_table.metadata().current_snapshot_id(), + table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -352,37 +344,36 @@ impl<'a> SnapshotProduceAction<'a> { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.current_table.metadata().current_schema_id()) + .with_schema_id(table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); - self.tx.apply( - vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ], - vec![ - TableRequirement::UuidMatch { - uuid: self.tx.current_table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.current_table.metadata().current_snapshot_id(), - }, - ], - )?; - - Ok(self.tx) + let updates = vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ]; + + let requirements = vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id(), + }, + ]; + + Ok(ActionCommit::new(updates, requirements)) } } diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs index f925e602a..643d7cf6a 100644 --- a/crates/iceberg/src/transaction/sort_order.rs +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -15,74 +15,40 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + +use async_trait::async_trait; + use crate::error::Result; use crate::spec::{NullOrder, SortDirection, SortField, SortOrder, Transform}; -use crate::transaction::Transaction; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; /// Transaction action for replacing sort order. -pub struct ReplaceSortOrderAction<'a> { - pub tx: Transaction<'a>, +pub struct ReplaceSortOrderAction { pub sort_fields: Vec, } -impl<'a> ReplaceSortOrderAction<'a> { +impl ReplaceSortOrderAction { /// Adds a field for sorting in ascending order. - pub fn asc(self, name: &str, null_order: NullOrder) -> Result { - self.add_sort_field(name, SortDirection::Ascending, null_order) + pub fn asc(self, table: &Table, name: &str, null_order: NullOrder) -> Result { + self.add_sort_field(table, name, SortDirection::Ascending, null_order) } /// Adds a field for sorting in descending order. - pub fn desc(self, name: &str, null_order: NullOrder) -> Result { - self.add_sort_field(name, SortDirection::Descending, null_order) - } - - /// Finished building the action and apply it to the transaction. - pub fn apply(mut self) -> Result> { - let unbound_sort_order = SortOrder::builder() - .with_fields(self.sort_fields) - .build_unbound()?; - - let updates = vec![ - TableUpdate::AddSortOrder { - sort_order: unbound_sort_order, - }, - TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, - ]; - - let requirements = vec![ - TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self - .tx - .current_table - .metadata() - .current_schema() - .schema_id(), - }, - TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self - .tx - .current_table - .metadata() - .default_sort_order() - .order_id, - }, - ]; - - self.tx.apply(updates, requirements)?; - - Ok(self.tx) + pub fn desc(self, table: &Table, name: &str, null_order: NullOrder) -> Result { + self.add_sort_field(table, name, SortDirection::Descending, null_order) } fn add_sort_field( mut self, + table: &Table, name: &str, sort_direction: SortDirection, null_order: NullOrder, ) -> Result { - let field_id = self - .tx - .current_table + let field_id = table .metadata() .current_schema() .field_id_by_name(name) @@ -105,6 +71,33 @@ impl<'a> ReplaceSortOrderAction<'a> { } } +#[async_trait] +impl TransactionAction for ReplaceSortOrderAction { + async fn commit(self: Arc, table: &Table) -> Result { + let unbound_sort_order = SortOrder::builder() + .with_fields(self.sort_fields.clone()) + .build_unbound()?; + + let updates = vec![ + TableUpdate::AddSortOrder { + sort_order: unbound_sort_order, + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, + ]; + + let requirements = vec![ + TableRequirement::CurrentSchemaIdMatch { + current_schema_id: table.metadata().current_schema().schema_id(), + }, + TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: table.metadata().default_sort_order().order_id, + }, + ]; + + Ok(ActionCommit::new(updates, requirements)) + } +} + #[cfg(test)] mod tests { use crate::transaction::Transaction; @@ -114,7 +107,7 @@ mod tests { #[test] fn test_replace_sort_order() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.replace_sort_order().apply().unwrap(); assert_eq!( diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index f3ee17c75..aeeda4466 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::transaction::Transaction; +use iceberg::transaction::action::TransactionAction; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -111,11 +112,13 @@ async fn test_append_data_file() { assert_eq!(field_ids, vec![1, 2, 3]); // commit result - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&rest_catalog).await.unwrap(); + append_action + .add_data_files(&tx, data_file.clone()) + .unwrap(); + Arc::new(append_action).commit(&mut tx).await.unwrap(); + let table = tx.commit(Arc::new(&rest_catalog)).await.unwrap(); // check result let batch_stream = table @@ -131,11 +134,13 @@ async fn test_append_data_file() { assert_eq!(batches[0], batch); // commit result again - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&rest_catalog).await.unwrap(); + append_action + .add_data_files(&tx, data_file.clone()) + .unwrap(); + Arc::new(append_action).commit(&mut tx).await.unwrap(); + let table = tx.commit(Arc::new(&rest_catalog)).await.unwrap(); // check result again let batch_stream = table diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index c5c029a45..f9e2de10e 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -24,6 +24,7 @@ use futures::TryStreamExt; use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartitionSpec}; use iceberg::table::Table; use iceberg::transaction::Transaction; +use iceberg::transaction::action::TransactionAction; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -119,13 +120,13 @@ async fn test_append_partition_data_file() { let data_file_valid = data_file_writer_valid.close().await.unwrap(); // commit result - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action - .add_data_files(data_file_valid.clone()) + .add_data_files(&tx, data_file_valid.clone()) .unwrap(); - let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&rest_catalog).await.unwrap(); + Arc::new(append_action).commit(&mut tx).await.unwrap(); + let table = tx.commit(Arc::new(&rest_catalog)).await.unwrap(); // check result let batch_stream = table @@ -179,10 +180,10 @@ async fn test_schema_incompatible_partition_type( data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action - .add_data_files(data_file_invalid.clone()) + .add_data_files(&tx, data_file_invalid.clone()) .is_ok() { panic!("diverging partition info should have returned error"); @@ -219,10 +220,10 @@ async fn test_schema_incompatible_partition_fields( data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action - .add_data_files(data_file_invalid.clone()) + .add_data_files(&tx, data_file_invalid.clone()) .is_ok() { panic!("passing different number of partition fields should have returned error"); diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index d277e12e5..0f5302247 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::transaction::Transaction; +use iceberg::transaction::action::TransactionAction; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -89,17 +90,21 @@ async fn test_append_data_file_conflict() { let data_file = data_file_writer.close().await.unwrap(); // start two transaction and commit one of them - let tx1 = Transaction::new(&table); + let mut tx1 = Transaction::new(table.clone()); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = append_action.apply().await.unwrap(); + append_action + .add_data_files(&tx1, data_file.clone()) + .unwrap(); + Arc::new(append_action).commit(&mut tx1).await.unwrap(); - let tx2 = Transaction::new(&table); + let mut tx2 = Transaction::new(table.clone()); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = append_action.apply().await.unwrap(); + append_action + .add_data_files(&tx2, data_file.clone()) + .unwrap(); + Arc::new(append_action).commit(&mut tx2).await.unwrap(); let table = tx2 - .commit(&rest_catalog) + .commit(Arc::new(&rest_catalog)) .await .expect("The first commit should not fail."); @@ -117,5 +122,5 @@ async fn test_append_data_file_conflict() { assert_eq!(batches[0], batch); // another commit should fail - assert!(tx1.commit(&rest_catalog).await.is_err()); + assert!(tx1.commit(Arc::new(&rest_catalog)).await.is_err()); } diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 5ff982720..9f607271c 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -34,6 +34,7 @@ use iceberg::spec::{ PrimitiveType, Schema, StructType, Type, }; use iceberg::transaction::Transaction; +use iceberg::transaction::action::TransactionAction; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -308,11 +309,13 @@ async fn test_scan_all_type() { let data_file = data_file_writer.close().await.unwrap(); // commit result - let tx = Transaction::new(&table); + let mut tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); - let table = tx.commit(&rest_catalog).await.unwrap(); + append_action + .add_data_files(&tx, data_file.clone()) + .unwrap(); + Arc::new(append_action).commit(&mut tx).await.unwrap(); + let table = tx.commit(Arc::new(&rest_catalog)).await.unwrap(); // check result let batch_stream = table