From c0339ac2e2fc1e024b5fc19438d7286857a87c7d Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 11:40:52 -0700 Subject: [PATCH 1/9] Add retry logic to transaction --- Cargo.lock | 34 +++++++++++++--- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/error.rs | 5 +++ crates/iceberg/src/transaction/mod.rs | 56 ++++++++++++++++++++++++++- 5 files changed, 90 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a356bb387f..9e4a923e9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1032,9 +1054,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.3.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" dependencies = [ "fastrand", "gloo-timers", @@ -1534,7 +1556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -3500,6 +3522,7 @@ dependencies = [ "as-any", "async-std", "async-trait", + "backon", "base64 0.22.1", "bimap", "bytes", @@ -3670,6 +3693,7 @@ name = "iceberg-datafusion" version = "0.5.1" dependencies = [ "anyhow", + "async-stream", "async-trait", "datafusion", "expect-test", @@ -4202,7 +4226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -7718,7 +7742,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fd83fd55c7..4502231bae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ async-std = "1.12" async-trait = "0.1.88" aws-config = "1.6.1" aws-sdk-glue = "1.39" +backon = "1.5.1" base64 = "0.22.1" bimap = "0.6" bytes = "1.10" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 52f5b4d8b5..65d593c80b 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -57,6 +57,7 @@ arrow-string = { workspace = true } as-any = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } async-trait = { workspace = true } +backon = { workspace = true } base64 = { workspace = true } bimap = { workspace = true } bytes = { workspace = true } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 9f299fb6a9..067329d524 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -320,6 +320,11 @@ impl Error { self.kind } + /// Return error's retryable status + pub fn retryable(&self) -> bool { + self.retryable + } + /// Return error's message. #[inline] pub fn message(&self) -> &str { diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index f96a15b73c..70a540dfa0 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -51,6 +51,9 @@ /// The `ApplyTransactionAction` trait provides an `apply` method /// that allows users to apply a transaction action to a `Transaction`. mod action; + +use std::collections::HashMap; + pub use action::*; mod append; mod snapshot; @@ -61,6 +64,9 @@ mod update_statistics; mod upgrade_format_version; use std::sync::Arc; +use std::time::Duration; + +use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext}; use crate::error::Result; use crate::table::Table; @@ -152,13 +158,59 @@ impl Transaction { } /// Commit transaction. - pub async fn commit(mut self, catalog: &dyn Catalog) -> Result { + pub async fn commit(self, catalog: &dyn Catalog) -> Result
{ if self.actions.is_empty() { // nothing to commit return Ok(self.table); } - self.do_commit(catalog).await + let backoff = Self::build_backoff(self.table.metadata().properties()); + let tx = self; + + (|mut tx: Transaction| async { + let result = tx.do_commit(catalog).await; + (tx, result) + }) + .retry(backoff) + .context(tx) + .sleep(tokio::time::sleep) + .when(|e| e.retryable()) + .await + .1 + } + + fn build_backoff(props: &HashMap) -> ExponentialBackoff { + ExponentialBuilder::new() + .with_min_delay(Duration::from_millis( + props + .get("commit.retry.min-wait-ms") + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(100)) + .expect("Invalid value for commit.retry.min-wait-ms"), + )) + .with_max_delay(Duration::from_millis( + props + .get("commit.retry.max-wait-ms") + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(60 * 1000)) + .expect("Invalid value for commit.retry.max-wait-ms"), + )) + .with_total_delay(Some(Duration::from_millis( + props + .get("commit.retry.total-timeout-ms") + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(30 * 60 * 1000)) + .expect("Invalid value for commit.retry.total-timeout-ms"), + ))) + .with_max_times( + props + .get("commit.retry.num-retries") + .map(|s| s.parse()) + .unwrap_or_else(|| Ok(4)) + .expect("Invalid value for commit.retry.num-retries"), + ) + .with_factor(2.0) + .build() } async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{ From 894fb6f038100e3e629d189745277ec053171e01 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 11:46:24 -0700 Subject: [PATCH 2/9] minor --- Cargo.lock | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e4a923e9c..b389756c48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,28 +593,6 @@ dependencies = [ "wasm-bindgen-futures", ] -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "async-task" version = "4.7.1" @@ -3693,7 +3671,6 @@ name = "iceberg-datafusion" version = "0.5.1" dependencies = [ "anyhow", - "async-stream", "async-trait", "datafusion", "expect-test", From 5a397649245dd55d5e314a178e5de7046b11e8f3 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 11:56:26 -0700 Subject: [PATCH 3/9] set sleep before context --- crates/iceberg/src/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 70a540dfa0..b99c3b46ba 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -172,8 +172,8 @@ impl Transaction { (tx, result) }) .retry(backoff) - .context(tx) .sleep(tokio::time::sleep) + .context(tx) .when(|e| e.retryable()) .await .1 From 01bc5a9f432eaeafadce300847eb45c710b3a328 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 20:58:38 -0700 Subject: [PATCH 4/9] Add props in table_metadata --- crates/iceberg/src/spec/table_metadata.rs | 12 ++++++++++++ crates/iceberg/src/transaction/mod.rs | 17 +++++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index c3156e56a4..cb2ccd55df 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -82,6 +82,18 @@ pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partitio /// Default value for the max number of partitions to keep summary stats for. pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; +pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; +pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; + +pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; +pub const COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; + +pub const COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; +pub const COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute + +pub const COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; +pub const COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes + /// Reserved Iceberg table properties list. /// /// Reserved table properties are only used to control behaviors when creating or updating a diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index b99c3b46ba..5029c9399c 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -78,6 +78,7 @@ use crate::transaction::update_properties::UpdatePropertiesAction; use crate::transaction::update_statistics::UpdateStatisticsAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; +use crate::spec::{COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT}; /// Table transaction. #[derive(Clone)] @@ -183,30 +184,30 @@ impl Transaction { ExponentialBuilder::new() .with_min_delay(Duration::from_millis( props - .get("commit.retry.min-wait-ms") + .get(COMMIT_MIN_RETRY_WAIT_MS) .map(|s| s.parse()) - .unwrap_or_else(|| Ok(100)) + .unwrap_or_else(|| Ok(COMMIT_MIN_RETRY_WAIT_MS_DEFAULT)) .expect("Invalid value for commit.retry.min-wait-ms"), )) .with_max_delay(Duration::from_millis( props - .get("commit.retry.max-wait-ms") + .get(COMMIT_MAX_RETRY_WAIT_MS) .map(|s| s.parse()) - .unwrap_or_else(|| Ok(60 * 1000)) + .unwrap_or_else(|| Ok(COMMIT_MAX_RETRY_WAIT_MS_DEFAULT)) .expect("Invalid value for commit.retry.max-wait-ms"), )) .with_total_delay(Some(Duration::from_millis( props - .get("commit.retry.total-timeout-ms") + .get(COMMIT_TOTAL_RETRY_TIME_MS) .map(|s| s.parse()) - .unwrap_or_else(|| Ok(30 * 60 * 1000)) + .unwrap_or_else(|| Ok(COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) .expect("Invalid value for commit.retry.total-timeout-ms"), ))) .with_max_times( props - .get("commit.retry.num-retries") + .get(COMMIT_NUM_RETRIES) .map(|s| s.parse()) - .unwrap_or_else(|| Ok(4)) + .unwrap_or_else(|| Ok(COMMIT_NUM_RETRIES_DEFAULT)) .expect("Invalid value for commit.retry.num-retries"), ) .with_factor(2.0) From bac4a946d0cd7d86bd574318e05fa754c8e73ff1 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 21:20:20 -0700 Subject: [PATCH 5/9] big beautiful fmt --- crates/iceberg/src/transaction/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 5029c9399c..07b3f83b65 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -69,6 +69,11 @@ use std::time::Duration; use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext}; use crate::error::Result; +use crate::spec::{ + COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_MIN_RETRY_WAIT_MS, + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT, + COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, +}; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; @@ -78,7 +83,6 @@ use crate::transaction::update_properties::UpdatePropertiesAction; use crate::transaction::update_statistics::UpdateStatisticsAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; -use crate::spec::{COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT}; /// Table transaction. #[derive(Clone)] From f95c1268502e4e281cdc697af22d832a854818f8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 21:26:47 -0700 Subject: [PATCH 6/9] doc is good --- crates/iceberg/src/spec/table_metadata.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index cb2ccd55df..b35fc3792d 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -82,16 +82,24 @@ pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partitio /// Default value for the max number of partitions to keep summary stats for. pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; +/// Property key for number of commit retries. pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; +/// Default value for number of commit retries. pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; +/// Property key for minimum wait time (ms) between retries. pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; +/// Default value for minimum wait time (ms) between retries. pub const COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; +/// Property key for maximum wait time (ms) between retries. pub const COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; +/// Default value for maximum wait time (ms) between retries. pub const COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute +/// Property key for total maximum retry time (ms). pub const COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; +/// Default value for total maximum retry time (ms). pub const COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes /// Reserved Iceberg table properties list. From d23174698f0d60df59006295203c2c27d79948a0 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 21:40:06 -0700 Subject: [PATCH 7/9] lets not mess up others code --- crates/iceberg/src/spec/table_metadata.rs | 30 +++++++++++------------ 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index b35fc3792d..c1c692c305 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -87,21 +87,6 @@ pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; /// Default value for number of commit retries. pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; -/// Property key for minimum wait time (ms) between retries. -pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; -/// Default value for minimum wait time (ms) between retries. -pub const COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; - -/// Property key for maximum wait time (ms) between retries. -pub const COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; -/// Default value for maximum wait time (ms) between retries. -pub const COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute - -/// Property key for total maximum retry time (ms). -pub const COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; -/// Default value for total maximum retry time (ms). -pub const COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes - /// Reserved Iceberg table properties list. /// /// Reserved table properties are only used to control behaviors when creating or updating a @@ -118,6 +103,21 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [ PROPERTY_DEFAULT_SORT_ORDER, ]; +/// Property key for minimum wait time (ms) between retries. +pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; +/// Default value for minimum wait time (ms) between retries. +pub const COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; + +/// Property key for maximum wait time (ms) between retries. +pub const COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; +/// Default value for maximum wait time (ms) between retries. +pub const COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute + +/// Property key for total maximum retry time (ms). +pub const COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; +/// Default value for total maximum retry time (ms). +pub const COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; From 0d81b3811f228833fbeea02c9c7f5cf256da5249 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 1 Jul 2025 21:40:53 -0700 Subject: [PATCH 8/9] very minor --- crates/iceberg/src/spec/table_metadata.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index c1c692c305..3789e7ed59 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -82,11 +82,6 @@ pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partitio /// Default value for the max number of partitions to keep summary stats for. pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; -/// Property key for number of commit retries. -pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; -/// Default value for number of commit retries. -pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; - /// Reserved Iceberg table properties list. /// /// Reserved table properties are only used to control behaviors when creating or updating a @@ -103,6 +98,11 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [ PROPERTY_DEFAULT_SORT_ORDER, ]; +/// Property key for number of commit retries. +pub const COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; +/// Default value for number of commit retries. +pub const COMMIT_NUM_RETRIES_DEFAULT: usize = 4; + /// Property key for minimum wait time (ms) between retries. pub const COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; /// Default value for minimum wait time (ms) between retries. From d4e0ebfba432650eaaf5d23d92778341a3ccca0a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 2 Jul 2025 21:35:39 -0700 Subject: [PATCH 9/9] mockall for the tests --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/catalog/mod.rs | 2 + crates/iceberg/src/transaction/mod.rs | 176 +++++++++++++++++++++++++- 4 files changed, 179 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4502231bae..e322c97d7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ itertools = "0.13" linkedbytes = "0.1.8" metainfo = "0.7.14" mimalloc = "0.1.46" +mockall = "0.13.1" mockito = "1" motore-macros = "0.4.3" murmur3 = "0.5.2" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 65d593c80b..657d463f18 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -67,6 +67,7 @@ expect-test = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +mockall = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } num-bigint = { workspace = true } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index ebb9a66ce8..0c6686ca2d 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use _serde::deserialize_snapshot; use async_trait::async_trait; +use mockall::automock; use serde_derive::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use uuid::Uuid; @@ -40,6 +41,7 @@ use crate::{Error, ErrorKind, Result}; /// The catalog API for Iceberg Rust. #[async_trait] +#[automock] pub trait Catalog: Debug + Sync + Send { /// List namespaces inside the catalog. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 07b3f83b65..47e8f3d55e 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -255,13 +255,17 @@ impl Transaction { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::fs::File; use std::io::BufReader; + use std::sync::Arc; - use crate::TableIdent; + use crate::catalog::MockCatalog; use crate::io::FileIOBuilder; use crate::spec::TableMetadata; use crate::table::Table; + use crate::transaction::{ApplyTransactionAction, Transaction}; + use crate::{Error, ErrorKind, TableIdent}; pub fn make_v1_table() -> Table { let file = File::open(format!( @@ -319,4 +323,174 @@ mod tests { .build() .unwrap() } + + /// Helper function to create a test table with retry properties + fn setup_test_table(num_retries: &str) -> Table { + let table = make_v2_table(); + + // Set retry properties + let mut props = HashMap::new(); + props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string()); + props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string()); + props.insert( + "commit.retry.total-timeout-ms".to_string(), + "1000".to_string(), + ); + props.insert( + "commit.retry.num-retries".to_string(), + num_retries.to_string(), + ); + + // Update table properties + let metadata = table + .metadata() + .clone() + .into_builder(None) + .set_properties(props) + .unwrap() + .build() + .unwrap() + .metadata; + + table.with_metadata(Arc::new(metadata)) + } + + /// Helper function to create a transaction with a simple update action + fn create_test_transaction(table: &Table) -> Transaction { + let tx = Transaction::new(table); + tx.update_table_properties() + .set("test.key".to_string(), "test.value".to_string()) + .apply(tx) + .unwrap() + } + + /// Helper function to set up a mock catalog with retryable errors + fn setup_mock_catalog_with_retryable_errors( + success_after_attempts: Option, + expected_calls: usize, + ) -> MockCatalog { + let mut mock_catalog = MockCatalog::new(); + + mock_catalog + .expect_load_table() + .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) })); + + mock_catalog + .expect_update_table() + .times(expected_calls) + .returning_st(move |_| { + if let Some(attempts) = success_after_attempts { + static mut ATTEMPTS: u32 = 0; + unsafe { + ATTEMPTS += 1; + if ATTEMPTS <= attempts { + Box::pin(async move { + Err(Error::new( + ErrorKind::CatalogCommitConflicts, + "Commit conflict", + ) + .with_retryable(true)) + }) + } else { + Box::pin(async move { Ok(make_v2_table()) }) + } + } + } else { + // Always fail with retryable error + Box::pin(async move { + Err( + Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict") + .with_retryable(true), + ) + }) + } + }); + + mock_catalog + } + + /// Helper function to set up a mock catalog with non-retryable error + fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog { + let mut mock_catalog = MockCatalog::new(); + + mock_catalog + .expect_load_table() + .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) })); + + mock_catalog + .expect_update_table() + .times(1) // Should only be called once since error is not retryable + .returning_st(move |_| { + Box::pin(async move { + Err(Error::new(ErrorKind::Unexpected, "Non-retryable error") + .with_retryable(false)) + }) + }); + + mock_catalog + } + + #[tokio::test] + async fn test_commit_retryable_error() { + // Create a test table with retry properties + let table = setup_test_table("3"); + + // Create a transaction with a simple update action + let tx = create_test_transaction(&table); + + // Create a mock catalog that fails twice then succeeds + let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3); + + // Commit the transaction + let result = tx.commit(&mock_catalog).await; + + // Verify the result + assert!(result.is_ok(), "Transaction should eventually succeed"); + } + + #[tokio::test] + async fn test_commit_non_retryable_error() { + // Create a test table with retry properties + let table = setup_test_table("3"); + + // Create a transaction with a simple update action + let tx = create_test_transaction(&table); + + // Create a mock catalog that fails with non-retryable error + let mock_catalog = setup_mock_catalog_with_non_retryable_error(); + + // Commit the transaction + let result = tx.commit(&mock_catalog).await; + + // Verify the result + assert!(result.is_err(), "Transaction should fail immediately"); + if let Err(err) = result { + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert_eq!(err.message(), "Non-retryable error"); + assert!(!err.retryable(), "Error should not be retryable"); + } + } + + #[tokio::test] + async fn test_commit_max_retries_exceeded() { + // Create a test table with retry properties (only allow 2 retries) + let table = setup_test_table("2"); + + // Create a transaction with a simple update action + let tx = create_test_transaction(&table); + + // Create a mock catalog that always fails with retryable error + let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); // Initial attempt + 2 retries = 3 total attempts + + // Commit the transaction + let result = tx.commit(&mock_catalog).await; + + // Verify the result + assert!(result.is_err(), "Transaction should fail after max retries"); + if let Err(err) = result { + assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts); + assert_eq!(err.message(), "Commit conflict"); + assert!(err.retryable(), "Error should be retryable"); + } + } }