diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 017107f92..405422558 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -29,15 +29,15 @@ use crate::writer::file_writer::ParquetWriter; use crate::{Error, ErrorKind}; /// FastAppendAction is a transaction action for fast append data files to the table. -pub struct FastAppendAction<'a> { - snapshot_produce_action: SnapshotProduceAction<'a>, +pub struct FastAppendAction { + snapshot_produce_action: SnapshotProduceAction, check_duplicate: bool, } -impl<'a> FastAppendAction<'a> { +impl FastAppendAction { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - tx: Transaction<'a>, + tx: Transaction, snapshot_id: i64, commit_uuid: Uuid, key_metadata: Vec, @@ -87,7 +87,7 @@ impl<'a> FastAppendAction<'a> { /// Specifically, schema compatibility checks and support for adding to partitioned tables /// have not yet been implemented. #[allow(dead_code)] - async fn add_parquet_files(mut self, file_path: Vec) -> Result> { + async fn add_parquet_files(mut self, file_path: Vec) -> Result { if !self .snapshot_produce_action .tx @@ -117,7 +117,7 @@ impl<'a> FastAppendAction<'a> { } /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { + pub async fn apply(self) -> Result { // Checks duplicate files if self.check_duplicate { let new_files: HashSet<&str> = self @@ -170,14 +170,14 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn delete_entries( &self, - _snapshot_produce: &SnapshotProduceAction<'_>, + _snapshot_produce: &SnapshotProduceAction, ) -> Result> { Ok(vec![]) } async fn existing_manifest( &self, - snapshot_produce: &SnapshotProduceAction<'_>, + snapshot_produce: &SnapshotProduceAction, ) -> Result> { let Some(snapshot) = snapshot_produce .tx diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ba79d60bb..e7802a138 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -37,18 +37,18 @@ use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. -pub struct Transaction<'a> { - base_table: &'a Table, +pub struct Transaction { + base_table: Table, current_table: Table, updates: Vec, requirements: Vec, } -impl<'a> Transaction<'a> { +impl Transaction { /// Creates a new transaction. - pub fn new(table: &'a Table) -> Self { + pub fn new(table: &Table) -> Self { Self { - base_table: table, + base_table: table.clone(), current_table: table.clone(), updates: vec![], requirements: vec![], @@ -155,7 +155,7 @@ impl<'a> Transaction<'a> { self, commit_uuid: Option, key_metadata: Vec, - ) -> Result> { + ) -> Result { let snapshot_id = self.generate_unique_snapshot_id(); FastAppendAction::new( self, @@ -167,7 +167,7 @@ impl<'a> Transaction<'a> { } /// Creates replace sort order action. - pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { + pub fn replace_sort_order(self) -> ReplaceSortOrderAction { ReplaceSortOrderAction { tx: self, sort_fields: vec![], diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 012fb52bb..3d3fd4570 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -59,8 +59,8 @@ pub(crate) trait ManifestProcess: Send + Sync { fn process_manifests(&self, manifests: Vec) -> Vec; } -pub(crate) struct SnapshotProduceAction<'a> { - pub tx: Transaction<'a>, +pub(crate) struct SnapshotProduceAction { + pub tx: Transaction, snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, @@ -72,9 +72,9 @@ pub(crate) struct SnapshotProduceAction<'a> { manifest_counter: RangeFrom, } -impl<'a> SnapshotProduceAction<'a> { +impl SnapshotProduceAction { pub(crate) fn new( - tx: Transaction<'a>, + tx: Transaction, snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, @@ -310,7 +310,7 @@ impl<'a> SnapshotProduceAction<'a> { mut self, snapshot_produce_operation: OP, process: MP, - ) -> Result> { + ) -> Result { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs index f925e602a..22351de97 100644 --- a/crates/iceberg/src/transaction/sort_order.rs +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -21,12 +21,12 @@ use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; /// Transaction action for replacing sort order. -pub struct ReplaceSortOrderAction<'a> { - pub tx: Transaction<'a>, +pub struct ReplaceSortOrderAction { + pub tx: Transaction, pub sort_fields: Vec, } -impl<'a> ReplaceSortOrderAction<'a> { +impl ReplaceSortOrderAction { /// Adds a field for sorting in ascending order. pub fn asc(self, name: &str, null_order: NullOrder) -> Result { self.add_sort_field(name, SortDirection::Ascending, null_order) @@ -38,7 +38,7 @@ impl<'a> ReplaceSortOrderAction<'a> { } /// Finished building the action and apply it to the transaction. - pub fn apply(mut self) -> Result> { + pub fn apply(mut self) -> Result { let unbound_sort_order = SortOrder::builder() .with_fields(self.sort_fields) .build_unbound()?;