From 6d7316dec8ca0c63c84de20697d1ca0e248448dc Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 7 Jun 2025 19:21:56 -0700 Subject: [PATCH 1/4] make tx own base_table --- crates/iceberg/src/transaction/append.rs | 26 +++++++++--------- crates/iceberg/src/transaction/mod.rs | 28 ++++++++++---------- crates/iceberg/src/transaction/snapshot.rs | 10 +++---- crates/iceberg/src/transaction/sort_order.rs | 10 +++---- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 017107f92..c1b212ee1 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -29,15 +29,15 @@ use crate::writer::file_writer::ParquetWriter; use crate::{Error, ErrorKind}; /// FastAppendAction is a transaction action for fast append data files to the table. -pub struct FastAppendAction<'a> { - snapshot_produce_action: SnapshotProduceAction<'a>, +pub struct FastAppendAction { + snapshot_produce_action: SnapshotProduceAction, check_duplicate: bool, } -impl<'a> FastAppendAction<'a> { +impl FastAppendAction { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - tx: Transaction<'a>, + tx: Transaction, snapshot_id: i64, commit_uuid: Uuid, key_metadata: Vec, @@ -87,7 +87,7 @@ impl<'a> FastAppendAction<'a> { /// Specifically, schema compatibility checks and support for adding to partitioned tables /// have not yet been implemented. #[allow(dead_code)] - async fn add_parquet_files(mut self, file_path: Vec) -> Result> { + async fn add_parquet_files(mut self, file_path: Vec) -> Result { if !self .snapshot_produce_action .tx @@ -117,7 +117,7 @@ impl<'a> FastAppendAction<'a> { } /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { + pub async fn apply(self) -> Result { // Checks duplicate files if self.check_duplicate { let new_files: HashSet<&str> = self @@ -170,14 +170,14 @@ impl SnapshotProduceOperation for FastAppendOperation { async fn delete_entries( &self, - _snapshot_produce: &SnapshotProduceAction<'_>, + _snapshot_produce: &SnapshotProduceAction, ) -> Result> { Ok(vec![]) } async fn existing_manifest( &self, - snapshot_produce: &SnapshotProduceAction<'_>, + snapshot_produce: &SnapshotProduceAction, ) -> Result> { let Some(snapshot) = snapshot_produce .tx @@ -219,7 +219,7 @@ mod tests { #[tokio::test] async fn test_empty_data_append_action() { let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let mut action = tx.fast_append(None, vec![]).unwrap(); action.add_data_files(vec![]).unwrap(); assert!(action.apply().await.is_err()); @@ -228,7 +228,7 @@ mod tests { #[tokio::test] async fn test_set_snapshot_properties() { let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table.clone()); let mut action = tx.fast_append(None, vec![]).unwrap(); let mut snapshot_properties = HashMap::new(); @@ -266,7 +266,7 @@ mod tests { #[tokio::test] async fn test_fast_append_action() { let table = make_v2_minimal_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table.clone()); let mut action = tx.fast_append(None, vec![]).unwrap(); // check add data file with incompatible partition value @@ -352,7 +352,7 @@ mod tests { async fn test_add_duplicated_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(&fixture.table); + let tx = crate::transaction::Transaction::new(fixture.table.clone()); let file_paths = vec![ format!("{}/1.parquet", &fixture.table_location), @@ -372,7 +372,7 @@ mod tests { let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)]; - let tx = crate::transaction::Transaction::new(&fixture.table); + let tx = crate::transaction::Transaction::new(fixture.table.clone()); let fast_append_action = tx.fast_append(None, vec![]).unwrap(); // Attempt to add Parquet file which was deleted from table. diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ba79d60bb..d28ee5ddb 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -37,18 +37,18 @@ use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. -pub struct Transaction<'a> { - base_table: &'a Table, +pub struct Transaction { + base_table: Table, current_table: Table, updates: Vec, requirements: Vec, } -impl<'a> Transaction<'a> { +impl Transaction { /// Creates a new transaction. - pub fn new(table: &'a Table) -> Self { + pub fn new(table: Table) -> Self { Self { - base_table: table, + base_table: table.clone(), current_table: table.clone(), updates: vec![], requirements: vec![], @@ -155,7 +155,7 @@ impl<'a> Transaction<'a> { self, commit_uuid: Option, key_metadata: Vec, - ) -> Result> { + ) -> Result { let snapshot_id = self.generate_unique_snapshot_id(); FastAppendAction::new( self, @@ -167,7 +167,7 @@ impl<'a> Transaction<'a> { } /// Creates replace sort order action. - pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { + pub fn replace_sort_order(self) -> ReplaceSortOrderAction { ReplaceSortOrderAction { tx: self, sort_fields: vec![], @@ -273,7 +273,7 @@ mod tests { #[test] fn test_upgrade_table_version_v1_to_v2() { let table = make_v1_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); assert_eq!( @@ -287,7 +287,7 @@ mod tests { #[test] fn test_upgrade_table_version_v2_to_v2() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); assert!( @@ -303,7 +303,7 @@ mod tests { #[test] fn test_downgrade_table_version() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.upgrade_table_version(FormatVersion::V1); assert!(tx.is_err(), "Downgrade table version should fail!"); @@ -312,7 +312,7 @@ mod tests { #[test] fn test_set_table_property() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) .unwrap(); @@ -328,7 +328,7 @@ mod tests { #[test] fn test_remove_property() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx .remove_properties(vec!["a".to_string(), "b".to_string()]) .unwrap(); @@ -344,7 +344,7 @@ mod tests { #[test] fn test_set_location() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx .set_location(String::from("s3://bucket/prefix/new_table")) .unwrap(); @@ -360,7 +360,7 @@ mod tests { #[tokio::test] async fn test_transaction_apply_upgrade() { let table = make_v1_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); // Upgrade v1 to v1, do nothing. let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); // Upgrade v1 to v2, success. diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 012fb52bb..3d3fd4570 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -59,8 +59,8 @@ pub(crate) trait ManifestProcess: Send + Sync { fn process_manifests(&self, manifests: Vec) -> Vec; } -pub(crate) struct SnapshotProduceAction<'a> { - pub tx: Transaction<'a>, +pub(crate) struct SnapshotProduceAction { + pub tx: Transaction, snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, @@ -72,9 +72,9 @@ pub(crate) struct SnapshotProduceAction<'a> { manifest_counter: RangeFrom, } -impl<'a> SnapshotProduceAction<'a> { +impl SnapshotProduceAction { pub(crate) fn new( - tx: Transaction<'a>, + tx: Transaction, snapshot_id: i64, key_metadata: Vec, commit_uuid: Uuid, @@ -310,7 +310,7 @@ impl<'a> SnapshotProduceAction<'a> { mut self, snapshot_produce_operation: OP, process: MP, - ) -> Result> { + ) -> Result { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs index f925e602a..7564445e4 100644 --- a/crates/iceberg/src/transaction/sort_order.rs +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -21,12 +21,12 @@ use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; /// Transaction action for replacing sort order. -pub struct ReplaceSortOrderAction<'a> { - pub tx: Transaction<'a>, +pub struct ReplaceSortOrderAction { + pub tx: Transaction, pub sort_fields: Vec, } -impl<'a> ReplaceSortOrderAction<'a> { +impl ReplaceSortOrderAction { /// Adds a field for sorting in ascending order. pub fn asc(self, name: &str, null_order: NullOrder) -> Result { self.add_sort_field(name, SortDirection::Ascending, null_order) @@ -38,7 +38,7 @@ impl<'a> ReplaceSortOrderAction<'a> { } /// Finished building the action and apply it to the transaction. - pub fn apply(mut self) -> Result> { + pub fn apply(mut self) -> Result { let unbound_sort_order = SortOrder::builder() .with_fields(self.sort_fields) .build_unbound()?; @@ -114,7 +114,7 @@ mod tests { #[test] fn test_replace_sort_order() { let table = make_v2_table(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let tx = tx.replace_sort_order().apply().unwrap(); assert_eq!( From 93df578cb15e642cbf371a84f19ea661963007e5 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 7 Jun 2025 19:36:40 -0700 Subject: [PATCH 2/4] fix build --- crates/catalog/rest/src/catalog.rs | 4 ++-- crates/catalog/rest/tests/rest_catalog_test.rs | 2 +- .../tests/shared_tests/append_data_file_test.rs | 2 +- .../tests/shared_tests/append_partition_data_file_test.rs | 6 +++--- .../tests/shared_tests/conflict_commit_test.rs | 4 ++-- .../integration_tests/tests/shared_tests/scan_all_type.rs | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 851819069..868752be7 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -2125,7 +2125,7 @@ mod tests { .unwrap() }; - let table = Transaction::new(&table1) + let table = Transaction::new(table1) .upgrade_table_version(FormatVersion::V2) .unwrap() .commit(&catalog) @@ -2250,7 +2250,7 @@ mod tests { .unwrap() }; - let table_result = Transaction::new(&table1) + let table_result = Transaction::new(table1) .upgrade_table_version(FormatVersion::V2) .unwrap() .commit(&catalog) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index ab7ea3d62..589bdec61 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -347,7 +347,7 @@ async fn test_update_table() { ); // Update table by committing transaction - let table2 = Transaction::new(&table) + let table2 = Transaction::new(table) .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())])) .unwrap() .commit(&catalog) diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index 38a029510..e234d30b9 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -111,7 +111,7 @@ async fn test_append_data_file() { assert_eq!(field_ids, vec![1, 2, 3]); // commit result - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index c5c029a45..1c735e42d 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -119,7 +119,7 @@ async fn test_append_partition_data_file() { let data_file_valid = data_file_writer_valid.close().await.unwrap(); // commit result - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action .add_data_files(data_file_valid.clone()) @@ -179,7 +179,7 @@ async fn test_schema_incompatible_partition_type( data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) @@ -219,7 +219,7 @@ async fn test_schema_incompatible_partition_fields( data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index d277e12e5..05072edf2 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -89,12 +89,12 @@ async fn test_append_data_file_conflict() { let data_file = data_file_writer.close().await.unwrap(); // start two transaction and commit one of them - let tx1 = Transaction::new(&table); + let tx1 = Transaction::new(table.clone()); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx1 = append_action.apply().await.unwrap(); - let tx2 = Transaction::new(&table); + let tx2 = Transaction::new(table.clone()); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx2 = append_action.apply().await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 5ff982720..d10fd40c9 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -308,7 +308,7 @@ async fn test_scan_all_type() { let data_file = data_file_writer.close().await.unwrap(); // commit result - let tx = Transaction::new(&table); + let tx = Transaction::new(table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); From c098628eec1506af9f1c971524abac3c7a6041d2 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 9 Jun 2025 10:11:33 -0700 Subject: [PATCH 3/4] Update crates/iceberg/src/transaction/mod.rs Co-authored-by: Renjie Liu --- crates/iceberg/src/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index d28ee5ddb..459795254 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -46,7 +46,7 @@ pub struct Transaction { impl Transaction { /// Creates a new transaction. - pub fn new(table: Table) -> Self { + pub fn new(table: &Table) -> Self { Self { base_table: table.clone(), current_table: table.clone(), From ac82580aefc97f8b1db285ce28445c6cf995e54d Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 9 Jun 2025 10:20:16 -0700 Subject: [PATCH 4/4] change the api back to pass table ref --- crates/catalog/rest/src/catalog.rs | 4 ++-- crates/catalog/rest/tests/rest_catalog_test.rs | 2 +- crates/iceberg/src/transaction/append.rs | 10 +++++----- crates/iceberg/src/transaction/mod.rs | 14 +++++++------- crates/iceberg/src/transaction/sort_order.rs | 2 +- .../tests/shared_tests/append_data_file_test.rs | 2 +- .../append_partition_data_file_test.rs | 6 +++--- .../tests/shared_tests/conflict_commit_test.rs | 4 ++-- .../tests/shared_tests/scan_all_type.rs | 2 +- 9 files changed, 23 insertions(+), 23 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 868752be7..851819069 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -2125,7 +2125,7 @@ mod tests { .unwrap() }; - let table = Transaction::new(table1) + let table = Transaction::new(&table1) .upgrade_table_version(FormatVersion::V2) .unwrap() .commit(&catalog) @@ -2250,7 +2250,7 @@ mod tests { .unwrap() }; - let table_result = Transaction::new(table1) + let table_result = Transaction::new(&table1) .upgrade_table_version(FormatVersion::V2) .unwrap() .commit(&catalog) diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index 589bdec61..ab7ea3d62 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -347,7 +347,7 @@ async fn test_update_table() { ); // Update table by committing transaction - let table2 = Transaction::new(table) + let table2 = Transaction::new(&table) .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())])) .unwrap() .commit(&catalog) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index c1b212ee1..405422558 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -219,7 +219,7 @@ mod tests { #[tokio::test] async fn test_empty_data_append_action() { let table = make_v2_minimal_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let mut action = tx.fast_append(None, vec![]).unwrap(); action.add_data_files(vec![]).unwrap(); assert!(action.apply().await.is_err()); @@ -228,7 +228,7 @@ mod tests { #[tokio::test] async fn test_set_snapshot_properties() { let table = make_v2_minimal_table(); - let tx = Transaction::new(table.clone()); + let tx = Transaction::new(&table); let mut action = tx.fast_append(None, vec![]).unwrap(); let mut snapshot_properties = HashMap::new(); @@ -266,7 +266,7 @@ mod tests { #[tokio::test] async fn test_fast_append_action() { let table = make_v2_minimal_table(); - let tx = Transaction::new(table.clone()); + let tx = Transaction::new(&table); let mut action = tx.fast_append(None, vec![]).unwrap(); // check add data file with incompatible partition value @@ -352,7 +352,7 @@ mod tests { async fn test_add_duplicated_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); fixture.setup_unpartitioned_manifest_files().await; - let tx = crate::transaction::Transaction::new(fixture.table.clone()); + let tx = crate::transaction::Transaction::new(&fixture.table); let file_paths = vec![ format!("{}/1.parquet", &fixture.table_location), @@ -372,7 +372,7 @@ mod tests { let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)]; - let tx = crate::transaction::Transaction::new(fixture.table.clone()); + let tx = crate::transaction::Transaction::new(&fixture.table); let fast_append_action = tx.fast_append(None, vec![]).unwrap(); // Attempt to add Parquet file which was deleted from table. diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 459795254..e7802a138 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -273,7 +273,7 @@ mod tests { #[test] fn test_upgrade_table_version_v1_to_v2() { let table = make_v1_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); assert_eq!( @@ -287,7 +287,7 @@ mod tests { #[test] fn test_upgrade_table_version_v2_to_v2() { let table = make_v2_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); assert!( @@ -303,7 +303,7 @@ mod tests { #[test] fn test_downgrade_table_version() { let table = make_v2_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx.upgrade_table_version(FormatVersion::V1); assert!(tx.is_err(), "Downgrade table version should fail!"); @@ -312,7 +312,7 @@ mod tests { #[test] fn test_set_table_property() { let table = make_v2_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) .unwrap(); @@ -328,7 +328,7 @@ mod tests { #[test] fn test_remove_property() { let table = make_v2_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx .remove_properties(vec!["a".to_string(), "b".to_string()]) .unwrap(); @@ -344,7 +344,7 @@ mod tests { #[test] fn test_set_location() { let table = make_v2_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx .set_location(String::from("s3://bucket/prefix/new_table")) .unwrap(); @@ -360,7 +360,7 @@ mod tests { #[tokio::test] async fn test_transaction_apply_upgrade() { let table = make_v1_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); // Upgrade v1 to v1, do nothing. let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); // Upgrade v1 to v2, success. diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs index 7564445e4..22351de97 100644 --- a/crates/iceberg/src/transaction/sort_order.rs +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -114,7 +114,7 @@ mod tests { #[test] fn test_replace_sort_order() { let table = make_v2_table(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let tx = tx.replace_sort_order().apply().unwrap(); assert_eq!( diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index e234d30b9..38a029510 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -111,7 +111,7 @@ async fn test_append_data_file() { assert_eq!(field_ids, vec![1, 2, 3]); // commit result - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 1c735e42d..c5c029a45 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -119,7 +119,7 @@ async fn test_append_partition_data_file() { let data_file_valid = data_file_writer_valid.close().await.unwrap(); // commit result - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action .add_data_files(data_file_valid.clone()) @@ -179,7 +179,7 @@ async fn test_schema_incompatible_partition_type( data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) @@ -219,7 +219,7 @@ async fn test_schema_incompatible_partition_fields( data_file_writer_invalid.write(batch.clone()).await.unwrap(); let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 05072edf2..d277e12e5 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -89,12 +89,12 @@ async fn test_append_data_file_conflict() { let data_file = data_file_writer.close().await.unwrap(); // start two transaction and commit one of them - let tx1 = Transaction::new(table.clone()); + let tx1 = Transaction::new(&table); let mut append_action = tx1.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx1 = append_action.apply().await.unwrap(); - let tx2 = Transaction::new(table.clone()); + let tx2 = Transaction::new(&table); let mut append_action = tx2.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx2 = append_action.apply().await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index d10fd40c9..5ff982720 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -308,7 +308,7 @@ async fn test_scan_all_type() { let data_file = data_file_writer.close().await.unwrap(); // commit result - let tx = Transaction::new(table); + let tx = Transaction::new(&table); let mut append_action = tx.fast_append(None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap();