From 93b8d1eb882645290110d1ed09104e414aa4cb34 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 19 May 2025 17:55:35 -0700 Subject: [PATCH 1/6] Add update statistics --- crates/iceberg/src/transaction/mod.rs | 7 +++ .../src/transaction/update_statistics.rs | 53 +++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 crates/iceberg/src/transaction/update_statistics.rs diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 4f760cacd..3e3398acd 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -27,6 +27,7 @@ mod sort_order; mod update_location; mod update_properties; mod upgrade_format_version; +mod update_statistics; use std::sync::Arc; @@ -38,6 +39,7 @@ use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; +use crate::transaction::update_statistics::UpdateStatistics; use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. @@ -112,6 +114,11 @@ impl Transaction { UpdateLocationAction::new() } + /// Update the statistics of table + pub fn update_statistics(self) -> UpdateStatistics<'a> { + UpdateStatistics::new(self) + } + /// Commit transaction. pub async fn commit(mut self, catalog: &dyn Catalog) -> Result { if self.actions.is_empty() { diff --git a/crates/iceberg/src/transaction/update_statistics.rs b/crates/iceberg/src/transaction/update_statistics.rs new file mode 100644 index 000000000..3ade3b970 --- /dev/null +++ b/crates/iceberg/src/transaction/update_statistics.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::StatisticsFile; +use crate::transaction::Transaction; +use crate::{Error, TableUpdate}; + +pub struct UpdateStatistics<'a> { + tx: Transaction<'a>, + updates: Vec, +} + +impl<'a> UpdateStatistics<'a> { + pub fn new(tx: Transaction<'a>) -> Self { + UpdateStatistics { + tx, + updates: Vec::new(), + } + } + + pub fn set_statistics(&mut self, statistics: StatisticsFile) -> Result<&mut Self, Error> { + self.updates.push(TableUpdate::SetStatistics { statistics }); + + Ok(self) + } + + pub fn remove_statistics(&mut self, snapshot_id: i64) -> Result<&mut Self, Error> { + self.updates + .push(TableUpdate::RemoveStatistics { snapshot_id }); + + Ok(self) + } + + pub fn apply(mut self) -> Result, Error> { + self.tx.apply(self.updates, vec![])?; + + Ok(self.tx) + } +} From ef6a15eb298d30c01fe6dff3041e3e26319ed0a7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 21 May 2025 12:33:17 -0700 Subject: [PATCH 2/6] add ut --- .../src/transaction/update_statistics.rs | 87 ++++++++++++++++++- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/update_statistics.rs b/crates/iceberg/src/transaction/update_statistics.rs index 3ade3b970..c64e696d2 100644 --- a/crates/iceberg/src/transaction/update_statistics.rs +++ b/crates/iceberg/src/transaction/update_statistics.rs @@ -26,19 +26,19 @@ pub struct UpdateStatistics<'a> { impl<'a> UpdateStatistics<'a> { pub fn new(tx: Transaction<'a>) -> Self { - UpdateStatistics { + Self { tx, updates: Vec::new(), } } - pub fn set_statistics(&mut self, statistics: StatisticsFile) -> Result<&mut Self, Error> { + pub fn set_statistics(mut self, statistics: StatisticsFile) -> Result { self.updates.push(TableUpdate::SetStatistics { statistics }); Ok(self) } - pub fn remove_statistics(&mut self, snapshot_id: i64) -> Result<&mut Self, Error> { + pub fn remove_statistics(mut self, snapshot_id: i64) -> Result { self.updates .push(TableUpdate::RemoveStatistics { snapshot_id }); @@ -51,3 +51,84 @@ impl<'a> UpdateStatistics<'a> { Ok(self.tx) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use crate::TableUpdate; + use crate::spec::{BlobMetadata, StatisticsFile}; + use crate::transaction::Transaction; + use crate::transaction::tests::make_v2_table; + + #[test] + fn test_update_statistics() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + + let statistics_file_1 = StatisticsFile { + snapshot_id: 3055729675574597004i64, + statistics_path: "s3://a/b/stats.puffin".to_string(), + file_size_in_bytes: 413, + file_footer_size_in_bytes: 42, + key_metadata: None, + blob_metadata: vec![BlobMetadata { + r#type: "ndv".to_string(), + snapshot_id: 3055729675574597004i64, + sequence_number: 1, + fields: vec![1], + properties: HashMap::new(), + }], + }; + + let statistics_file_2 = StatisticsFile { + snapshot_id: 3366729675595277004i64, + statistics_path: "s3://a/b/stats.puffin".to_string(), + file_size_in_bytes: 413, + file_footer_size_in_bytes: 42, + key_metadata: None, + blob_metadata: vec![BlobMetadata { + r#type: "ndv".to_string(), + snapshot_id: 3366729675595277004i64, + sequence_number: 1, + fields: vec![1], + properties: HashMap::new(), + }], + }; + + // set stats1 + let tx = tx + .update_statistics() + .set_statistics(statistics_file_1.clone()) + .unwrap() + .apply() + .unwrap(); + + let TableUpdate::SetStatistics { statistics } = tx.updates.get(0).unwrap().clone() else { + panic!("The update should be a TableUpdate::SetStatistics!"); + }; + assert_eq!(statistics, statistics_file_1); + + // start a new update, remove stat1 and set stat2 + let tx = tx + .update_statistics() + .remove_statistics(3055729675574597004i64) + .unwrap() + .set_statistics(statistics_file_2.clone()) + .unwrap() + .apply() + .unwrap(); + + assert_eq!(tx.updates.len(), 3); + let TableUpdate::RemoveStatistics { snapshot_id } = tx.updates.get(1).unwrap().clone() + else { + panic!("The update should be a TableUpdate::RemoveStatistics!"); + }; + assert_eq!(snapshot_id, 3055729675574597004i64); + + let TableUpdate::SetStatistics { statistics } = tx.updates.get(2).unwrap().clone() else { + panic!("The update should be a TableUpdate::SetStatistics!"); + }; + assert_eq!(statistics, statistics_file_2); + } +} From 373462d3529218b34fcf05d3a0d1ce7ff03afc3c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 21 May 2025 14:36:12 -0700 Subject: [PATCH 3/6] minor --- crates/iceberg/src/transaction/update_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/update_statistics.rs b/crates/iceberg/src/transaction/update_statistics.rs index c64e696d2..39a787c12 100644 --- a/crates/iceberg/src/transaction/update_statistics.rs +++ b/crates/iceberg/src/transaction/update_statistics.rs @@ -104,7 +104,7 @@ mod tests { .apply() .unwrap(); - let TableUpdate::SetStatistics { statistics } = tx.updates.get(0).unwrap().clone() else { + let TableUpdate::SetStatistics { statistics } = tx.updates.first().unwrap().clone() else { panic!("The update should be a TableUpdate::SetStatistics!"); }; assert_eq!(statistics, statistics_file_1); From 764883d22881ec80d28943c8058dfbc72b9b725f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 20 Jun 2025 16:49:15 -0700 Subject: [PATCH 4/6] use the new tx api --- crates/iceberg/src/transaction/mod.rs | 8 +- .../src/transaction/update_statistics.rs | 136 +++++++++++------- 2 files changed, 89 insertions(+), 55 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 3e3398acd..7aefca710 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -26,8 +26,8 @@ mod snapshot; mod sort_order; mod update_location; mod update_properties; -mod upgrade_format_version; mod update_statistics; +mod upgrade_format_version; use std::sync::Arc; @@ -38,8 +38,8 @@ use crate::transaction::append::FastAppendAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; +use crate::transaction::update_statistics::UpdateStatisticsAction; use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; -use crate::transaction::update_statistics::UpdateStatistics; use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. @@ -115,8 +115,8 @@ impl Transaction { } /// Update the statistics of table - pub fn update_statistics(self) -> UpdateStatistics<'a> { - UpdateStatistics::new(self) + pub fn update_statistics(&self) -> UpdateStatisticsAction { + UpdateStatisticsAction::new() } /// Commit transaction. diff --git a/crates/iceberg/src/transaction/update_statistics.rs b/crates/iceberg/src/transaction/update_statistics.rs index 39a787c12..884a46cb3 100644 --- a/crates/iceberg/src/transaction/update_statistics.rs +++ b/crates/iceberg/src/transaction/update_statistics.rs @@ -15,40 +15,79 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; + use crate::spec::StatisticsFile; -use crate::transaction::Transaction; -use crate::{Error, TableUpdate}; +use crate::table::Table; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Result, TableUpdate}; -pub struct UpdateStatistics<'a> { - tx: Transaction<'a>, - updates: Vec, +/// A transactional action for updating statistics files in a table +pub struct UpdateStatisticsAction { + statistics_to_set: HashMap>, } -impl<'a> UpdateStatistics<'a> { - pub fn new(tx: Transaction<'a>) -> Self { +impl UpdateStatisticsAction { + pub fn new() -> Self { Self { - tx, - updates: Vec::new(), + statistics_to_set: HashMap::default(), } } - pub fn set_statistics(mut self, statistics: StatisticsFile) -> Result { - self.updates.push(TableUpdate::SetStatistics { statistics }); - - Ok(self) + /// Set the table's statistics file for given snapshot, replacing the previous statistics file for + /// the snapshot if any exists. The snapshot id of the statistics file will be used. + /// + /// # Arguments + /// + /// * `statistics_file` - The [`StatisticsFile`] to associate with its corresponding snapshot ID. + /// + /// # Returns + /// + /// An updated [`UpdateStatisticsAction`] with the new statistics file applied. + pub fn set_statistics(mut self, statistics_file: StatisticsFile) -> Self { + self.statistics_to_set + .insert(statistics_file.snapshot_id, Some(statistics_file)); + self } - pub fn remove_statistics(mut self, snapshot_id: i64) -> Result { - self.updates - .push(TableUpdate::RemoveStatistics { snapshot_id }); - - Ok(self) + /// Remove the table's statistics file for given snapshot. + /// + /// # Arguments + /// + /// * `snapshot_id` - The ID of the snapshot whose statistics file should be removed. + /// + /// # Returns + /// + /// An updated [`UpdateStatisticsAction`] with the removal operation recorded. + pub fn remove_statistics(mut self, snapshot_id: i64) -> Self { + self.statistics_to_set.insert(snapshot_id, None); + self } +} - pub fn apply(mut self) -> Result, Error> { - self.tx.apply(self.updates, vec![])?; - - Ok(self.tx) +#[async_trait] +impl TransactionAction for UpdateStatisticsAction { + async fn commit(self: Arc, _table: &Table) -> Result { + let mut updates: Vec = vec![]; + + self.statistics_to_set + .iter() + .for_each(|(snapshot_id, statistic_file)| { + if let Some(statistics) = statistic_file { + updates.push(TableUpdate::SetStatistics { + statistics: statistics.clone(), + }) + } else { + updates.push(TableUpdate::RemoveStatistics { + snapshot_id: snapshot_id.clone(), + }) + } + }); + + Ok(ActionCommit::new(updates, vec![])) } } @@ -56,10 +95,12 @@ impl<'a> UpdateStatistics<'a> { mod tests { use std::collections::HashMap; - use crate::TableUpdate; + use as_any::Downcast; + use crate::spec::{BlobMetadata, StatisticsFile}; - use crate::transaction::Transaction; use crate::transaction::tests::make_v2_table; + use crate::transaction::update_statistics::UpdateStatisticsAction; + use crate::transaction::{ApplyTransactionAction, Transaction}; #[test] fn test_update_statistics() { @@ -100,35 +141,28 @@ mod tests { let tx = tx .update_statistics() .set_statistics(statistics_file_1.clone()) - .unwrap() - .apply() - .unwrap(); - - let TableUpdate::SetStatistics { statistics } = tx.updates.first().unwrap().clone() else { - panic!("The update should be a TableUpdate::SetStatistics!"); - }; - assert_eq!(statistics, statistics_file_1); - - // start a new update, remove stat1 and set stat2 - let tx = tx - .update_statistics() - .remove_statistics(3055729675574597004i64) - .unwrap() .set_statistics(statistics_file_2.clone()) - .unwrap() - .apply() + .remove_statistics(3055729675574597004i64) // remove stats1 + .apply(tx) .unwrap(); - assert_eq!(tx.updates.len(), 3); - let TableUpdate::RemoveStatistics { snapshot_id } = tx.updates.get(1).unwrap().clone() - else { - panic!("The update should be a TableUpdate::RemoveStatistics!"); - }; - assert_eq!(snapshot_id, 3055729675574597004i64); - - let TableUpdate::SetStatistics { statistics } = tx.updates.get(2).unwrap().clone() else { - panic!("The update should be a TableUpdate::SetStatistics!"); - }; - assert_eq!(statistics, statistics_file_2); + let action = (*tx.actions[0]) + .downcast_ref::() + .unwrap(); + assert!( + action + .statistics_to_set + .get(&statistics_file_1.snapshot_id) + .unwrap() + .is_none() + ); // stats1 should have been removed + assert_eq!( + action + .statistics_to_set + .get(&statistics_file_2.snapshot_id) + .unwrap() + .clone(), + Some(statistics_file_2) + ); } } From d644bb5cb85694a2dddd236251eb25a700662e66 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 21 Jun 2025 10:49:27 -0700 Subject: [PATCH 5/6] clippy rules --- crates/iceberg/src/transaction/update_statistics.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/update_statistics.rs b/crates/iceberg/src/transaction/update_statistics.rs index 884a46cb3..636bb0ca5 100644 --- a/crates/iceberg/src/transaction/update_statistics.rs +++ b/crates/iceberg/src/transaction/update_statistics.rs @@ -68,6 +68,12 @@ impl UpdateStatisticsAction { } } +impl Default for UpdateStatisticsAction { + fn default() -> Self { + Self::new() + } +} + #[async_trait] impl TransactionAction for UpdateStatisticsAction { async fn commit(self: Arc, _table: &Table) -> Result { @@ -82,7 +88,7 @@ impl TransactionAction for UpdateStatisticsAction { }) } else { updates.push(TableUpdate::RemoveStatistics { - snapshot_id: snapshot_id.clone(), + snapshot_id: *snapshot_id, }) } }); From e0c01bdaca5d2f748a67fc5f8ad341f4c8dc3966 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sun, 22 Jun 2025 17:46:19 -0700 Subject: [PATCH 6/6] Trigger Build