Skip to content

Commit 7a04d82

Browse files
committed
use the new tx api
1 parent 5cf4cbb commit 7a04d82

File tree

2 files changed

+89
-55
lines changed

2 files changed

+89
-55
lines changed

crates/iceberg/src/transaction/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ mod snapshot;
2626
mod sort_order;
2727
mod update_location;
2828
mod update_properties;
29-
mod upgrade_format_version;
3029
mod update_statistics;
30+
mod upgrade_format_version;
3131

3232
use std::mem::discriminant;
3333
use std::sync::Arc;
@@ -41,8 +41,8 @@ use crate::transaction::append::FastAppendAction;
4141
use crate::transaction::sort_order::ReplaceSortOrderAction;
4242
use crate::transaction::update_location::UpdateLocationAction;
4343
use crate::transaction::update_properties::UpdatePropertiesAction;
44+
use crate::transaction::update_statistics::UpdateStatisticsAction;
4445
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
45-
use crate::transaction::update_statistics::UpdateStatistics;
4646
use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
4747

4848
/// Table transaction.
@@ -158,8 +158,8 @@ impl Transaction {
158158
}
159159

160160
/// Update the statistics of table
161-
pub fn update_statistics(self) -> UpdateStatistics<'a> {
162-
UpdateStatistics::new(self)
161+
pub fn update_statistics(&self) -> UpdateStatisticsAction {
162+
UpdateStatisticsAction::new()
163163
}
164164

165165
/// Commit transaction.

crates/iceberg/src/transaction/update_statistics.rs

Lines changed: 85 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,51 +15,92 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
19+
use std::sync::Arc;
20+
21+
use async_trait::async_trait;
22+
1823
use crate::spec::StatisticsFile;
19-
use crate::transaction::Transaction;
20-
use crate::{Error, TableUpdate};
24+
use crate::table::Table;
25+
use crate::transaction::{ActionCommit, TransactionAction};
26+
use crate::{Result, TableUpdate};
2127

22-
pub struct UpdateStatistics<'a> {
23-
tx: Transaction<'a>,
24-
updates: Vec<TableUpdate>,
28+
/// A transactional action for updating statistics files in a table
29+
pub struct UpdateStatisticsAction {
30+
statistics_to_set: HashMap<i64, Option<StatisticsFile>>,
2531
}
2632

27-
impl<'a> UpdateStatistics<'a> {
28-
pub fn new(tx: Transaction<'a>) -> Self {
33+
impl UpdateStatisticsAction {
34+
pub fn new() -> Self {
2935
Self {
30-
tx,
31-
updates: Vec::new(),
36+
statistics_to_set: HashMap::default(),
3237
}
3338
}
3439

35-
pub fn set_statistics(mut self, statistics: StatisticsFile) -> Result<Self, Error> {
36-
self.updates.push(TableUpdate::SetStatistics { statistics });
37-
38-
Ok(self)
40+
/// Set the table's statistics file for given snapshot, replacing the previous statistics file for
41+
/// the snapshot if any exists. The snapshot id of the statistics file will be used.
42+
///
43+
/// # Arguments
44+
///
45+
/// * `statistics_file` - The [`StatisticsFile`] to associate with its corresponding snapshot ID.
46+
///
47+
/// # Returns
48+
///
49+
/// An updated [`UpdateStatisticsAction`] with the new statistics file applied.
50+
pub fn set_statistics(mut self, statistics_file: StatisticsFile) -> Self {
51+
self.statistics_to_set
52+
.insert(statistics_file.snapshot_id, Some(statistics_file));
53+
self
3954
}
4055

41-
pub fn remove_statistics(mut self, snapshot_id: i64) -> Result<Self, Error> {
42-
self.updates
43-
.push(TableUpdate::RemoveStatistics { snapshot_id });
44-
45-
Ok(self)
56+
/// Remove the table's statistics file for given snapshot.
57+
///
58+
/// # Arguments
59+
///
60+
/// * `snapshot_id` - The ID of the snapshot whose statistics file should be removed.
61+
///
62+
/// # Returns
63+
///
64+
/// An updated [`UpdateStatisticsAction`] with the removal operation recorded.
65+
pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
66+
self.statistics_to_set.insert(snapshot_id, None);
67+
self
4668
}
69+
}
4770

48-
pub fn apply(mut self) -> Result<Transaction<'a>, Error> {
49-
self.tx.apply(self.updates, vec![])?;
50-
51-
Ok(self.tx)
71+
#[async_trait]
72+
impl TransactionAction for UpdateStatisticsAction {
73+
async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
74+
let mut updates: Vec<TableUpdate> = vec![];
75+
76+
self.statistics_to_set
77+
.iter()
78+
.for_each(|(snapshot_id, statistic_file)| {
79+
if let Some(statistics) = statistic_file {
80+
updates.push(TableUpdate::SetStatistics {
81+
statistics: statistics.clone(),
82+
})
83+
} else {
84+
updates.push(TableUpdate::RemoveStatistics {
85+
snapshot_id: snapshot_id.clone(),
86+
})
87+
}
88+
});
89+
90+
Ok(ActionCommit::new(updates, vec![]))
5291
}
5392
}
5493

5594
#[cfg(test)]
5695
mod tests {
5796
use std::collections::HashMap;
5897

59-
use crate::TableUpdate;
98+
use as_any::Downcast;
99+
60100
use crate::spec::{BlobMetadata, StatisticsFile};
61-
use crate::transaction::Transaction;
62101
use crate::transaction::tests::make_v2_table;
102+
use crate::transaction::update_statistics::UpdateStatisticsAction;
103+
use crate::transaction::{ApplyTransactionAction, Transaction};
63104

64105
#[test]
65106
fn test_update_statistics() {
@@ -100,35 +141,28 @@ mod tests {
100141
let tx = tx
101142
.update_statistics()
102143
.set_statistics(statistics_file_1.clone())
103-
.unwrap()
104-
.apply()
105-
.unwrap();
106-
107-
let TableUpdate::SetStatistics { statistics } = tx.updates.first().unwrap().clone() else {
108-
panic!("The update should be a TableUpdate::SetStatistics!");
109-
};
110-
assert_eq!(statistics, statistics_file_1);
111-
112-
// start a new update, remove stat1 and set stat2
113-
let tx = tx
114-
.update_statistics()
115-
.remove_statistics(3055729675574597004i64)
116-
.unwrap()
117144
.set_statistics(statistics_file_2.clone())
118-
.unwrap()
119-
.apply()
145+
.remove_statistics(3055729675574597004i64) // remove stats1
146+
.apply(tx)
120147
.unwrap();
121148

122-
assert_eq!(tx.updates.len(), 3);
123-
let TableUpdate::RemoveStatistics { snapshot_id } = tx.updates.get(1).unwrap().clone()
124-
else {
125-
panic!("The update should be a TableUpdate::RemoveStatistics!");
126-
};
127-
assert_eq!(snapshot_id, 3055729675574597004i64);
128-
129-
let TableUpdate::SetStatistics { statistics } = tx.updates.get(2).unwrap().clone() else {
130-
panic!("The update should be a TableUpdate::SetStatistics!");
131-
};
132-
assert_eq!(statistics, statistics_file_2);
149+
let action = (*tx.actions[0])
150+
.downcast_ref::<UpdateStatisticsAction>()
151+
.unwrap();
152+
assert!(
153+
action
154+
.statistics_to_set
155+
.get(&statistics_file_1.snapshot_id)
156+
.unwrap()
157+
.is_none()
158+
); // stats1 should have been removed
159+
assert_eq!(
160+
action
161+
.statistics_to_set
162+
.get(&statistics_file_2.snapshot_id)
163+
.unwrap()
164+
.clone(),
165+
Some(statistics_file_2)
166+
);
133167
}
134168
}

0 commit comments

Comments
 (0)