From fd1ac4f0af7139ced4c904cad48525eb92365309 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Sun, 6 Jul 2025 19:31:05 +0800 Subject: [PATCH 1/2] refine the interface of SnapshotProducer --- crates/iceberg/src/transaction/append.rs | 16 ++-- crates/iceberg/src/transaction/snapshot.rs | 100 +++++++++++---------- 2 files changed, 65 insertions(+), 51 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 9ecbb54d9c..f0e2833605 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -101,7 +101,7 @@ impl TransactionAction for FastAppendAction { ); snapshot_producer - .commit(table, FastAppendOperation, DefaultManifestProcess) + .commit(FastAppendOperation, DefaultManifestProcess) .await } } @@ -115,18 +115,24 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn delete_entries( &self, - _snapshot_produce: &SnapshotProducer, + _snapshot_produce: &SnapshotProducer<'_>, ) -> Result> { Ok(vec![]) } - async fn existing_manifest(&self, table: &Table) -> Result> { - let Some(snapshot) = table.metadata().current_snapshot() else { + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { return Ok(vec![]); }; let manifest_list = snapshot - .load_manifest_list(table.file_io(), &table.metadata_ref()) + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) .await?; Ok(manifest_list diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 113e6a7673..ef564fc4da 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -44,23 +44,32 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { ) -> impl Future>> + Send; fn existing_manifest( &self, - table: &Table, + snapshot_produce: &SnapshotProducer<'_>, ) -> impl Future>> + Send; } pub(crate) struct DefaultManifestProcess; impl ManifestProcess for DefaultManifestProcess { - fn process_manifests(&self, manifests: Vec) -> Vec { + fn process_manifests( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + manifests: Vec, + ) -> Vec { manifests } } pub(crate) trait ManifestProcess: Send + Sync { - fn process_manifests(&self, manifests: Vec) -> Vec; + fn process_manifests( + &self, + snapshot_produce: &SnapshotProducer<'_>, + manifests: Vec, + ) -> Vec; } -pub(crate) struct SnapshotProducer { +pub(crate) struct SnapshotProducer<'a> { + pub(crate) table: &'a Table, snapshot_id: i64, commit_uuid: Uuid, key_metadata: Option>, @@ -72,15 +81,16 @@ pub(crate) struct SnapshotProducer { manifest_counter: RangeFrom, } -impl SnapshotProducer { +impl<'a> SnapshotProducer<'a> { pub(crate) fn new( - table: &Table, + table: &'a Table, commit_uuid: Uuid, key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, ) -> Self { Self { + table, snapshot_id: Self::generate_unique_snapshot_id(table), commit_uuid, key_metadata, @@ -177,28 +187,28 @@ impl SnapshotProducer { snapshot_id } - fn new_manifest_writer( - &mut self, - content: ManifestContentType, - table: &Table, - ) -> Result { + fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - table.metadata().location(), + self.table.metadata().location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - let output_file = table.file_io().new_output(new_manifest_path)?; + let output_file = self.table.file_io().new_output(new_manifest_path)?; let builder = ManifestWriterBuilder::new( output_file, Some(self.snapshot_id), self.key_metadata.clone(), - table.metadata().current_schema().clone(), - table.metadata().default_partition_spec().as_ref().clone(), + self.table.metadata().current_schema().clone(), + self.table + .metadata() + .default_partition_spec() + .as_ref() + .clone(), ); - if table.metadata().format_version() == FormatVersion::V1 { + if self.table.metadata().format_version() == FormatVersion::V1 { Ok(builder.build_v1()) } else { match content { @@ -240,7 +250,7 @@ impl SnapshotProducer { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self, table: &Table) -> Result { + async fn write_added_manifest(&mut self) -> Result { let added_data_files = std::mem::take(&mut self.added_data_files); if added_data_files.is_empty() { return Err(Error::new( @@ -250,7 +260,7 @@ impl SnapshotProducer { } let snapshot_id = self.snapshot_id; - let format_version = table.metadata().format_version(); + let format_version = self.table.metadata().format_version(); let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) @@ -263,7 +273,7 @@ impl SnapshotProducer { builder.build() } }); - let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?; + let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; for entry in manifest_entries { writer.add_entry(entry)?; } @@ -272,29 +282,27 @@ impl SnapshotProducer { async fn manifest_file( &mut self, - table: &Table, snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest(table).await?; - let existing_manifests = snapshot_produce_operation.existing_manifest(table).await?; + let added_manifest = self.write_added_manifest().await?; + let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; // # TODO // Support process delete entries. let mut manifest_files = vec![added_manifest]; manifest_files.extend(existing_manifests); - let manifest_files = manifest_process.process_manifests(manifest_files); + let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) } // Returns a `Summary` of the current snapshot fn summary( &self, - table: &Table, snapshot_produce_operation: &OP, ) -> Result { let mut summary_collector = SnapshotSummaryCollector::default(); - let table_metadata = table.metadata_ref(); + let table_metadata = self.table.metadata_ref(); let partition_summary_limit = if let Some(limit) = table_metadata .properties() @@ -339,10 +347,10 @@ impl SnapshotProducer { ) } - fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String { + fn generate_manifest_list_file_path(&self, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - table.metadata().location(), + self.table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -354,34 +362,34 @@ impl SnapshotProducer { /// Finished building the action and return the [`ActionCommit`] to the transaction. pub(crate) async fn commit( mut self, - table: &Table, snapshot_produce_operation: OP, process: MP, ) -> Result { let new_manifests = self - .manifest_file(table, &snapshot_produce_operation, &process) + .manifest_file(&snapshot_produce_operation, &process) .await?; - let next_seq_num = table.metadata().next_sequence_number(); + let next_seq_num = self.table.metadata().next_sequence_number(); - let summary = self - .summary(table, &snapshot_produce_operation) - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.") - .with_source(err) - })?; + let summary = self.summary(&snapshot_produce_operation).map_err(|err| { + Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err) + })?; - let manifest_list_path = self.generate_manifest_list_file_path(table, 0); + let manifest_list_path = self.generate_manifest_list_file_path(0); - let mut manifest_list_writer = match table.metadata().format_version() { + let mut manifest_list_writer = match self.table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( - table.file_io().new_output(manifest_list_path.clone())?, + self.table + .file_io() + .new_output(manifest_list_path.clone())?, self.snapshot_id, - table.metadata().current_snapshot_id(), + self.table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( - table.file_io().new_output(manifest_list_path.clone())?, + self.table + .file_io() + .new_output(manifest_list_path.clone())?, self.snapshot_id, - table.metadata().current_snapshot_id(), + self.table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -392,10 +400,10 @@ impl SnapshotProducer { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(self.table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(table.metadata().current_schema_id()) + .with_schema_id(self.table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); @@ -414,11 +422,11 @@ impl SnapshotProducer { let requirements = vec![ TableRequirement::UuidMatch { - uuid: table.metadata().uuid(), + uuid: self.table.metadata().uuid(), }, TableRequirement::RefSnapshotIdMatch { r#ref: MAIN_BRANCH.to_string(), - snapshot_id: table.metadata().current_snapshot_id(), + snapshot_id: self.table.metadata().current_snapshot_id(), }, ]; From d30ff237a3a34c5075495c511d65efb727f840fc Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 8 Jul 2025 22:09:03 +0800 Subject: [PATCH 2/2] refine validate_added_data_files && validate_duplicate_files --- crates/iceberg/src/transaction/append.rs | 18 ++++++++++-------- crates/iceberg/src/transaction/snapshot.rs | 19 +++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index f0e2833605..6e719163d5 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -84,14 +84,6 @@ impl FastAppendAction { #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { - // validate added files - SnapshotProducer::validate_added_data_files(table, &self.added_data_files)?; - - // Checks duplicate files - if self.check_duplicate { - SnapshotProducer::validate_duplicate_files(table, &self.added_data_files).await?; - } - let snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), @@ -100,6 +92,16 @@ impl TransactionAction for FastAppendAction { self.added_data_files.clone(), ); + // validate added files + snapshot_producer.validate_added_data_files(&self.added_data_files)?; + + // Checks duplicate files + if self.check_duplicate { + snapshot_producer + .validate_duplicate_files(&self.added_data_files) + .await?; + } + snapshot_producer .commit(FastAppendOperation, DefaultManifestProcess) .await diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index ef564fc4da..092f9fb2d5 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -100,10 +100,7 @@ impl<'a> SnapshotProducer<'a> { } } - pub(crate) fn validate_added_data_files( - table: &Table, - added_data_files: &[DataFile], - ) -> Result<()> { + pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> { for data_file in added_data_files { if data_file.content_type() != crate::spec::DataContentType::Data { return Err(Error::new( @@ -112,7 +109,7 @@ impl<'a> SnapshotProducer<'a> { )); } // Check if the data file partition spec id matches the table default partition spec id. - if table.metadata().default_partition_spec_id() != data_file.partition_spec_id { + if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id { return Err(Error::new( ErrorKind::DataInvalid, "Data file partition spec id does not match table default partition spec id", @@ -120,7 +117,7 @@ impl<'a> SnapshotProducer<'a> { } Self::validate_partition_value( data_file.partition(), - table.metadata().default_partition_type(), + self.table.metadata().default_partition_type(), )?; } @@ -128,7 +125,7 @@ impl<'a> SnapshotProducer<'a> { } pub(crate) async fn validate_duplicate_files( - table: &Table, + &self, added_data_files: &[DataFile], ) -> Result<()> { let new_files: HashSet<&str> = added_data_files @@ -137,12 +134,14 @@ impl<'a> SnapshotProducer<'a> { .collect(); let mut referenced_files = Vec::new(); - if let Some(current_snapshot) = table.metadata().current_snapshot() { + if let Some(current_snapshot) = self.table.metadata().current_snapshot() { let manifest_list = current_snapshot - .load_manifest_list(table.file_io(), &table.metadata_ref()) + .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry.load_manifest(table.file_io()).await?; + let manifest = manifest_list_entry + .load_manifest(self.table.file_io()) + .await?; for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() {