Skip to content

Commit 93bffe9

Browse files
committed
refresh-ish action
1 parent 4927ba8 commit 93bffe9

File tree

2 files changed

+41
-56
lines changed

2 files changed

+41
-56
lines changed

crates/iceberg/src/transaction/action/mod.rs

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,20 @@
1818
use crate::transaction::Transaction;
1919
use crate::{Result, TableUpdate};
2020

21-
pub type ActionElement<'a> = Box<dyn TransactionAction<'a>>;
21+
pub type PendingAction = Box<dyn TransactionAction>;
2222

23-
pub(crate) trait TransactionAction<'a>: Sync {
24-
/// Apply the pending changes and return the uncommitted changes
25-
/// TODO is this even needed?
26-
fn apply(&mut self) -> Result<Option<TableUpdate>>;
27-
28-
/// Commit the changes and apply the changes to the associated transaction
29-
fn commit(self) -> Result<Transaction<'a>>;
23+
pub(crate) trait TransactionAction: Sync {
24+
/// Commit the changes and apply the changes to the transaction, return the updated transaction
25+
fn commit(self, tx: Transaction) -> Result<Transaction>;
3026
}
3127

32-
pub struct SetLocation<'a> {
33-
pub tx: Transaction<'a>,
34-
location: Option<String>,
28+
pub struct SetLocation {
29+
pub location: Option<String>,
3530
}
3631

37-
impl<'a> SetLocation<'a> {
38-
pub fn new(tx: Transaction<'a>) -> Self {
39-
SetLocation { tx, location: None }
32+
impl SetLocation {
33+
pub fn new() -> Self {
34+
SetLocation { location: None }
4035
}
4136

4237
pub fn set_location(mut self, location: String) -> Self {
@@ -45,24 +40,13 @@ impl<'a> SetLocation<'a> {
4540
}
4641
}
4742

48-
impl<'a> TransactionAction<'a> for SetLocation<'a> {
49-
fn apply(&mut self) -> Result<Option<TableUpdate>> {
50-
if self.location.is_none() {
51-
return Ok(None);
52-
}
53-
Ok(Some(TableUpdate::SetLocation {
54-
location: self.location.clone().unwrap(),
55-
}))
56-
}
57-
58-
fn commit(mut self) -> Result<Transaction<'a>> {
59-
let location = &mut self.apply()?;
60-
if location.is_none() {
61-
return Ok(self.tx);
43+
impl TransactionAction for SetLocation {
44+
fn commit(self, mut tx: Transaction) -> Result<Transaction> {
45+
if let Some(location) = self.location.clone() {
46+
tx.apply(vec![TableUpdate::SetLocation { location }], vec![])?;
6247
}
48+
tx.actions.push(Box::new(self));
6349

64-
self.tx.apply(vec![location.clone().unwrap()], vec![])?;
65-
Ok(self.tx)
66-
// self.tx.actions.push(Box::new(self));
50+
Ok(tx)
6751
}
6852
}

crates/iceberg/src/transaction/mod.rs

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::TableUpdate::UpgradeFormatVersion;
3333
use crate::error::Result;
3434
use crate::spec::FormatVersion;
3535
use crate::table::Table;
36-
use crate::transaction::action::{ActionElement, SetLocation};
36+
use crate::transaction::action::{SetLocation, TransactionAction, PendingAction};
3737
use crate::transaction::append::FastAppendAction;
3838
use crate::transaction::sort_order::ReplaceSortOrderAction;
3939
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
@@ -42,7 +42,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
4242
pub struct Transaction<'a> {
4343
base_table: &'a Table,
4444
current_table: Table,
45-
_actions: Vec<ActionElement<'a>>, // TODO unused for now, should we use this to reapply actions?
45+
actions: Vec<PendingAction>,
4646
updates: Vec<TableUpdate>,
4747
requirements: Vec<TableRequirement>,
4848
}
@@ -53,12 +53,21 @@ impl<'a> Transaction<'a> {
5353
Self {
5454
base_table: table,
5555
current_table: table.clone(),
56-
_actions: vec![],
56+
actions: vec![],
5757
updates: vec![],
5858
requirements: vec![],
5959
}
6060
}
6161

62+
pub fn refresh(old_tx: Transaction<'a>, refreshed: Table) -> Result<Self> {
63+
let mut new_tx = Transaction::new(&refreshed.clone());
64+
for action in &old_tx.actions {
65+
new_tx = action.commit(new_tx)?
66+
}
67+
68+
Ok(new_tx)
69+
}
70+
6271
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
6372
let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
6473
for update in updates {
@@ -188,17 +197,12 @@ impl<'a> Transaction<'a> {
188197
}
189198

190199
/// Set the location of table
191-
pub fn set_location(self) -> Result<SetLocation<'a>> {
192-
Ok(SetLocation::new(self))
193-
}
194-
195-
fn refresh(&mut self, refreshed: Table) {
196-
self.base_table = &refreshed;
197-
self.current_table = refreshed.clone();
200+
pub fn set_location(self, location: String) -> Result<Transaction<'a>> {
201+
Ok(SetLocation::new().set_location(location).commit(self)?)
198202
}
199203

200204
/// Commit transaction.
201-
pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
205+
pub async fn commit(mut self: Transaction<'a>, catalog: &dyn Catalog) -> Result<Table> {
202206
// let table_commit = TableCommit::builder()
203207
// .ident(self.base_table.identifier().clone())
204208
// .updates(self.updates)
@@ -217,14 +221,14 @@ impl<'a> Transaction<'a> {
217221
if self.base_table.metadata() != refreshed.metadata()
218222
|| self.base_table.metadata_location() != refreshed.metadata_location()
219223
{
220-
self.refresh(refreshed);
221-
self.apply(self.updates, self.requirements) // TODO need create new requirements based on the refreshed table
222-
.expect("Failed to re-apply updates"); // re-apply updates
223-
// TODO retry on this error
224-
return Err(Error::new(
225-
ErrorKind::DataInvalid,
226-
"Cannot commit: stale base table metadata".to_string(),
227-
));
224+
// refresh table
225+
let new_tx = Transaction::refresh(self, refreshed)?;
226+
return new_tx.commit(catalog).await
227+
// TODO instead of refreshing directly, retry on this error
228+
// return Err(Error::new(
229+
// ErrorKind::DataInvalid,
230+
// "Cannot commit: stale base table metadata".to_string(),
231+
// ));
228232
}
229233

230234
if self.base_table.metadata() == self.current_table.metadata()
@@ -386,12 +390,9 @@ mod tests {
386390
fn test_set_location() {
387391
let table = make_v2_table();
388392
let tx = Transaction::new(&table);
389-
let set_location = tx
390-
.set_location()
391-
.unwrap()
392-
.set_location(String::from("s3://bucket/prefix/new_table"));
393-
394-
let tx = set_location.commit().unwrap();
393+
let tx = tx
394+
.set_location(String::from("s3://bucket/prefix/new_table"))
395+
.unwrap();
395396

396397
assert_eq!(
397398
vec![TableUpdate::SetLocation {

0 commit comments

Comments
 (0)