From 35053b383f02f23051ee87331fef69c45058c625 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 7 Jun 2025 10:53:34 -0700 Subject: [PATCH 1/6] add tx action --- Cargo.lock | 2 +- crates/iceberg/src/transaction/action.rs | 185 +++++++++++++++++++++++ crates/iceberg/src/transaction/mod.rs | 4 + rust-toolchain.toml | 2 +- 4 files changed, 191 insertions(+), 2 deletions(-) create mode 100644 crates/iceberg/src/transaction/action.rs diff --git a/Cargo.lock b/Cargo.lock index 9e9bd7c4b..76e07e015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3712,7 +3712,7 @@ dependencies = [ "iceberg-catalog-rest", "iceberg-datafusion", "iceberg_test_utils", - "ordered-float 4.6.0", + "ordered-float 2.10.1", "parquet", "tokio", "uuid", diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs new file mode 100644 index 000000000..4b479c049 --- /dev/null +++ b/crates/iceberg/src/transaction/action.rs @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::mem::take; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::table::Table; +use crate::transaction::Transaction; +use crate::{Result, TableRequirement, TableUpdate}; + +/// A boxed, thread-safe reference to a `TransactionAction`. +pub type BoxedTransactionAction = Arc; + +/// A trait representing an atomic action that can be part of a transaction. +/// +/// Implementors of this trait define how a specific action is committed to a table. +/// Each action is responsible for generating the updates and requirements needed +/// to modify the table metadata. +#[async_trait] +pub trait TransactionAction: Sync + Send { + /// Commits this action against the provided table and returns the resulting updates. + /// NOTE: This function is intended for internal use only and should not be called directly by users. + /// + /// # Arguments + /// + /// * `table` - The current state of the table this action should apply to. + /// + /// # Returns + /// + /// An `ActionCommit` containing table updates and table requirements, + /// or an error if the commit fails. + async fn commit(self: Arc, table: &Table) -> Result; +} + +/// A helper trait for applying a `TransactionAction` to a `Transaction`. +/// +/// This is implemented for all `TransactionAction` types +/// to allow easy chaining of actions into a transaction context. +pub trait ApplyTransactionAction { + /// Adds this action to the given transaction. + /// + /// # Arguments + /// + /// * `tx` - The transaction to apply the action to. + /// + /// # Returns + /// + /// The modified transaction containing this action, or an error if the operation fails. + fn apply(self, tx: Transaction) -> Result; +} + +impl ApplyTransactionAction for T { + fn apply(self, mut tx: Transaction) -> Result + where Self: Sized { + tx.actions.push(Arc::new(self)); + Ok(tx) + } +} + +/// The result of committing a `TransactionAction`. +/// +/// This struct contains the updates to apply to the table's metadata +/// and any preconditions that must be satisfied before the update can be committed. +pub struct ActionCommit { + updates: Vec, + requirements: Vec, +} + +impl ActionCommit { + /// Creates a new `ActionCommit` from the given updates and requirements. + pub fn new(updates: Vec, requirements: Vec) -> Self { + Self { + updates, + requirements, + } + } + + /// Consumes and returns the list of table updates. + pub fn take_updates(&mut self) -> Vec { + take(&mut self.updates) + } + + /// Consumes and returns the list of table requirements. + pub fn take_requirements(&mut self) -> Vec { + take(&mut self.requirements) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + use std::sync::Arc; + + use async_trait::async_trait; + use uuid::Uuid; + + use crate::table::Table; + use crate::transaction::Transaction; + use crate::transaction::action::{ActionCommit, ApplyTransactionAction, TransactionAction}; + use crate::transaction::tests::make_v2_table; + use crate::{Result, TableRequirement, TableUpdate}; + + struct TestAction; + + #[async_trait] + impl TransactionAction for TestAction { + async fn commit(self: Arc, _table: &Table) -> Result { + Ok(ActionCommit::new( + vec![TableUpdate::SetLocation { + location: String::from("s3://bucket/prefix/table/"), + }], + vec![TableRequirement::UuidMatch { + uuid: Uuid::from_str("9c12d441-03fe-4693-9a96-a0705ddf69c1")?, + }], + )) + } + } + + #[tokio::test] + async fn test_commit_transaction_action() { + let table = make_v2_table(); + let action = TestAction; + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + + assert_eq!(updates[0], TableUpdate::SetLocation { + location: String::from("s3://bucket/prefix/table/") + }); + assert_eq!(requirements[0], TableRequirement::UuidMatch { + uuid: Uuid::from_str("9c12d441-03fe-4693-9a96-a0705ddf69c1").unwrap() + }); + } + + #[test] + fn test_apply_transaction_action() { + let table = make_v2_table(); + let action = TestAction; + let tx = Transaction::new(&table); + + let updated_tx = action.apply(tx).unwrap(); + + // There should be one action in the transaction now + assert_eq!(updated_tx.actions.len(), 1); + } + + #[test] + fn test_action_commit() { + // Create dummy updates and requirements + let location = String::from("s3://bucket/prefix/table/"); + let uuid = Uuid::new_v4(); + let updates = vec![TableUpdate::SetLocation { location }]; + let requirements = vec![TableRequirement::UuidMatch { uuid }]; + + let mut action_commit = ActionCommit::new(updates.clone(), requirements.clone()); + + let taken_updates = action_commit.take_updates(); + let taken_requirements = action_commit.take_requirements(); + + // Check values are returned correctly + assert_eq!(taken_updates, updates); + assert_eq!(taken_requirements, requirements); + + assert!(action_commit.take_updates().is_empty()); + assert!(action_commit.take_requirements().is_empty()); + } +} diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index ba79d60bb..6151aed4f 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -17,6 +17,7 @@ //! This module contains transaction api. +mod action; mod append; mod snapshot; mod sort_order; @@ -32,6 +33,7 @@ use crate::TableUpdate::UpgradeFormatVersion; use crate::error::Result; use crate::spec::FormatVersion; use crate::table::Table; +use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; @@ -40,6 +42,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat pub struct Transaction<'a> { base_table: &'a Table, current_table: Table, + actions: Vec, updates: Vec, requirements: Vec, } @@ -50,6 +53,7 @@ impl<'a> Transaction<'a> { Self { base_table: table, current_table: table.clone(), + actions: vec![], updates: vec![], requirements: vec![], } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index a9a807133..6dab34d58 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -20,5 +20,5 @@ # # The channel is exactly same day for our MSRV. [toolchain] -channel = "nightly-2025-02-20" +channel = "nightly-2025-05-20" components = ["rustfmt", "clippy"] From 97336c05c7c15a1689dfb0e387ce6176ba9c60c0 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 7 Jun 2025 19:06:19 -0700 Subject: [PATCH 2/6] revert toolchain changes --- Cargo.lock | 2 +- rust-toolchain.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76e07e015..9e9bd7c4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3712,7 +3712,7 @@ dependencies = [ "iceberg-catalog-rest", "iceberg-datafusion", "iceberg_test_utils", - "ordered-float 2.10.1", + "ordered-float 4.6.0", "parquet", "tokio", "uuid", diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 6dab34d58..a9a807133 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -20,5 +20,5 @@ # # The channel is exactly same day for our MSRV. [toolchain] -channel = "nightly-2025-05-20" +channel = "nightly-2025-02-20" components = ["rustfmt", "clippy"] From 9b29de8681135463c18606b47276b8a0b8baa167 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 7 Jun 2025 19:12:10 -0700 Subject: [PATCH 3/6] clippy --- crates/iceberg/src/transaction/action.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs index 4b479c049..89cf45a30 100644 --- a/crates/iceberg/src/transaction/action.rs +++ b/crates/iceberg/src/transaction/action.rs @@ -45,6 +45,7 @@ pub trait TransactionAction: Sync + Send { /// /// An `ActionCommit` containing table updates and table requirements, /// or an error if the commit fails. + #[allow(dead_code)] async fn commit(self: Arc, table: &Table) -> Result; } @@ -62,6 +63,7 @@ pub trait ApplyTransactionAction { /// # Returns /// /// The modified transaction containing this action, or an error if the operation fails. + #[allow(dead_code)] fn apply(self, tx: Transaction) -> Result; } @@ -84,6 +86,7 @@ pub struct ActionCommit { impl ActionCommit { /// Creates a new `ActionCommit` from the given updates and requirements. + #[allow(dead_code)] pub fn new(updates: Vec, requirements: Vec) -> Self { Self { updates, @@ -92,11 +95,13 @@ impl ActionCommit { } /// Consumes and returns the list of table updates. + #[allow(dead_code)] pub fn take_updates(&mut self) -> Vec { take(&mut self.updates) } /// Consumes and returns the list of table requirements. + #[allow(dead_code)] pub fn take_requirements(&mut self) -> Vec { take(&mut self.requirements) } From 2e1a604c5b584449644c833f83d46cf2304cc27c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 9 Jun 2025 09:42:33 -0700 Subject: [PATCH 4/6] Update crates/iceberg/src/transaction/action.rs Co-authored-by: Renjie Liu --- crates/iceberg/src/transaction/action.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs index 89cf45a30..282b11fbb 100644 --- a/crates/iceberg/src/transaction/action.rs +++ b/crates/iceberg/src/transaction/action.rs @@ -33,7 +33,7 @@ pub type BoxedTransactionAction = Arc; /// Each action is responsible for generating the updates and requirements needed /// to modify the table metadata. #[async_trait] -pub trait TransactionAction: Sync + Send { +pub(crate) trait TransactionAction: Sync + Send { /// Commits this action against the provided table and returns the resulting updates. /// NOTE: This function is intended for internal use only and should not be called directly by users. /// From 0316250d6661dd6ce561a783818fda498bd8ab2a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 9 Jun 2025 09:46:02 -0700 Subject: [PATCH 5/6] allow dead code for the action module --- crates/iceberg/src/transaction/action.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs index 282b11fbb..67279a610 100644 --- a/crates/iceberg/src/transaction/action.rs +++ b/crates/iceberg/src/transaction/action.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#[allow(dead_code)] use std::mem::take; use std::sync::Arc; @@ -45,7 +46,6 @@ pub(crate) trait TransactionAction: Sync + Send { /// /// An `ActionCommit` containing table updates and table requirements, /// or an error if the commit fails. - #[allow(dead_code)] async fn commit(self: Arc, table: &Table) -> Result; } @@ -63,7 +63,6 @@ pub trait ApplyTransactionAction { /// # Returns /// /// The modified transaction containing this action, or an error if the operation fails. - #[allow(dead_code)] fn apply(self, tx: Transaction) -> Result; } @@ -86,7 +85,6 @@ pub struct ActionCommit { impl ActionCommit { /// Creates a new `ActionCommit` from the given updates and requirements. - #[allow(dead_code)] pub fn new(updates: Vec, requirements: Vec) -> Self { Self { updates, @@ -95,13 +93,11 @@ impl ActionCommit { } /// Consumes and returns the list of table updates. - #[allow(dead_code)] pub fn take_updates(&mut self) -> Vec { take(&mut self.updates) } /// Consumes and returns the list of table requirements. - #[allow(dead_code)] pub fn take_requirements(&mut self) -> Vec { take(&mut self.requirements) } From 182a863b4cd50afb6d1319203cdc2e5d10187e8e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 9 Jun 2025 10:05:35 -0700 Subject: [PATCH 6/6] minor --- crates/iceberg/src/transaction/action.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs index 67279a610..51f54c4ce 100644 --- a/crates/iceberg/src/transaction/action.rs +++ b/crates/iceberg/src/transaction/action.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#[allow(dead_code)] +#![allow(dead_code)] use std::mem::take; use std::sync::Arc;