From 0f4b4fc0454da755742a960f128e87ce05969743 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 13 Jun 2025 09:09:16 -0700
Subject: [PATCH 1/5] Impl TxAction for FastAppendAction, make SnapshotProducer
 immutable

---
 crates/iceberg/src/transaction/append.rs   | 253 ++++++++++++---------
 crates/iceberg/src/transaction/mod.rs      |   9 +-
 crates/iceberg/src/transaction/snapshot.rs | 227 ++++++------------
 3 files changed, 220 insertions(+), 269 deletions(-)

diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 405422558..87f6bd661 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -16,43 +16,135 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
+use async_trait::async_trait;
 use uuid::Uuid;
 
 use crate::error::Result;
-use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
-use crate::transaction::Transaction;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Struct, StructType};
+use crate::table::Table;
 use crate::transaction::snapshot::{
-    DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
 };
+use crate::transaction::{ActionCommit, TransactionAction};
 use crate::writer::file_writer::ParquetWriter;
 use crate::{Error, ErrorKind};
 
 /// FastAppendAction is a transaction action for fast append data files to the table.
 pub struct FastAppendAction {
-    snapshot_produce_action: SnapshotProduceAction,
     check_duplicate: bool,
+    // below are properties used to create SnapshotProducer when commit
+    snapshot_id: i64,
+    commit_uuid: Uuid,
+    key_metadata: Vec<u8>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
 }
 
 impl FastAppendAction {
-    #[allow(clippy::too_many_arguments)]
-    pub(crate) fn new(
-        tx: Transaction,
-        snapshot_id: i64,
-        commit_uuid: Uuid,
-        key_metadata: Vec<u8>,
-        snapshot_properties: HashMap<String, String>,
-    ) -> Result<Self> {
-        Ok(Self {
-            snapshot_produce_action: SnapshotProduceAction::new(
-                tx,
-                snapshot_id,
-                key_metadata,
-                commit_uuid,
-                snapshot_properties,
-            )?,
+    pub(crate) fn new(snapshot_id: i64, commit_uuid: Uuid, key_metadata: Vec<u8>) -> Self {
+        Self {
             check_duplicate: true,
-        })
+            snapshot_id,
+            commit_uuid,
+            key_metadata,
+            snapshot_properties: HashMap::default(),
+            added_data_files: vec![],
+        }
+    }
+
+    fn validate_added_data_files(table: &Table, added_data_files: &Vec<DataFile>) -> Result<()> {
+        for data_file in added_data_files {
+            if data_file.content_type() != crate::spec::DataContentType::Data {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Only data content type is allowed for fast append",
+                ));
+            }
+            // Check if the data file partition spec id matches the table default partition spec id.
+            if table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Data file partition spec id does not match table default partition spec id",
+                ));
+            }
+            Self::validate_partition_value(
+                data_file.partition(),
+                table.metadata().default_partition_type(),
+            )?;
+        }
+
+        Ok(())
+    }
+
+    async fn validate_duplicate_files(
+        table: &Table,
+        added_data_files: &Vec<DataFile>,
+    ) -> Result<()> {
+        let new_files: HashSet<&str> = added_data_files
+            .iter()
+            .map(|df| df.file_path.as_str())
+            .collect();
+
+        let mut referenced_files = Vec::new();
+        if let Some(current_snapshot) = table.metadata().current_snapshot() {
+            let manifest_list = current_snapshot
+                .load_manifest_list(table.file_io(), &table.metadata_ref())
+                .await?;
+            for manifest_list_entry in manifest_list.entries() {
+                let manifest = manifest_list_entry.load_manifest(table.file_io()).await?;
+                for entry in manifest.entries() {
+                    let file_path = entry.file_path();
+                    if new_files.contains(file_path) && entry.is_alive() {
+                        referenced_files.push(file_path.to_string());
+                    }
+                }
+            }
+        }
+
+        if !referenced_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add files that are already referenced by table, files: {}",
+                    referenced_files.join(", ")
+                ),
+            ));
+        }
+
+        Ok(())
+    }
+
+    // Check if the partition value is compatible with the partition type.
+    fn validate_partition_value(
+        partition_value: &Struct,
+        partition_type: &StructType,
+    ) -> Result<()> {
+        if partition_value.fields().len() != partition_type.fields().len() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Partition value is not compatible with partition type",
+            ));
+        }
+
+        for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
+            let field = field.field_type.as_primitive_type().ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Partition field should only be primitive type.",
+                )
+            })?;
+            if let Some(value) = value {
+                if !field.compatible(&value.as_primitive_literal().unwrap()) {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Partition value is not compatible partition type",
+                    ));
+                }
+            }
+        }
+        Ok(())
     }
 
     /// Set whether to check duplicate files
@@ -62,22 +154,15 @@ impl FastAppendAction {
     }
 
     /// Add data files to the snapshot.
-    pub fn add_data_files(
-        &mut self,
-        data_files: impl IntoIterator<Item = DataFile>,
-    ) -> Result<&mut Self> {
-        self.snapshot_produce_action.add_data_files(data_files)?;
-        Ok(self)
+    pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.added_data_files.extend(data_files);
+        self
     }
 
     /// Set snapshot summary properties.
-    pub fn set_snapshot_properties(
-        &mut self,
-        snapshot_properties: HashMap<String, String>,
-    ) -> Result<&mut Self> {
-        self.snapshot_produce_action
-            .set_snapshot_properties(snapshot_properties)?;
-        Ok(self)
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
     }
 
     /// Adds existing parquet files
     ///
     /// Note: This API is not yet fully supported in version 0.5.x.
     /// It is currently incomplete and should not be used in production.
     /// Specifically, schema compatibility checks and support for adding to partitioned tables
     /// have not yet been implemented.
     #[allow(dead_code)]
-    async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction> {
-        if !self
-            .snapshot_produce_action
-            .tx
-            .current_table
-            .metadata()
-            .default_spec
-            .is_unpartitioned()
-        {
+    async fn add_parquet_files(self, table: &Table, file_path: Vec<String>) -> Result<Self> {
+        if !table.metadata().default_spec.is_unpartitioned() {
             return Err(Error::new(
                 ErrorKind::FeatureUnsupported,
                 "Appending to partitioned tables is not supported",
             ));
         }
 
-        let table_metadata = self.snapshot_produce_action.tx.current_table.metadata();
-
-        let data_files = ParquetWriter::parquet_files_to_data_files(
-            self.snapshot_produce_action.tx.current_table.file_io(),
-            file_path,
-            table_metadata,
-        )
-        .await?;
+        let table_metadata = table.metadata();
 
-        self.add_data_files(data_files)?;
+        let data_files =
+            ParquetWriter::parquet_files_to_data_files(table.file_io(), file_path, table_metadata)
+                .await?;
 
-        self.apply().await
+        Ok(self.add_data_files(data_files))
     }
+}
+
+#[async_trait]
+impl TransactionAction for FastAppendAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // validate added files
+        Self::validate_added_data_files(table, &self.added_data_files)?;
 
-    /// Finished building the action and apply it to the transaction.
-    pub async fn apply(self) -> Result<Transaction> {
         // Checks duplicate files
         if self.check_duplicate {
-            let new_files: HashSet<&str> = self
-                .snapshot_produce_action
-                .added_data_files
-                .iter()
-                .map(|df| df.file_path.as_str())
-                .collect();
-
-            let mut referenced_files = Vec::new();
-            let table = &self.snapshot_produce_action.tx.current_table;
-            if let Some(current_snapshot) = table.metadata().current_snapshot() {
-                let manifest_list = current_snapshot
-                    .load_manifest_list(table.file_io(), &table.metadata_ref())
-                    .await?;
-                for manifest_list_entry in manifest_list.entries() {
-                    let manifest = manifest_list_entry.load_manifest(table.file_io()).await?;
-                    for entry in manifest.entries() {
-                        let file_path = entry.file_path();
-                        if new_files.contains(file_path) && entry.is_alive() {
-                            referenced_files.push(file_path.to_string());
-                        }
-                    }
-                }
-            }
-
-            if !referenced_files.is_empty() {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!(
-                        "Cannot add files that are already referenced by table, files: {}",
-                        referenced_files.join(", ")
-                    ),
-                ));
-            }
+            Self::validate_duplicate_files(table, &self.added_data_files).await?;
         }
 
-        self.snapshot_produce_action
-            .apply(FastAppendOperation, DefaultManifestProcess)
+        let snapshot_producer = SnapshotProducer::new(
+            self.snapshot_id.clone(),
+            self.commit_uuid.clone(),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+        );
+
+        snapshot_producer
+            .commit(table, FastAppendOperation, DefaultManifestProcess)
             .await
     }
 }
@@ -170,29 +224,18 @@ impl SnapshotProduceOperation for FastAppendOperation {
 
     async fn delete_entries(
         &self,
-        _snapshot_produce: &SnapshotProduceAction,
+        _snapshot_produce: &SnapshotProducer,
     ) -> Result<Vec<ManifestEntry>> {
         Ok(vec![])
     }
 
-    async fn existing_manifest(
-        &self,
-        snapshot_produce: &SnapshotProduceAction,
-    ) -> Result<Vec<ManifestFile>> {
-        let Some(snapshot) = snapshot_produce
-            .tx
-            .current_table
-            .metadata()
-            .current_snapshot()
-        else {
+    async fn existing_manifest(&self, table: &Table) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = table.metadata().current_snapshot() else {
             return Ok(vec![]);
         };
 
         let manifest_list = snapshot
-            .load_manifest_list(
-                snapshot_produce.tx.current_table.file_io(),
-                &snapshot_produce.tx.current_table.metadata_ref(),
-            )
+            .load_manifest_list(table.file_io(), &table.metadata_ref())
             .await?;
 
         Ok(manifest_list
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index fc2edcf99..ab8319aed 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -28,7 +28,6 @@ mod update_location;
 mod update_properties;
 mod upgrade_format_version;
 
-use std::collections::HashMap;
 use std::mem::discriminant;
 use std::sync::Arc;
 
@@ -142,18 +141,12 @@ impl Transaction {
     }
 
     /// Creates a fast append action.
-    pub fn fast_append(
-        self,
-        commit_uuid: Option<Uuid>,
-        key_metadata: Vec<u8>,
-    ) -> Result<FastAppendAction> {
+    pub fn fast_append(&self, commit_uuid: Option<Uuid>, key_metadata: Vec<u8>) -> FastAppendAction {
         let snapshot_id = self.generate_unique_snapshot_id();
         FastAppendAction::new(
-            self,
             snapshot_id,
             commit_uuid.unwrap_or_else(Uuid::now_v7),
             key_metadata,
-            HashMap::new(),
         )
     }
 
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 3d3fd4570..79066fd03 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -27,9 +27,10 @@ use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestEntry, ManifestFile,
     ManifestListWriter, ManifestWriterBuilder, Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
     PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
-    SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
+    SnapshotSummaryCollector, Summary, update_snapshot_summaries,
 };
-use crate::transaction::Transaction;
+use crate::table::Table;
+use crate::transaction::ActionCommit;
 use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
@@ -39,11 +40,11 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
     #[allow(unused)]
     fn delete_entries(
         &self,
-        snapshot_produce: &SnapshotProduceAction,
+        snapshot_produce: &SnapshotProducer,
     ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
     fn existing_manifest(
         &self,
-        snapshot_produce: &SnapshotProduceAction,
+        table: &Table,
     ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
 }
 
@@ -59,126 +60,50 @@ pub(crate) trait ManifestProcess: Send + Sync {
     fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
 }
 
-pub(crate) struct SnapshotProduceAction {
-    pub tx: Transaction,
+pub(crate) struct SnapshotProducer {
     snapshot_id: i64,
-    key_metadata: Vec<u8>,
     commit_uuid: Uuid,
+    key_metadata: Vec<u8>,
     snapshot_properties: HashMap<String, String>,
-    pub added_data_files: Vec<DataFile>,
+    added_data_files: Vec<DataFile>,
     // A counter used to generate unique manifest file names.
     // It starts from 0 and increments for each new manifest file.
     // Note: This counter is limited to the range of (0..u64::MAX).
     manifest_counter: RangeFrom<u64>,
 }
 
-impl SnapshotProduceAction {
+impl SnapshotProducer {
     pub(crate) fn new(
-        tx: Transaction,
         snapshot_id: i64,
-        key_metadata: Vec<u8>,
         commit_uuid: Uuid,
+        key_metadata: Vec<u8>,
         snapshot_properties: HashMap<String, String>,
-    ) -> Result<Self> {
-        Ok(Self {
-            tx,
+        added_data_files: Vec<DataFile>,
+    ) -> Self {
+        Self {
             snapshot_id,
             commit_uuid,
+            key_metadata,
             snapshot_properties,
-            added_data_files: vec![],
+            added_data_files,
             manifest_counter: (0..),
-            key_metadata,
-        })
+        }
     }
-
-    // Check if the partition value is compatible with the partition type.
-    fn validate_partition_value(
-        partition_value: &Struct,
-        partition_type: &StructType,
-    ) -> Result<()> {
-        if partition_value.fields().len() != partition_type.fields().len() {
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                "Partition value is not compatible with partition type",
-            ));
-        }
-
-        for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
-            let field = field.field_type.as_primitive_type().ok_or_else(|| {
-                Error::new(
-                    ErrorKind::Unexpected,
-                    "Partition field should only be primitive type.",
-                )
-            })?;
-            if let Some(value) = value {
-                if !field.compatible(&value.as_primitive_literal().unwrap()) {
-                    return Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        "Partition value is not compatible partition type",
-                    ));
-                }
-            }
-        }
-        Ok(())
-    }
-
-    /// Set snapshot summary properties.
-    pub fn set_snapshot_properties(
-        &mut self,
-        snapshot_properties: HashMap<String, String>,
-    ) -> Result<&mut Self> {
-        self.snapshot_properties = snapshot_properties;
-        Ok(self)
-    }
-
-    /// Add data files to the snapshot.
-    pub fn add_data_files(
-        &mut self,
-        data_files: impl IntoIterator<Item = DataFile>,
-    ) -> Result<&mut Self> {
-        let data_files: Vec<DataFile> = data_files.into_iter().collect();
-        for data_file in &data_files {
-            if data_file.content_type() != crate::spec::DataContentType::Data {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    "Only data content type is allowed for fast append",
-                ));
-            }
-            // Check if the data file partition spec id matches the table default partition spec id.
-            if self.tx.current_table.metadata().default_partition_spec_id()
-                != data_file.partition_spec_id
-            {
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    "Data file partition spec id does not match table default partition spec id",
-                ));
-            }
-            Self::validate_partition_value(
-                data_file.partition(),
-                self.tx.current_table.metadata().default_partition_type(),
-            )?;
-        }
-        self.added_data_files.extend(data_files);
-        Ok(self)
-    }
 
-    fn new_manifest_output(&mut self) -> Result<OutputFile> {
+    fn new_manifest_output(&mut self, table: &Table) -> Result<OutputFile> {
         let new_manifest_path = format!(
             "{}/{}/{}-m{}.{}",
-            self.tx.current_table.metadata().location(),
+            table.metadata().location(),
             META_ROOT_PATH,
             self.commit_uuid,
            self.manifest_counter.next().unwrap(),
             DataFileFormat::Avro
         );
-        self.tx
-            .current_table
-            .file_io()
-            .new_output(new_manifest_path)
+        table.file_io().new_output(new_manifest_path)
     }
 
     // Write manifest file for added data files and return the ManifestFile for ManifestList.
-    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+    async fn write_added_manifest(&mut self, table: &Table) -> Result<ManifestFile> {
         let added_data_files = std::mem::take(&mut self.added_data_files);
         if added_data_files.is_empty() {
             return Err(Error::new(
@@ -188,7 +113,7 @@ impl SnapshotProduceAction {
         }
 
         let snapshot_id = self.snapshot_id;
-        let format_version = self.tx.current_table.metadata().format_version();
+        let format_version = table.metadata().format_version();
         let manifest_entries = added_data_files.into_iter().map(|data_file| {
             let builder = ManifestEntry::builder()
                 .status(crate::spec::ManifestStatus::Added)
@@ -203,18 +128,13 @@ impl SnapshotProduceAction {
         });
         let mut writer = {
             let builder = ManifestWriterBuilder::new(
-                self.new_manifest_output()?,
+                self.new_manifest_output(table)?,
                 Some(self.snapshot_id),
                 self.key_metadata.clone(),
-                self.tx.current_table.metadata().current_schema().clone(),
-                self.tx
-                    .current_table
-                    .metadata()
-                    .default_partition_spec()
-                    .as_ref()
-                    .clone(),
+                table.metadata().current_schema().clone(),
+                table.metadata().default_partition_spec().as_ref().clone(),
             );
-            if self.tx.current_table.metadata().format_version() == FormatVersion::V1 {
+            if table.metadata().format_version() == FormatVersion::V1 {
                 builder.build_v1()
             } else {
                 builder.build_v2_data()
@@ -228,11 +148,12 @@ impl SnapshotProduceAction {
 
     async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         &mut self,
+        table: &Table,
         snapshot_produce_operation: &OP,
         manifest_process: &MP,
     ) -> Result<Vec<ManifestFile>> {
-        let added_manifest = self.write_added_manifest().await?;
-        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
+        let added_manifest = self.write_added_manifest(table).await?;
+        let existing_manifests = snapshot_produce_operation.existing_manifest(table).await?;
 
         // # TODO
         // Support process delete entries.
@@ -245,10 +166,11 @@ impl SnapshotProduceAction {
     // Returns a `Summary` of the current snapshot
     fn summary<OP: SnapshotProduceOperation>(
         &self,
+        table: &Table,
         snapshot_produce_operation: &OP,
     ) -> Result<Summary> {
         let mut summary_collector = SnapshotSummaryCollector::default();
-        let table_metadata = self.tx.current_table.metadata_ref();
+        let table_metadata = table.metadata_ref();
 
         let partition_summary_limit = if let Some(limit) = table_metadata
             .properties()
@@ -293,10 +215,10 @@ impl SnapshotProduceAction {
         )
     }
 
-    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
+    fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String {
         format!(
             "{}/{}/snap-{}-{}-{}.{}",
-            self.tx.current_table.metadata().location(),
+            table.metadata().location(),
             META_ROOT_PATH,
             self.snapshot_id,
             attempt,
@@ -305,43 +227,37 @@ impl SnapshotProduceAction {
         )
     }
 
-    /// Finished building the action and apply it to the transaction.
-    pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+    /// Finished building the action and return the [`ActionCommit`] to the transaction.
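+    ///
+    /// Illustrative call shape (a sketch only; assumes a `SnapshotProducer`
+    /// already built by an action such as fast append):
+    ///
+    /// ```ignore
+    /// let action_commit = producer
+    ///     .commit(&table, FastAppendOperation, DefaultManifestProcess)
+    ///     .await?;
+    /// ```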
+    pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         mut self,
+        table: &Table,
         snapshot_produce_operation: OP,
         process: MP,
-    ) -> Result<Transaction> {
+    ) -> Result<ActionCommit> {
         let new_manifests = self
-            .manifest_file(&snapshot_produce_operation, &process)
+            .manifest_file(table, &snapshot_produce_operation, &process)
             .await?;
-        let next_seq_num = self.tx.current_table.metadata().next_sequence_number();
+        let next_seq_num = table.metadata().next_sequence_number();
 
         let summary = self
-            .summary(&snapshot_produce_operation)
+            .summary(table, &snapshot_produce_operation)
             .map_err(|err| {
                 Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
                     .with_source(err)
-            })
-            .unwrap();
+            })?;
 
-        let manifest_list_path = self.generate_manifest_list_file_path(0);
+        let manifest_list_path = self.generate_manifest_list_file_path(table, 0);
 
-        let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() {
+        let mut manifest_list_writer = match table.metadata().format_version() {
             FormatVersion::V1 => ManifestListWriter::v1(
-                self.tx
-                    .current_table
-                    .file_io()
-                    .new_output(manifest_list_path.clone())?,
+                table.file_io().new_output(manifest_list_path.clone())?,
                 self.snapshot_id,
-                self.tx.current_table.metadata().current_snapshot_id(),
+                table.metadata().current_snapshot_id(),
             ),
             FormatVersion::V2 => ManifestListWriter::v2(
-                self.tx
-                    .current_table
-                    .file_io()
-                    .new_output(manifest_list_path.clone())?,
+                table.file_io().new_output(manifest_list_path.clone())?,
                 self.snapshot_id,
-                self.tx.current_table.metadata().current_snapshot_id(),
+                table.metadata().current_snapshot_id(),
                 next_seq_num,
             ),
         };
@@ -352,37 +268,36 @@ impl SnapshotProduceAction {
         let new_snapshot = Snapshot::builder()
             .with_manifest_list(manifest_list_path)
             .with_snapshot_id(self.snapshot_id)
-            .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id())
+            .with_parent_snapshot_id(table.metadata().current_snapshot_id())
             .with_sequence_number(next_seq_num)
             .with_summary(summary)
-            .with_schema_id(self.tx.current_table.metadata().current_schema_id())
+            .with_schema_id(table.metadata().current_schema_id())
             .with_timestamp_ms(commit_ts)
             .build();
 
-        self.tx.apply(
-            vec![
-                TableUpdate::AddSnapshot {
-                    snapshot: new_snapshot,
-                },
-                TableUpdate::SetSnapshotRef {
-                    ref_name: MAIN_BRANCH.to_string(),
-                    reference: SnapshotReference::new(
-                        self.snapshot_id,
-                        SnapshotRetention::branch(None, None, None),
-                    ),
-                },
-            ],
-            vec![
-                TableRequirement::UuidMatch {
-                    uuid: self.tx.current_table.metadata().uuid(),
-                },
-                TableRequirement::RefSnapshotIdMatch {
-                    r#ref: MAIN_BRANCH.to_string(),
-                    snapshot_id: self.tx.current_table.metadata().current_snapshot_id(),
-                },
-            ],
-        )?;
-
-        Ok(self.tx)
+        let updates = vec![
+            TableUpdate::AddSnapshot {
+                snapshot: new_snapshot,
+            },
+            TableUpdate::SetSnapshotRef {
+                ref_name: MAIN_BRANCH.to_string(),
+                reference: SnapshotReference::new(
+                    self.snapshot_id,
+                    SnapshotRetention::branch(None, None, None),
+                ),
+            },
+        ];
+
+        let requirements = vec![
+            TableRequirement::UuidMatch {
+                uuid: table.metadata().uuid(),
+            },
+            TableRequirement::RefSnapshotIdMatch {
+                r#ref: MAIN_BRANCH.to_string(),
+                snapshot_id: table.metadata().current_snapshot_id(),
+            },
+        ];
+
+        Ok(ActionCommit::new(updates, requirements))
     }
 }

From 575a4a443db9385f860264b6b2143e9b69f3ce15 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Mon, 16 Jun 2025 14:58:56 -0700
Subject: [PATCH 2/5] Fix tests

---
 crates/iceberg/src/transaction/append.rs      | 75 ++++++++++++-------
 crates/iceberg/src/transaction/mod.rs         |  6 +-
.../shared_tests/append_data_file_test.rs | 9 ++- .../append_partition_data_file_test.rs | 36 +++++---- .../shared_tests/conflict_commit_test.rs | 16 ++-- .../tests/shared_tests/scan_all_type.rs | 9 ++- 6 files changed, 94 insertions(+), 57 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 87f6bd661..12860eadf 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -250,33 +250,32 @@ impl SnapshotProduceOperation for FastAppendOperation { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; use crate::scan::tests::TableTestFixture; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, }; - use crate::transaction::Transaction; use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; use crate::{TableRequirement, TableUpdate}; #[tokio::test] async fn test_empty_data_append_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); - action.add_data_files(vec![]).unwrap(); - assert!(action.apply().await.is_err()); + let action = tx.fast_append(None, vec![]).add_data_files(vec![]); + assert!(Arc::new(action).commit(&table).await.is_err()); } #[tokio::test] async fn test_set_snapshot_properties() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); let mut snapshot_properties = HashMap::new(); snapshot_properties.insert("key".to_string(), "val".to_string()); - action.set_snapshot_properties(snapshot_properties).unwrap(); + let data_file = DataFileBuilder::default() .content(DataContentType::Data) .file_path("test/1.parquet".to_string()) @@ -287,11 +286,16 @@ mod tests { .partition(Struct::from_iter([Some(Literal::long(300))])) .build() .unwrap(); - action.add_data_files(vec![data_file]).unwrap(); - let tx = action.apply().await.unwrap(); + + let action = tx + .fast_append(None, vec![]) + .set_snapshot_properties(snapshot_properties) + .add_data_files(vec![data_file]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); // Check customized properties is contained in snapshot summary properties. 
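+        // The updates come back in commit order: `SnapshotProducer::commit` pushes
+        // `AddSnapshot` first and `SetSnapshotRef` second, so `updates[0]` below is
+        // the newly added snapshot.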
- let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { snapshot } else { unreachable!() @@ -307,10 +311,10 @@ mod tests { } #[tokio::test] - async fn test_fast_append_action() { + async fn test_fast_append_file_with_incompatible_partition_value() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); + let action = tx.fast_append(None, vec![]); // check add data file with incompatible partition value let data_file = DataFileBuilder::default() @@ -323,7 +327,17 @@ mod tests { .partition(Struct::from_iter([Some(Literal::string("test"))])) .build() .unwrap(); - assert!(action.add_data_files(vec![data_file.clone()]).is_err()); + + let action = action.add_data_files(vec![data_file.clone()]); + + assert!(Arc::new(action).commit(&table).await.is_err()); + } + + #[tokio::test] + async fn test_fast_append() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let action = tx.fast_append(None, vec![]); let data_file = DataFileBuilder::default() .content(DataContentType::Data) @@ -335,12 +349,15 @@ mod tests { .partition(Struct::from_iter([Some(Literal::long(300))])) .build() .unwrap(); - action.add_data_files(vec![data_file.clone()]).unwrap(); - let tx = action.apply().await.unwrap(); + + let action = action.add_data_files(vec![data_file.clone()]); + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); // check updates and requirements assert!( - matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) ); assert_eq!( vec![ @@ -352,11 +369,11 @@ mod tests { snapshot_id: table.metadata().current_snapshot_id } ], - tx.requirements + requirements ); // check manifest list - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { snapshot } else { unreachable!() @@ -395,19 +412,23 @@ mod tests { async fn test_add_duplicated_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(&fixture.table); + let tx = Transaction::new(&fixture.table); let file_paths = vec![ format!("{}/1.parquet", &fixture.table_location), format!("{}/3.parquet", &fixture.table_location), ]; - let fast_append_action = tx.fast_append(None, vec![]).unwrap(); + let fast_append_action = tx + .fast_append(None, vec![]) + .add_parquet_files(&fixture.table, file_paths.clone()) + .await + .unwrap(); // Attempt to add duplicated Parquet files with fast append. 
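        // Both 1.parquet and 3.parquet are still alive in the current snapshot's
        // manifests, so the duplicate-file check should reject this commit.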
assert!( - fast_append_action - .add_parquet_files(file_paths.clone()) + Arc::new(fast_append_action) + .commit(&fixture.table) .await .is_err(), "file already in table" @@ -415,13 +436,17 @@ mod tests { let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)]; - let tx = crate::transaction::Transaction::new(&fixture.table); - let fast_append_action = tx.fast_append(None, vec![]).unwrap(); + let tx = Transaction::new(&fixture.table); + let fast_append_action = tx + .fast_append(None, vec![]) + .add_parquet_files(&fixture.table, file_paths.clone()) + .await + .unwrap(); // Attempt to add Parquet file which was deleted from table. assert!( - fast_append_action - .add_parquet_files(file_paths.clone()) + Arc::new(fast_append_action) + .commit(&fixture.table) .await .is_ok(), "file not in table" diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ab8319aed..91431b0b7 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -141,7 +141,11 @@ impl Transaction { } /// Creates a fast append action. - pub fn fast_append(&self, commit_uuid: Option, key_metadata: Vec) -> FastAppendAction { + pub fn fast_append( + &self, + commit_uuid: Option, + key_metadata: Vec, + ) -> FastAppendAction { let snapshot_id = self.generate_unique_snapshot_id(); FastAppendAction::new( snapshot_id, diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 38a029510..542ef9ccf 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -112,9 +112,10 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let append_action = tx + .fast_append(None, vec![]) + .add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index c5c029a45..53d86b6ab 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -23,7 +23,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartitionSpec}; use iceberg::table::Table; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use 
iceberg::writer::file_writer::location_generator::{ @@ -120,11 +120,10 @@ async fn test_append_partition_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action - .add_data_files(data_file_valid.clone()) - .unwrap(); - let tx = append_action.apply().await.unwrap(); + let append_action = tx + .fast_append(None, vec![]) + .add_data_files(data_file_valid.clone()); + let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result @@ -144,6 +143,7 @@ async fn test_append_partition_data_file() { parquet_writer_builder.clone(), batch.clone(), table.clone(), + &rest_catalog, ) .await; @@ -151,6 +151,7 @@ async fn test_append_partition_data_file() { parquet_writer_builder, batch, table, + &rest_catalog, first_partition_id_value, ) .await; @@ -163,6 +164,7 @@ async fn test_schema_incompatible_partition_type( >, batch: RecordBatch, table: Table, + catalog: &dyn Catalog, ) { // test writing different "type" of partition than mentioned in schema let mut data_file_writer_invalid = DataFileWriterBuilder::new( @@ -180,11 +182,12 @@ async fn test_schema_incompatible_partition_type( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - if append_action - .add_data_files(data_file_invalid.clone()) - .is_ok() - { + let append_action = tx + .fast_append(None, vec![]) + .add_data_files(data_file_invalid.clone()); + let tx = append_action.apply(tx).unwrap(); + + if tx.commit(catalog).await.is_ok() { panic!("diverging partition info should have returned error"); } } @@ -196,6 +199,7 @@ async fn test_schema_incompatible_partition_fields( >, batch: RecordBatch, table: Table, + catalog: &dyn Catalog, first_partition_id_value: i32, ) { // test writing different number of partition fields than mentioned in schema @@ -220,11 +224,11 @@ async fn test_schema_incompatible_partition_fields( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - if append_action - .add_data_files(data_file_invalid.clone()) - .is_ok() - { + let append_action = tx + .fast_append(None, vec![]) + .add_data_files(data_file_invalid.clone()); + let tx = append_action.apply(tx).unwrap(); + if tx.commit(catalog).await.is_ok() { panic!("passing different number of partition fields should have returned error"); } } diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index d277e12e5..171b9aac9 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -90,14 +90,16 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); - let mut append_action = tx1.fast_append(None, 
vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx1 = append_action.apply().await.unwrap(); + let append_action = tx1 + .fast_append(None, vec![]) + .add_data_files(data_file.clone()); + let tx1 = append_action.apply(tx1).unwrap(); let tx2 = Transaction::new(&table); - let mut append_action = tx2.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx2 = append_action.apply().await.unwrap(); + let append_action = tx2 + .fast_append(None, vec![]) + .add_data_files(data_file.clone()); + let tx2 = append_action.apply(tx2).unwrap(); let table = tx2 .commit(&rest_catalog) .await diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 5ff982720..25774d26a 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -33,7 +33,7 @@ use iceberg::spec::{ LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField, PrimitiveType, Schema, StructType, Type, }; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ @@ -309,9 +309,10 @@ async fn test_scan_all_type() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); - append_action.add_data_files(data_file.clone()).unwrap(); - let tx = append_action.apply().await.unwrap(); + let append_action = tx + .fast_append(None, vec![]) + .add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); // check result From b64ecdacfb329058506ebd69afd4a33584fbbe88 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 16 Jun 2025 15:48:33 -0700 Subject: [PATCH 3/5] I love clippy --- crates/iceberg/src/transaction/append.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 12860eadf..54198c903 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -54,7 +54,7 @@ impl FastAppendAction { } } - fn validate_added_data_files(table: &Table, added_data_files: &Vec) -> Result<()> { + fn validate_added_data_files(table: &Table, added_data_files: &[DataFile]) -> Result<()> { for data_file in added_data_files { if data_file.content_type() != crate::spec::DataContentType::Data { return Err(Error::new( @@ -80,7 +80,7 @@ impl FastAppendAction { async fn validate_duplicate_files( table: &Table, - added_data_files: &Vec, + added_data_files: &[DataFile], ) -> Result<()> { let new_files: HashSet<&str> = added_data_files .iter() @@ -202,8 +202,8 @@ impl TransactionAction for FastAppendAction { } let snapshot_producer = SnapshotProducer::new( - self.snapshot_id.clone(), - self.commit_uuid.clone(), + self.snapshot_id, + self.commit_uuid, self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), From cc885c6c5a1b57eb96df03aadddfb6ccd1d259c8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 16 Jun 2025 16:29:02 -0700 Subject: [PATCH 4/5] fmt --- crates/iceberg/src/transaction/append.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff 
--git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 54198c903..a6974df76 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -78,10 +78,7 @@ impl FastAppendAction { Ok(()) } - async fn validate_duplicate_files( - table: &Table, - added_data_files: &[DataFile], - ) -> Result<()> { + async fn validate_duplicate_files(table: &Table, added_data_files: &[DataFile]) -> Result<()> { let new_files: HashSet<&str> = added_data_files .iter() .map(|df| df.file_path.as_str()) From fa4f6a79154694dd8cef6753ef13bc3f2d896ddc Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 17 Jun 2025 20:23:53 -0700 Subject: [PATCH 5/5] Simplify tx.fast_append API, move helper methods back to snapshot producer, remove add_parquet_files --- crates/iceberg/src/io/object_cache.rs | 2 +- crates/iceberg/src/scan/mod.rs | 4 +- crates/iceberg/src/spec/manifest/mod.rs | 12 +- crates/iceberg/src/spec/manifest/writer.rs | 12 +- crates/iceberg/src/transaction/append.rs | 202 +++--------------- crates/iceberg/src/transaction/mod.rs | 13 +- crates/iceberg/src/transaction/snapshot.rs | 104 ++++++++- .../shared_tests/append_data_file_test.rs | 4 +- .../append_partition_data_file_test.rs | 12 +- .../shared_tests/conflict_commit_test.rs | 8 +- .../tests/shared_tests/scan_all_type.rs | 4 +- 11 files changed, 150 insertions(+), 227 deletions(-) diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 23f869b83..a23ff36b3 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -266,7 +266,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), - vec![], + None, current_schema.clone(), current_partition_spec.as_ref().clone(), ) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index f7a43a647..27dc1af98 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -752,7 +752,7 @@ pub mod tests { let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), - vec![], + None, current_schema.clone(), current_partition_spec.as_ref().clone(), ) @@ -964,7 +964,7 @@ pub mod tests { let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), Some(current_snapshot.snapshot_id()), - vec![], + None, current_schema.clone(), current_partition_spec.as_ref().clone(), ) diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index eba69dbac..33b7d3870 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -232,7 +232,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(1), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -417,7 +417,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(2), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -514,7 +514,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(3), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -623,7 +623,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(2), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -731,7 +731,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(2), - vec![], + None, 
metadata.schema.clone(), metadata.partition_spec.clone(), ) @@ -1010,7 +1010,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(1), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 92d356fb8..39945a513 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -40,7 +40,7 @@ use crate::{Error, ErrorKind}; pub struct ManifestWriterBuilder { output: OutputFile, snapshot_id: Option, - key_metadata: Vec, + key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, } @@ -50,7 +50,7 @@ impl ManifestWriterBuilder { pub fn new( output: OutputFile, snapshot_id: Option, - key_metadata: Vec, + key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, ) -> Self { @@ -115,7 +115,7 @@ pub struct ManifestWriter { min_seq_num: Option, - key_metadata: Vec, + key_metadata: Option>, manifest_entries: Vec, @@ -127,7 +127,7 @@ impl ManifestWriter { pub(crate) fn new( output: OutputFile, snapshot_id: Option, - key_metadata: Vec, + key_metadata: Option>, metadata: ManifestMetadata, ) -> Self { Self { @@ -416,7 +416,7 @@ impl ManifestWriter { existing_rows_count: Some(self.existing_rows), deleted_rows_count: Some(self.deleted_rows), partitions: Some(partition_summary), - key_metadata: Some(self.key_metadata), + key_metadata: self.key_metadata, }) } } @@ -622,7 +622,7 @@ mod tests { let mut writer = ManifestWriterBuilder::new( output_file, Some(3), - vec![], + None, metadata.schema.clone(), metadata.partition_spec.clone(), ) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index a6974df76..5195b99a3 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -15,135 +15,43 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; use uuid::Uuid; use crate::error::Result; -use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation, Struct, StructType}; +use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; use crate::table::Table; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, }; use crate::transaction::{ActionCommit, TransactionAction}; -use crate::writer::file_writer::ParquetWriter; -use crate::{Error, ErrorKind}; /// FastAppendAction is a transaction action for fast append data files to the table. 
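///
/// A minimal usage sketch of the builder-style API (illustrative only; assumes an
/// existing `table` and `data_files` in scope):
///
/// ```ignore
/// let action = Transaction::new(&table)
///     .fast_append()
///     .with_check_duplicate(true)
///     .add_data_files(data_files);
/// let mut action_commit = Arc::new(action).commit(&table).await?;
/// let updates = action_commit.take_updates();
/// ```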
pub struct FastAppendAction { check_duplicate: bool, // below are properties used to create SnapshotProducer when commit snapshot_id: i64, - commit_uuid: Uuid, - key_metadata: Vec, + commit_uuid: Option, + key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, } impl FastAppendAction { - pub(crate) fn new(snapshot_id: i64, commit_uuid: Uuid, key_metadata: Vec) -> Self { + pub(crate) fn new(snapshot_id: i64) -> Self { Self { check_duplicate: true, snapshot_id, - commit_uuid, - key_metadata, + commit_uuid: None, + key_metadata: None, snapshot_properties: HashMap::default(), added_data_files: vec![], } } - fn validate_added_data_files(table: &Table, added_data_files: &[DataFile]) -> Result<()> { - for data_file in added_data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } - // Check if the data file partition spec id matches the table default partition spec id. - if table.metadata().default_partition_spec_id() != data_file.partition_spec_id { - return Err(Error::new( - ErrorKind::DataInvalid, - "Data file partition spec id does not match table default partition spec id", - )); - } - Self::validate_partition_value( - data_file.partition(), - table.metadata().default_partition_type(), - )?; - } - - Ok(()) - } - - async fn validate_duplicate_files(table: &Table, added_data_files: &[DataFile]) -> Result<()> { - let new_files: HashSet<&str> = added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); - - let mut referenced_files = Vec::new(); - if let Some(current_snapshot) = table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(table.file_io(), &table.metadata_ref()) - .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry.load_manifest(table.file_io()).await?; - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); - } - } - } - } - - if !referenced_files.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") - ), - )); - } - - Ok(()) - } - - // Check if the partition value is compatible with the partition type. - fn validate_partition_value( - partition_value: &Struct, - partition_type: &StructType, - ) -> Result<()> { - if partition_value.fields().len() != partition_type.fields().len() { - return Err(Error::new( - ErrorKind::DataInvalid, - "Partition value is not compatible with partition type", - )); - } - - for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { - let field = field.field_type.as_primitive_type().ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "Partition field should only be primitive type.", - ) - })?; - if let Some(value) = value { - if !field.compatible(&value.as_primitive_literal().unwrap()) { - return Err(Error::new( - ErrorKind::DataInvalid, - "Partition value is not compatible partition type", - )); - } - } - } - Ok(()) - } - /// Set whether to check duplicate files pub fn with_check_duplicate(mut self, v: bool) -> Self { self.check_duplicate = v; @@ -156,34 +64,22 @@ impl FastAppendAction { self } - /// Set snapshot summary properties. 
- pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { - self.snapshot_properties = snapshot_properties; + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); self } - /// Adds existing parquet files - /// - /// Note: This API is not yet fully supported in version 0.5.x. - /// It is currently incomplete and should not be used in production. - /// Specifically, schema compatibility checks and support for adding to partitioned tables - /// have not yet been implemented. - #[allow(dead_code)] - async fn add_parquet_files(self, table: &Table, file_path: Vec) -> Result { - if !table.metadata().default_spec.is_unpartitioned() { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Appending to partitioned tables is not supported", - )); - } - - let table_metadata = table.metadata(); - - let data_files = - ParquetWriter::parquet_files_to_data_files(table.file_io(), file_path, table_metadata) - .await?; + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } - Ok(self.add_data_files(data_files)) + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self } } @@ -191,16 +87,16 @@ impl FastAppendAction { impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { // validate added files - Self::validate_added_data_files(table, &self.added_data_files)?; + SnapshotProducer::validate_added_data_files(table, &self.added_data_files)?; // Checks duplicate files if self.check_duplicate { - Self::validate_duplicate_files(table, &self.added_data_files).await?; + SnapshotProducer::validate_duplicate_files(table, &self.added_data_files).await?; } let snapshot_producer = SnapshotProducer::new( self.snapshot_id, - self.commit_uuid, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), @@ -249,7 +145,6 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - use crate::scan::tests::TableTestFixture; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, }; @@ -261,7 +156,7 @@ mod tests { async fn test_empty_data_append_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let action = tx.fast_append(None, vec![]).add_data_files(vec![]); + let action = tx.fast_append().add_data_files(vec![]); assert!(Arc::new(action).commit(&table).await.is_err()); } @@ -285,7 +180,7 @@ mod tests { .unwrap(); let action = tx - .fast_append(None, vec![]) + .fast_append() .set_snapshot_properties(snapshot_properties) .add_data_files(vec![data_file]); let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); @@ -311,7 +206,7 @@ mod tests { async fn test_fast_append_file_with_incompatible_partition_value() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let action = tx.fast_append(None, vec![]); + let action = tx.fast_append(); // check add data file with incompatible partition value let data_file = DataFileBuilder::default() @@ -334,7 +229,7 @@ mod tests { async fn test_fast_append() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let action = tx.fast_append(None, vec![]); + let action = tx.fast_append(); let 
data_file = DataFileBuilder::default() .content(DataContentType::Data) @@ -404,49 +299,4 @@ mod tests { ); assert_eq!(data_file, *manifest.entries()[0].data_file()); } - - #[tokio::test] - async fn test_add_duplicated_parquet_files_to_unpartitioned_table() { - let mut fixture = TableTestFixture::new_unpartitioned(); - fixture.setup_unpartitioned_manifest_files().await; - let tx = Transaction::new(&fixture.table); - - let file_paths = vec![ - format!("{}/1.parquet", &fixture.table_location), - format!("{}/3.parquet", &fixture.table_location), - ]; - - let fast_append_action = tx - .fast_append(None, vec![]) - .add_parquet_files(&fixture.table, file_paths.clone()) - .await - .unwrap(); - - // Attempt to add duplicated Parquet files with fast append. - assert!( - Arc::new(fast_append_action) - .commit(&fixture.table) - .await - .is_err(), - "file already in table" - ); - - let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)]; - - let tx = Transaction::new(&fixture.table); - let fast_append_action = tx - .fast_append(None, vec![]) - .add_parquet_files(&fixture.table, file_paths.clone()) - .await - .unwrap(); - - // Attempt to add Parquet file which was deleted from table. - assert!( - Arc::new(fast_append_action) - .commit(&fixture.table) - .await - .is_ok(), - "file not in table" - ); - } } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index a333dd738..a2af9aaa3 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -141,17 +141,8 @@ impl Transaction { } /// Creates a fast append action. - pub fn fast_append( - &self, - commit_uuid: Option, - key_metadata: Vec, - ) -> FastAppendAction { - let snapshot_id = self.generate_unique_snapshot_id(); - FastAppendAction::new( - snapshot_id, - commit_uuid.unwrap_or_else(Uuid::now_v7), - key_metadata, - ) + pub fn fast_append(&self) -> FastAppendAction { + FastAppendAction::new(self.generate_unique_snapshot_id()) } /// Creates replace sort order action. diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 79066fd03..80a9f68a1 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; @@ -27,7 +27,7 @@ use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention, - SnapshotSummaryCollector, Summary, update_snapshot_summaries, + SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -63,7 +63,7 @@ pub(crate) trait ManifestProcess: Send + Sync { pub(crate) struct SnapshotProducer { snapshot_id: i64, commit_uuid: Uuid, - key_metadata: Vec, + key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, // A counter used to generate unique manifest file names. 
@@ -76,7 +76,7 @@ impl SnapshotProducer { pub(crate) fn new( snapshot_id: i64, commit_uuid: Uuid, - key_metadata: Vec, + key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, ) -> Self { @@ -90,6 +90,71 @@ impl SnapshotProducer { } } + pub(crate) fn validate_added_data_files( + table: &Table, + added_data_files: &[DataFile], + ) -> Result<()> { + for data_file in added_data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + // Check if the data file partition spec id matches the table default partition spec id. + if table.metadata().default_partition_spec_id() != data_file.partition_spec_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Data file partition spec id does not match table default partition spec id", + )); + } + Self::validate_partition_value( + data_file.partition(), + table.metadata().default_partition_type(), + )?; + } + + Ok(()) + } + + pub(crate) async fn validate_duplicate_files( + table: &Table, + added_data_files: &[DataFile], + ) -> Result<()> { + let new_files: HashSet<&str> = added_data_files + .iter() + .map(|df| df.file_path.as_str()) + .collect(); + + let mut referenced_files = Vec::new(); + if let Some(current_snapshot) = table.metadata().current_snapshot() { + let manifest_list = current_snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + for manifest_list_entry in manifest_list.entries() { + let manifest = manifest_list_entry.load_manifest(table.file_io()).await?; + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } + } + } + } + + if !referenced_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add files that are already referenced by table, files: {}", + referenced_files.join(", ") + ), + )); + } + + Ok(()) + } + fn new_manifest_output(&mut self, table: &Table) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", @@ -102,6 +167,37 @@ impl SnapshotProducer { table.file_io().new_output(new_manifest_path) } + // Check if the partition value is compatible with the partition type. + fn validate_partition_value( + partition_value: &Struct, + partition_type: &StructType, + ) -> Result<()> { + if partition_value.fields().len() != partition_type.fields().len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatible with partition type", + )); + } + + for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { + let field = field.field_type.as_primitive_type().ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Partition field should only be primitive type.", + ) + })?; + if let Some(value) = value { + if !field.compatible(&value.as_primitive_literal().unwrap()) { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatible partition type", + )); + } + } + } + Ok(()) + } + // Write manifest file for added data files and return the ManifestFile for ManifestList. 
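    // The added files are drained with `std::mem::take`, wrapped in `Added`-status
    // manifest entries, and written through a `ManifestWriterBuilder` configured for
    // the table's format version (v1 or v2).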
async fn write_added_manifest(&mut self, table: &Table) -> Result { let added_data_files = std::mem::take(&mut self.added_data_files); diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 542ef9ccf..dd54ac98f 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -112,9 +112,7 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); - let append_action = tx - .fast_append(None, vec![]) - .add_data_files(data_file.clone()); + let append_action = tx.fast_append().add_data_files(data_file.clone()); let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 53d86b6ab..36a1c3643 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -120,9 +120,7 @@ async fn test_append_partition_data_file() { // commit result let tx = Transaction::new(&table); - let append_action = tx - .fast_append(None, vec![]) - .add_data_files(data_file_valid.clone()); + let append_action = tx.fast_append().add_data_files(data_file_valid.clone()); let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); @@ -182,9 +180,7 @@ async fn test_schema_incompatible_partition_type( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let append_action = tx - .fast_append(None, vec![]) - .add_data_files(data_file_invalid.clone()); + let append_action = tx.fast_append().add_data_files(data_file_invalid.clone()); let tx = append_action.apply(tx).unwrap(); if tx.commit(catalog).await.is_ok() { @@ -224,9 +220,7 @@ async fn test_schema_incompatible_partition_fields( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let append_action = tx - .fast_append(None, vec![]) - .add_data_files(data_file_invalid.clone()); + let append_action = tx.fast_append().add_data_files(data_file_invalid.clone()); let tx = append_action.apply(tx).unwrap(); if tx.commit(catalog).await.is_ok() { panic!("passing different number of partition fields should have returned error"); diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 171b9aac9..551116bf9 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -90,15 +90,11 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); - let append_action = tx1 - .fast_append(None, vec![]) - .add_data_files(data_file.clone()); + let append_action = tx1.fast_append().add_data_files(data_file.clone()); let tx1 = append_action.apply(tx1).unwrap(); let tx2 = Transaction::new(&table); - let append_action = tx2 - .fast_append(None, vec![]) - .add_data_files(data_file.clone()); + let append_action = tx2.fast_append().add_data_files(data_file.clone()); let tx2 = append_action.apply(tx2).unwrap(); let table = tx2 
.commit(&rest_catalog) diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 25774d26a..51f409392 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -309,9 +309,7 @@ async fn test_scan_all_type() { // commit result let tx = Transaction::new(&table); - let append_action = tx - .fast_append(None, vec![]) - .add_data_files(data_file.clone()); + let append_action = tx.fast_append().add_data_files(data_file.clone()); let tx = append_action.apply(tx).unwrap(); let table = tx.commit(&rest_catalog).await.unwrap();