Skip to content

Commit bcc9909

Browse files
authored
feat(transaction): Add UpdateStatisticsAction (#1359)
## Which issue does this PR close? - Closes #1358 ## What changes are included in this PR? - Add a new transaction API to update statistics ## Are these changes tested? Will add unit tests
1 parent 6c25387 commit bcc9909

File tree

2 files changed

+181
-0
lines changed

2 files changed

+181
-0
lines changed

crates/iceberg/src/transaction/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ mod snapshot;
5757
mod sort_order;
5858
mod update_location;
5959
mod update_properties;
60+
mod update_statistics;
6061
mod upgrade_format_version;
6162

6263
use std::sync::Arc;
@@ -68,6 +69,7 @@ use crate::transaction::append::FastAppendAction;
6869
use crate::transaction::sort_order::ReplaceSortOrderAction;
6970
use crate::transaction::update_location::UpdateLocationAction;
7071
use crate::transaction::update_properties::UpdatePropertiesAction;
72+
use crate::transaction::update_statistics::UpdateStatisticsAction;
7173
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
7274
use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
7375

@@ -143,6 +145,11 @@ impl Transaction {
143145
UpdateLocationAction::new()
144146
}
145147

148+
/// Update the statistics of table
149+
pub fn update_statistics(&self) -> UpdateStatisticsAction {
150+
UpdateStatisticsAction::new()
151+
}
152+
146153
/// Commit transaction.
147154
pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
148155
if self.actions.is_empty() {
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::sync::Arc;
20+
21+
use async_trait::async_trait;
22+
23+
use crate::spec::StatisticsFile;
24+
use crate::table::Table;
25+
use crate::transaction::{ActionCommit, TransactionAction};
26+
use crate::{Result, TableUpdate};
27+
28+
/// A transactional action for updating statistics files in a table
29+
pub struct UpdateStatisticsAction {
30+
statistics_to_set: HashMap<i64, Option<StatisticsFile>>,
31+
}
32+
33+
impl UpdateStatisticsAction {
34+
pub fn new() -> Self {
35+
Self {
36+
statistics_to_set: HashMap::default(),
37+
}
38+
}
39+
40+
/// Set the table's statistics file for given snapshot, replacing the previous statistics file for
41+
/// the snapshot if any exists. The snapshot id of the statistics file will be used.
42+
///
43+
/// # Arguments
44+
///
45+
/// * `statistics_file` - The [`StatisticsFile`] to associate with its corresponding snapshot ID.
46+
///
47+
/// # Returns
48+
///
49+
/// An updated [`UpdateStatisticsAction`] with the new statistics file applied.
50+
pub fn set_statistics(mut self, statistics_file: StatisticsFile) -> Self {
51+
self.statistics_to_set
52+
.insert(statistics_file.snapshot_id, Some(statistics_file));
53+
self
54+
}
55+
56+
/// Remove the table's statistics file for given snapshot.
57+
///
58+
/// # Arguments
59+
///
60+
/// * `snapshot_id` - The ID of the snapshot whose statistics file should be removed.
61+
///
62+
/// # Returns
63+
///
64+
/// An updated [`UpdateStatisticsAction`] with the removal operation recorded.
65+
pub fn remove_statistics(mut self, snapshot_id: i64) -> Self {
66+
self.statistics_to_set.insert(snapshot_id, None);
67+
self
68+
}
69+
}
70+
71+
impl Default for UpdateStatisticsAction {
72+
fn default() -> Self {
73+
Self::new()
74+
}
75+
}
76+
77+
#[async_trait]
78+
impl TransactionAction for UpdateStatisticsAction {
79+
async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
80+
let mut updates: Vec<TableUpdate> = vec![];
81+
82+
self.statistics_to_set
83+
.iter()
84+
.for_each(|(snapshot_id, statistic_file)| {
85+
if let Some(statistics) = statistic_file {
86+
updates.push(TableUpdate::SetStatistics {
87+
statistics: statistics.clone(),
88+
})
89+
} else {
90+
updates.push(TableUpdate::RemoveStatistics {
91+
snapshot_id: *snapshot_id,
92+
})
93+
}
94+
});
95+
96+
Ok(ActionCommit::new(updates, vec![]))
97+
}
98+
}
99+
100+
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use as_any::Downcast;

    use crate::spec::{BlobMetadata, StatisticsFile};
    use crate::transaction::tests::make_v2_table;
    use crate::transaction::update_statistics::UpdateStatisticsAction;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    const SNAPSHOT_ID_1: i64 = 3055729675574597004i64;
    const SNAPSHOT_ID_2: i64 = 3366729675595277004i64;

    /// Builds a statistics file with a single "ndv" blob, both tagged with `snapshot_id`.
    fn ndv_statistics_file(snapshot_id: i64) -> StatisticsFile {
        StatisticsFile {
            snapshot_id,
            statistics_path: "s3://a/b/stats.puffin".to_string(),
            file_size_in_bytes: 413,
            file_footer_size_in_bytes: 42,
            key_metadata: None,
            blob_metadata: vec![BlobMetadata {
                r#type: "ndv".to_string(),
                snapshot_id,
                sequence_number: 1,
                fields: vec![1],
                properties: HashMap::new(),
            }],
        }
    }

    #[test]
    fn test_update_statistics() {
        let table = make_v2_table();
        let tx = Transaction::new(&table);

        let statistics_file_1 = ndv_statistics_file(SNAPSHOT_ID_1);
        let statistics_file_2 = ndv_statistics_file(SNAPSHOT_ID_2);

        // Set statistics for both snapshots, then remove the first one again.
        let tx = tx
            .update_statistics()
            .set_statistics(statistics_file_1.clone())
            .set_statistics(statistics_file_2.clone())
            .remove_statistics(SNAPSHOT_ID_1)
            .apply(tx)
            .unwrap();

        let action = (*tx.actions[0])
            .downcast_ref::<UpdateStatisticsAction>()
            .unwrap();

        // The later removal must have replaced the pending set for snapshot 1.
        let pending_1 = action.statistics_to_set.get(&statistics_file_1.snapshot_id);
        assert!(matches!(pending_1, Some(None)));

        // Snapshot 2 still carries the file that was set for it.
        let pending_2 = action.statistics_to_set.get(&statistics_file_2.snapshot_id);
        assert_eq!(pending_2, Some(&Some(statistics_file_2)));
    }
}

0 commit comments

Comments
 (0)