diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 23f869b83..a23ff36b3 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -266,7 +266,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), - vec![], + None, current_schema.clone(), current_partition_spec.as_ref().clone(), ) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index f7a43a647..27dc1af98 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -752,7 +752,7 @@ pub mod tests { let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), - vec![], + None, current_schema.clone(), current_partition_spec.as_ref().clone(), ) @@ -964,7 +964,7 @@ pub mod tests { let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), - vec![], + None, current_schema.clone(), current_partition_spec.as_ref().clone(), ) diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index eba69dbac..33b7d3870 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -232,7 +232,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(1), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -417,7 +417,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(2), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -514,7 +514,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(3), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -623,7 +623,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(2), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -731,7 +731,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(2), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -1010,7 +1010,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(1), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 92d356fb8..39945a513 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -40,7 +40,7 @@ use crate::{Error, ErrorKind}; pub struct ManifestWriterBuilder { output: OutputFile, snapshot_id: Option, - key_metadata: Vec, + key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, } @@ -50,7 +50,7 @@ impl ManifestWriterBuilder { pub fn new( output: OutputFile, snapshot_id: Option, - key_metadata: Vec, + key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, ) -> Self { @@ -115,7 +115,7 @@ pub struct ManifestWriter { min_seq_num: Option, - key_metadata: Vec, + key_metadata: Option>, manifest_entries: Vec, @@ -127,7 +127,7 @@ impl ManifestWriter { pub(crate) fn new( output: OutputFile, snapshot_id: Option, - key_metadata: Vec, + key_metadata: Option>, metadata: ManifestMetadata, ) -> Self { Self { @@ -416,7 +416,7 @@ impl ManifestWriter { existing_rows_count: Some(self.existing_rows), deleted_rows_count: Some(self.deleted_rows), partitions: Some(partition_summary), - key_metadata: Some(self.key_metadata), + key_metadata: self.key_metadata, }) } } @@ -622,7 +622,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(3), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 405422558..5195b99a3 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -15,44 +15,41 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use std::sync::Arc; +use async_trait::async_trait; use uuid::Uuid; use crate::error::Result; use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; -use crate::transaction::Transaction; +use crate::table::Table; use crate::transaction::snapshot::{ - DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation, + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, }; -use crate::writer::file_writer::ParquetWriter; -use crate::{Error, ErrorKind}; +use crate::transaction::{ActionCommit, TransactionAction}; /// FastAppendAction is a transaction action for fast append data files to the table. pub struct FastAppendAction { - snapshot_produce_action: SnapshotProduceAction, check_duplicate: bool, + // below are properties used to create SnapshotProducer when commit + snapshot_id: i64, + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + added_data_files: Vec, } impl FastAppendAction { - #[allow(clippy::too_many_arguments)] - pub(crate) fn new( - tx: Transaction, - snapshot_id: i64, - commit_uuid: Uuid, - key_metadata: Vec, - snapshot_properties: HashMap, - ) -> Result { - Ok(Self { - snapshot_produce_action: SnapshotProduceAction::new( - tx, - snapshot_id, - key_metadata, - commit_uuid, - snapshot_properties, - )?, + pub(crate) fn new(snapshot_id: i64) -> Self { + Self { check_duplicate: true, - }) + snapshot_id, + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + added_data_files: vec![], + } } /// Set whether to check duplicate files @@ -62,101 +59,51 @@ impl FastAppendAction { } /// Add data files to the snapshot. - pub fn add_data_files( - &mut self, - data_files: impl IntoIterator, - ) -> Result<&mut Self> { - self.snapshot_produce_action.add_data_files(data_files)?; - Ok(self) + pub fn add_data_files(mut self, data_files: impl IntoIterator) -> Self { + self.added_data_files.extend(data_files); + self } - /// Set snapshot summary properties. - pub fn set_snapshot_properties( - &mut self, - snapshot_properties: HashMap, - ) -> Result<&mut Self> { - self.snapshot_produce_action - .set_snapshot_properties(snapshot_properties)?; - Ok(self) + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self } - /// Adds existing parquet files - /// - /// Note: This API is not yet fully supported in version 0.5.x. - /// It is currently incomplete and should not be used in production. - /// Specifically, schema compatibility checks and support for adding to partitioned tables - /// have not yet been implemented. - #[allow(dead_code)] - async fn add_parquet_files(mut self, file_path: Vec) -> Result { - if !self - .snapshot_produce_action - .tx - .current_table - .metadata() - .default_spec - .is_unpartitioned() - { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Appending to partitioned tables is not supported", - )); - } - - let table_metadata = self.snapshot_produce_action.tx.current_table.metadata(); - - let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.current_table.file_io(), - file_path, - table_metadata, - ) - .await?; - - self.add_data_files(data_files)?; + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } - self.apply().await + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self } +} + +#[async_trait] +impl TransactionAction for FastAppendAction { + async fn commit(self: Arc, table: &Table) -> Result { + // validate added files + SnapshotProducer::validate_added_data_files(table, &self.added_data_files)?; - /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result { // Checks duplicate files if self.check_duplicate { - let new_files: HashSet<&str> = self - .snapshot_produce_action - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); - - let mut referenced_files = Vec::new(); - let table = &self.snapshot_produce_action.tx.current_table; - if let Some(current_snapshot) = table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(table.file_io(), &table.metadata_ref()) - .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry.load_manifest(table.file_io()).await?; - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); - } - } - } - } - - if !referenced_files.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") - ), - )); - } + SnapshotProducer::validate_duplicate_files(table, &self.added_data_files).await?; } - self.snapshot_produce_action - .apply(FastAppendOperation, DefaultManifestProcess) + let snapshot_producer = SnapshotProducer::new( + self.snapshot_id, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + ); + + snapshot_producer + .commit(table, FastAppendOperation, DefaultManifestProcess) .await } } @@ -170,29 +117,18 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn delete_entries( &self, - _snapshot_produce: &SnapshotProduceAction, + _snapshot_produce: &SnapshotProducer, ) -> Result> { Ok(vec![]) } - async fn existing_manifest( - &self, - snapshot_produce: &SnapshotProduceAction, - ) -> Result> { - let Some(snapshot) = snapshot_produce - .tx - .current_table - .metadata() - .current_snapshot() - else { + async fn existing_manifest(&self, table: &Table) -> Result> { + let Some(snapshot) = table.metadata().current_snapshot() else { return Ok(vec![]); }; let manifest_list = snapshot - .load_manifest_list( - snapshot_produce.tx.current_table.file_io(), - &snapshot_produce.tx.current_table.metadata_ref(), - ) + .load_manifest_list(table.file_io(), &table.metadata_ref()) .await?; Ok(manifest_list @@ -207,33 +143,31 @@ impl SnapshotProduceOperation for FastAppendOperation { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; - use crate::scan::tests::TableTestFixture; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, }; - use crate::transaction::Transaction; use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; use crate::{TableRequirement, TableUpdate}; #[tokio::test] async fn test_empty_data_append_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); - action.add_data_files(vec![]).unwrap(); - assert!(action.apply().await.is_err()); + let action = tx.fast_append().add_data_files(vec![]); + assert!(Arc::new(action).commit(&table).await.is_err()); } #[tokio::test] async fn test_set_snapshot_properties() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); let mut snapshot_properties = HashMap::new(); snapshot_properties.insert("key".to_string(), "val".to_string()); - action.set_snapshot_properties(snapshot_properties).unwrap(); + let data_file = DataFileBuilder::default() .content(DataContentType::Data) .file_path("test/1.parquet".to_string()) @@ -244,11 +178,16 @@ mod tests { .partition(Struct::from_iter([Some(Literal::long(300))])) .build() .unwrap(); - action.add_data_files(vec![data_file]).unwrap(); - let tx = action.apply().await.unwrap(); + + let action = tx + .fast_append() + .set_snapshot_properties(snapshot_properties) + .add_data_files(vec![data_file]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); // Check customized properties is contained in snapshot summary properties. - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { snapshot } else { unreachable!() @@ -264,10 +203,10 @@ mod tests { } #[tokio::test] - async fn test_fast_append_action() { + async fn test_fast_append_file_with_incompatible_partition_value() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); + let action = tx.fast_append(); // check add data file with incompatible partition value let data_file = DataFileBuilder::default() @@ -280,7 +219,17 @@ mod tests { .partition(Struct::from_iter([Some(Literal::string("test"))])) .build() .unwrap(); - assert!(action.add_data_files(vec![data_file.clone()]).is_err()); + + let action = action.add_data_files(vec![data_file.clone()]); + + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_fast_append() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let action = tx.fast_append(); let data_file = DataFileBuilder::default() .content(DataContentType::Data) @@ -292,12 +241,15 @@ mod tests { .partition(Struct::from_iter([Some(Literal::long(300))])) .build() .unwrap(); - action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply().await.unwrap(); + + let action = action.add_data_files(vec![data_file.clone()]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); // check updates and requirements assert!( - matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) ); assert_eq!( vec![ @@ -309,11 +261,11 @@ mod tests { snapshot_id: table.metadata().current_snapshot_id } ], - tx.requirements + requirements ); // check manifest list - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { snapshot } else { unreachable!() @@ -347,41 +299,4 @@ mod tests { ); assert_eq!(data_file, *manifest.entries()[0].data_file()); } - - #[tokio::test] - async fn test_add_duplicated_parquet_files_to_unpartitioned_table() { - let mut fixture = TableTestFixture::new_unpartitioned(); - fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(&fixture.table); - - let file_paths = vec![ - format!("{}/1.parquet", &fixture.table_location), - format!("{}/3.parquet", &fixture.table_location), - ]; - - let fast_append_action = tx.fast_append(None, vec![]).unwrap(); - - // Attempt to add duplicated Parquet files with fast append. - assert!( - fast_append_action - .add_parquet_files(file_paths.clone()) - .await - .is_err(), - "file already in table" - ); - - let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)]; - - let tx = crate::transaction::Transaction::new(&fixture.table); - let fast_append_action = tx.fast_append(None, vec![]).unwrap(); - - // Attempt to add Parquet file which was deleted from table. - assert!( - fast_append_action - .add_parquet_files(file_paths.clone()) - .await - .is_ok(), - "file not in table" - ); - } } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 7db3f7ed8..a2af9aaa3 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -28,7 +28,6 @@ mod update_location; mod update_properties; mod upgrade_format_version; -use std::collections::HashMap; use std::mem::discriminant; use std::sync::Arc; @@ -142,19 +141,8 @@ impl Transaction { } /// Creates a fast append action. - pub fn fast_append( - self, - commit_uuid: Option, - key_metadata: Vec, - ) -> Result { - let snapshot_id = self.generate_unique_snapshot_id(); - FastAppendAction::new( - self, - snapshot_id, - commit_uuid.unwrap_or_else(Uuid::now_v7), - key_metadata, - HashMap::new(), - ) + pub fn fast_append(&self) -> FastAppendAction { + FastAppendAction::new(self.generate_unique_snapshot_id()) } /// Creates replace sort order action. diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 3d3fd4570..80a9f68a1 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; @@ -29,7 +29,8 @@ use crate::spec::{ PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, }; -use crate::transaction::Transaction; +use crate::table::Table; +use crate::transaction::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; @@ -39,11 +40,11 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { #[allow(unused)] fn delete_entries( &self, - snapshot_produce: &SnapshotProduceAction, + snapshot_produce: &SnapshotProducer, ) -> impl Future>> + Send; fn existing_manifest( &self, - snapshot_produce: &SnapshotProduceAction, + table: &Table, ) -> impl Future>> + Send; } @@ -59,36 +60,111 @@ pub(crate) trait ManifestProcess: Send + Sync { fn process_manifests(&self, manifests: Vec) -> Vec; } -pub(crate) struct SnapshotProduceAction { - pub tx: Transaction, +pub(crate) struct SnapshotProducer { snapshot_id: i64, - key_metadata: Vec, commit_uuid: Uuid, + key_metadata: Option>, snapshot_properties: HashMap, - pub added_data_files: Vec, + added_data_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, } -impl SnapshotProduceAction { +impl SnapshotProducer { pub(crate) fn new( - tx: Transaction, snapshot_id: i64, - key_metadata: Vec, commit_uuid: Uuid, + key_metadata: Option>, snapshot_properties: HashMap, - ) -> Result { - Ok(Self { - tx, + added_data_files: Vec, + ) -> Self { + Self { snapshot_id, commit_uuid, + key_metadata, snapshot_properties, - added_data_files: vec![], + added_data_files, manifest_counter: (0..), - key_metadata, - }) + } + } + + pub(crate) fn validate_added_data_files( + table: &Table, + added_data_files: &[DataFile], + ) -> Result<()> { + for data_file in added_data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + // Check if the data file partition spec id matches the table default partition spec id. + if table.metadata().default_partition_spec_id() != data_file.partition_spec_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Data file partition spec id does not match table default partition spec id", + )); + } + Self::validate_partition_value( + data_file.partition(), + table.metadata().default_partition_type(), + )?; + } + + Ok(()) + } + + pub(crate) async fn validate_duplicate_files( + table: &Table, + added_data_files: &[DataFile], + ) -> Result<()> { + let new_files: HashSet<&str> = added_data_files + .iter() + .map(|df| df.file_path.as_str()) + .collect(); + + let mut referenced_files = Vec::new(); + if let Some(current_snapshot) = table.metadata().current_snapshot() { + let manifest_list = current_snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + for manifest_list_entry in manifest_list.entries() { + let manifest = manifest_list_entry.load_manifest(table.file_io()).await?; + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } + } + } + } + + if !referenced_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), + )); + } + + Ok(()) + } + + fn new_manifest_output(&mut self, table: &Table) -> Result { + let new_manifest_path = format!( + "{}/{}/{}-m{}.{}", + table.metadata().location(), + META_ROOT_PATH, + self.commit_uuid, + self.manifest_counter.next().unwrap(), + DataFileFormat::Avro + ); + table.file_io().new_output(new_manifest_path) } // Check if the partition value is compatible with the partition type. @@ -122,63 +198,8 @@ impl SnapshotProduceAction { Ok(()) } - /// Set snapshot summary properties. - pub fn set_snapshot_properties( - &mut self, - snapshot_properties: HashMap, - ) -> Result<&mut Self> { - self.snapshot_properties = snapshot_properties; - Ok(self) - } - - /// Add data files to the snapshot. - pub fn add_data_files( - &mut self, - data_files: impl IntoIterator, - ) -> Result<&mut Self> { - let data_files: Vec = data_files.into_iter().collect(); - for data_file in &data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } - // Check if the data file partition spec id matches the table default partition spec id. - if self.tx.current_table.metadata().default_partition_spec_id() - != data_file.partition_spec_id - { - return Err(Error::new( - ErrorKind::DataInvalid, - "Data file partition spec id does not match table default partition spec id", - )); - } - Self::validate_partition_value( - data_file.partition(), - self.tx.current_table.metadata().default_partition_type(), - )?; - } - self.added_data_files.extend(data_files); - Ok(self) - } - - fn new_manifest_output(&mut self) -> Result { - let new_manifest_path = format!( - "{}/{}/{}-m{}.{}", - self.tx.current_table.metadata().location(), - META_ROOT_PATH, - self.commit_uuid, - self.manifest_counter.next().unwrap(), - DataFileFormat::Avro - ); - self.tx - .current_table - .file_io() - .new_output(new_manifest_path) - } - // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { + async fn write_added_manifest(&mut self, table: &Table) -> Result { let added_data_files = std::mem::take(&mut self.added_data_files); if added_data_files.is_empty() { return Err(Error::new( @@ -188,7 +209,7 @@ impl SnapshotProduceAction { } let snapshot_id = self.snapshot_id; - let format_version = self.tx.current_table.metadata().format_version(); + let format_version = table.metadata().format_version(); let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) @@ -203,18 +224,13 @@ impl SnapshotProduceAction { }); let mut writer = { let builder = ManifestWriterBuilder::new( - self.new_manifest_output()?, + self.new_manifest_output(table)?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.current_table.metadata().current_schema().clone(), - self.tx - .current_table - .metadata() - .default_partition_spec() - .as_ref() - .clone(), + table.metadata().current_schema().clone(), + table.metadata().default_partition_spec().as_ref().clone(), ); - if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { + if table.metadata().format_version() == FormatVersion::V1 { builder.build_v1() } else { builder.build_v2_data() @@ -228,11 +244,12 @@ impl SnapshotProduceAction { async fn manifest_file( &mut self, + table: &Table, snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; - let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + let added_manifest = self.write_added_manifest(table).await?; + let existing_manifests = snapshot_produce_operation.existing_manifest(table).await?; // # TODO // Support process delete entries. @@ -245,10 +262,11 @@ impl SnapshotProduceAction { // Returns a `Summary` of the current snapshot fn summary( &self, + table: &Table, snapshot_produce_operation: &OP, ) -> Result { let mut summary_collector = SnapshotSummaryCollector::default(); - let table_metadata = self.tx.current_table.metadata_ref(); + let table_metadata = table.metadata_ref(); let partition_summary_limit = if let Some(limit) = table_metadata .properties() @@ -293,10 +311,10 @@ impl SnapshotProduceAction { ) } - fn generate_manifest_list_file_path(&self, attempt: i64) -> String { + fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.current_table.metadata().location(), + table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -305,43 +323,37 @@ impl SnapshotProduceAction { ) } - /// Finished building the action and apply it to the transaction. - pub async fn apply( + /// Finished building the action and return the [`ActionCommit`] to the transaction. + pub(crate) async fn commit( mut self, + table: &Table, snapshot_produce_operation: OP, process: MP, - ) -> Result { + ) -> Result { let new_manifests = self - .manifest_file(&snapshot_produce_operation, &process) + .manifest_file(table, &snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); + let next_seq_num = table.metadata().next_sequence_number(); let summary = self - .summary(&snapshot_produce_operation) + .summary(table, &snapshot_produce_operation) .map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") .with_source(err) - }) - .unwrap(); + })?; - let manifest_list_path = self.generate_manifest_list_file_path(0); + let manifest_list_path = self.generate_manifest_list_file_path(table, 0); - let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() { + let mut manifest_list_writer = match table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( - self.tx - .current_table - .file_io() - .new_output(manifest_list_path.clone())?, + table.file_io().new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.current_table.metadata().current_snapshot_id(), + table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( - self.tx - .current_table - .file_io() - .new_output(manifest_list_path.clone())?, + table.file_io().new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.current_table.metadata().current_snapshot_id(), + table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -352,37 +364,36 @@ impl SnapshotProduceAction { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.current_table.metadata().current_schema_id()) + .with_schema_id(table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); - self.tx.apply( - vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ], - vec![ - TableRequirement::UuidMatch { - uuid: self.tx.current_table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.current_table.metadata().current_snapshot_id(), - }, - ], - )?; - - Ok(self.tx) + let updates = vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ]; + + let requirements = vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id(), + }, + ]; + + Ok(ActionCommit::new(updates, requirements)) } } diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 38a029510..dd54ac98f 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -112,9 +112,8 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let append_action = tx.fast_append().add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index c5c029a45..36a1c3643 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -23,7 +23,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartitionSpec}; use iceberg::table::Table; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -120,11 +120,8 @@ async fn test_append_partition_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action - .add_data_files(data_file_valid.clone()) - .unwrap(); - let tx = append_action.apply().await.unwrap(); + let append_action = tx.fast_append().add_data_files(data_file_valid.clone()); + let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result @@ -144,6 +141,7 @@ async fn test_append_partition_data_file() { parquet_writer_builder.clone(), batch.clone(), table.clone(), + &rest_catalog, ) .await; @@ -151,6 +149,7 @@ async fn test_append_partition_data_file() { parquet_writer_builder, batch, table, + &rest_catalog, first_partition_id_value, ) .await; @@ -163,6 +162,7 @@ async fn test_schema_incompatible_partition_type( >, batch: RecordBatch, table: Table, + catalog: &dyn Catalog, ) { // test writing different "type" of partition than mentioned in schema let mut data_file_writer_invalid = DataFileWriterBuilder::new( @@ -180,11 +180,10 @@ async fn test_schema_incompatible_partition_type( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - if append_action - .add_data_files(data_file_invalid.clone()) - .is_ok() - { + let append_action = tx.fast_append().add_data_files(data_file_invalid.clone()); + let tx = append_action.apply(tx).unwrap(); + + if tx.commit(catalog).await.is_ok() { panic!("diverging partition info should have returned error"); } } @@ -196,6 +195,7 @@ async fn test_schema_incompatible_partition_fields( >, batch: RecordBatch, table: Table, + catalog: &dyn Catalog, first_partition_id_value: i32, ) { // test writing different number of partition fields than mentioned in schema @@ -220,11 +220,9 @@ async fn test_schema_incompatible_partition_fields( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - if append_action - .add_data_files(data_file_invalid.clone()) - .is_ok() - { + let append_action = tx.fast_append().add_data_files(data_file_invalid.clone()); + let tx = append_action.apply(tx).unwrap(); + if tx.commit(catalog).await.is_ok() { panic!("passing different number of partition fields should have returned error"); } } diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index d277e12e5..551116bf9 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -90,14 +90,12 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); - let mut append_action = tx1.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = append_action.apply().await.unwrap(); + let append_action = tx1.fast_append().add_data_files(data_file.clone()); + let tx1 = append_action.apply(tx1).unwrap(); let tx2 = Transaction::new(&table); - let mut append_action = tx2.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = append_action.apply().await.unwrap(); + let append_action = tx2.fast_append().add_data_files(data_file.clone()); + let tx2 = append_action.apply(tx2).unwrap(); let table = tx2 .commit(&rest_catalog) .await diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 5ff982720..51f409392 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -33,7 +33,7 @@ use iceberg::spec::{ LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField, PrimitiveType, Schema, StructType, Type, }; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -309,9 +309,8 @@ async fn test_scan_all_type() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let append_action = tx.fast_append().add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result