@@ -33,7 +33,7 @@ use crate::TableUpdate::UpgradeFormatVersion;
33
33
use crate :: error:: Result ;
34
34
use crate :: spec:: FormatVersion ;
35
35
use crate :: table:: Table ;
36
- use crate :: transaction:: action:: { ActionElement , SetLocation } ;
36
+ use crate :: transaction:: action:: { SetLocation , TransactionAction , PendingAction } ;
37
37
use crate :: transaction:: append:: FastAppendAction ;
38
38
use crate :: transaction:: sort_order:: ReplaceSortOrderAction ;
39
39
use crate :: { Catalog , Error , ErrorKind , TableCommit , TableRequirement , TableUpdate } ;
@@ -42,7 +42,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
42
42
pub struct Transaction < ' a > {
43
43
base_table : & ' a Table ,
44
44
current_table : Table ,
45
- _actions : Vec < ActionElement < ' a > > , // TODO unused for now, should we use this to reapply actions?
45
+ actions : Vec < PendingAction > ,
46
46
updates : Vec < TableUpdate > ,
47
47
requirements : Vec < TableRequirement > ,
48
48
}
@@ -53,12 +53,21 @@ impl<'a> Transaction<'a> {
53
53
Self {
54
54
base_table : table,
55
55
current_table : table. clone ( ) ,
56
- _actions : vec ! [ ] ,
56
+ actions : vec ! [ ] ,
57
57
updates : vec ! [ ] ,
58
58
requirements : vec ! [ ] ,
59
59
}
60
60
}
61
61
62
+ pub fn refresh ( old_tx : Transaction < ' a > , refreshed : Table ) -> Result < Self > {
63
+ let mut new_tx = Transaction :: new ( & refreshed. clone ( ) ) ;
64
+ for action in & old_tx. actions {
65
+ new_tx = action. commit ( new_tx) ?
66
+ }
67
+
68
+ Ok ( new_tx)
69
+ }
70
+
62
71
fn update_table_metadata ( & mut self , updates : & [ TableUpdate ] ) -> Result < ( ) > {
63
72
let mut metadata_builder = self . current_table . metadata ( ) . clone ( ) . into_builder ( None ) ;
64
73
for update in updates {
@@ -188,17 +197,12 @@ impl<'a> Transaction<'a> {
188
197
}
189
198
190
199
/// Set the location of table
191
- pub fn set_location ( self ) -> Result < SetLocation < ' a > > {
192
- Ok ( SetLocation :: new ( self ) )
193
- }
194
-
195
- fn refresh ( & mut self , refreshed : Table ) {
196
- self . base_table = & refreshed;
197
- self . current_table = refreshed. clone ( ) ;
200
+ pub fn set_location ( self , location : String ) -> Result < Transaction < ' a > > {
201
+ Ok ( SetLocation :: new ( ) . set_location ( location) . commit ( self ) ?)
198
202
}
199
203
200
204
/// Commit transaction.
201
- pub async fn commit ( mut self , catalog : & dyn Catalog ) -> Result < Table > {
205
+ pub async fn commit ( mut self : Transaction < ' a > , catalog : & dyn Catalog ) -> Result < Table > {
202
206
// let table_commit = TableCommit::builder()
203
207
// .ident(self.base_table.identifier().clone())
204
208
// .updates(self.updates)
@@ -217,14 +221,14 @@ impl<'a> Transaction<'a> {
217
221
if self . base_table . metadata ( ) != refreshed. metadata ( )
218
222
|| self . base_table . metadata_location ( ) != refreshed. metadata_location ( )
219
223
{
220
- self . refresh ( refreshed ) ;
221
- self . apply ( self . updates , self . requirements ) // TODO need create new requirements based on the refreshed table
222
- . expect ( "Failed to re-apply updates" ) ; // re-apply updates
223
- // TODO retry on this error
224
- return Err ( Error :: new (
225
- ErrorKind :: DataInvalid ,
226
- "Cannot commit: stale base table metadata" . to_string ( ) ,
227
- ) ) ;
224
+ // refresh table
225
+ let new_tx = Transaction :: refresh ( self , refreshed) ? ;
226
+ return new_tx . commit ( catalog ) . await
227
+ // TODO instead of refreshing directly, retry on this error
228
+ // return Err(Error::new(
229
+ // ErrorKind::DataInvalid,
230
+ // "Cannot commit: stale base table metadata".to_string(),
231
+ // ));
228
232
}
229
233
230
234
if self . base_table . metadata ( ) == self . current_table . metadata ( )
@@ -386,12 +390,9 @@ mod tests {
386
390
fn test_set_location ( ) {
387
391
let table = make_v2_table ( ) ;
388
392
let tx = Transaction :: new ( & table) ;
389
- let set_location = tx
390
- . set_location ( )
391
- . unwrap ( )
392
- . set_location ( String :: from ( "s3://bucket/prefix/new_table" ) ) ;
393
-
394
- let tx = set_location. commit ( ) . unwrap ( ) ;
393
+ let tx = tx
394
+ . set_location ( String :: from ( "s3://bucket/prefix/new_table" ) )
395
+ . unwrap ( ) ;
395
396
396
397
assert_eq ! (
397
398
vec![ TableUpdate :: SetLocation {
0 commit comments