@@ -82,7 +82,7 @@ impl MemoryCatalog {
82
82
83
83
// Checks whether the commit's expectations are met by the current table state.
84
84
let location =
85
- check_current_table_state ( & current_table, commit. take_requirements ( ) ) . await ?;
85
+ Self :: check_current_table_state ( & current_table, commit. take_requirements ( ) ) . await ?;
86
86
87
87
self . apply_table_updates_and_write_metadata (
88
88
& current_table,
@@ -102,7 +102,7 @@ impl MemoryCatalog {
102
102
103
103
// Build the new table metadata.
104
104
let new_metadata =
105
- apply_table_updates ( current_table, current_location, & new_location, updates) ?;
105
+ Self :: apply_table_updates ( current_table, current_location, & new_location, updates) ?;
106
106
107
107
// Write the updated metadata to it's new location.
108
108
self . write_metadata ( & new_metadata, & new_location) . await ?;
@@ -136,6 +136,56 @@ impl MemoryCatalog {
136
136
. write ( serde_json:: to_vec ( metadata) ?. into ( ) )
137
137
. await
138
138
}
139
+
140
+ /// Verifies that the a TableCommit's requirements are met by the current table state.
141
+ /// If not, there's a conflict and the client should retry the commit.
142
+ async fn check_current_table_state (
143
+ current_table : & Table ,
144
+ requirements : Vec < TableRequirement > ,
145
+ ) -> Result < MetadataLocation > {
146
+ let location =
147
+ MetadataLocation :: from_str ( current_table. metadata_location ( ) . ok_or ( Error :: new (
148
+ ErrorKind :: DataInvalid ,
149
+ format ! (
150
+ "Table metadata location is not set: {}" ,
151
+ current_table. identifier( )
152
+ ) ,
153
+ ) ) ?) ?;
154
+
155
+ // Check that the commit's point of view is still reflected by the current state of the table.
156
+ for requirement in requirements {
157
+ requirement
158
+ . check ( Some ( current_table. metadata ( ) ) )
159
+ . map_err ( |e| {
160
+ Error :: new (
161
+ ErrorKind :: Unexpected ,
162
+ "Conflict: One or more requirements failed, the client my retry" ,
163
+ )
164
+ . with_source ( e)
165
+ } ) ?;
166
+ }
167
+
168
+ Ok ( location)
169
+ }
170
+
171
+ fn apply_table_updates (
172
+ table : & Table ,
173
+ current_location : & MetadataLocation ,
174
+ new_location : & MetadataLocation ,
175
+ updates : Vec < TableUpdate > ,
176
+ ) -> Result < TableMetadata > {
177
+ let mut builder = TableMetadataBuilder :: new_from_metadata (
178
+ table. metadata ( ) . clone ( ) ,
179
+ Some ( current_location. to_string ( ) ) ,
180
+ )
181
+ . set_location ( new_location. to_string ( ) ) ;
182
+
183
+ for update in updates {
184
+ builder = update. apply ( builder) ?;
185
+ }
186
+
187
+ Ok ( builder. build ( ) ?. metadata )
188
+ }
139
189
}
140
190
141
191
#[ async_trait]
@@ -341,66 +391,19 @@ impl Catalog for MemoryCatalog {
341
391
let mut locked_namespace_state = self . root_namespace_state . lock ( ) . await ;
342
392
343
393
// Updates the current table version and writes a new metadata file.
344
- let ( updated_table , new_metadata_location) = self
394
+ let ( staged_updated_table , new_metadata_location) = self
345
395
. update_table_in_locked_state ( commit, & locked_namespace_state)
346
396
. await ?;
347
397
348
398
// Flip the pointer to reference the new metadata file.
349
399
locked_namespace_state
350
- . commit_table_update ( updated_table . identifier ( ) , new_metadata_location) ?;
400
+ . commit_table_update ( staged_updated_table . identifier ( ) , new_metadata_location) ?;
351
401
352
- Ok ( updated_table)
353
- }
354
- }
402
+ // After the update is committed, the table is now the current version.
403
+ let updated_table = staged_updated_table;
355
404
356
- /// Verifies that the a TableCommit's requirements are met by the current table state.
357
- /// If not, there's a conflict and the client should retry the commit.
358
- async fn check_current_table_state (
359
- current_table : & Table ,
360
- requirements : Vec < TableRequirement > ,
361
- ) -> Result < MetadataLocation > {
362
- let location =
363
- MetadataLocation :: from_str ( current_table. metadata_location ( ) . ok_or ( Error :: new (
364
- ErrorKind :: DataInvalid ,
365
- format ! (
366
- "Table metadata location is not set: {}" ,
367
- current_table. identifier( )
368
- ) ,
369
- ) ) ?) ?;
370
-
371
- // Check that the commit's point of view is still reflected by the current state of the table.
372
- for requirement in requirements {
373
- requirement
374
- . check ( Some ( current_table. metadata ( ) ) )
375
- . map_err ( |e| {
376
- Error :: new (
377
- ErrorKind :: Unexpected ,
378
- "Conflict: One or more requirements failed, the client my retry" ,
379
- )
380
- . with_source ( e)
381
- } ) ?;
382
- }
383
-
384
- Ok ( location)
385
- }
386
-
387
- fn apply_table_updates (
388
- table : & Table ,
389
- current_location : & MetadataLocation ,
390
- new_location : & MetadataLocation ,
391
- updates : Vec < TableUpdate > ,
392
- ) -> Result < TableMetadata > {
393
- let mut builder = TableMetadataBuilder :: new_from_metadata (
394
- table. metadata ( ) . clone ( ) ,
395
- Some ( current_location. to_string ( ) ) ,
396
- )
397
- . set_location ( new_location. to_string ( ) ) ;
398
-
399
- for update in updates {
400
- builder = update. apply ( builder) ?;
405
+ Ok ( updated_table)
401
406
}
402
-
403
- Ok ( builder. build ( ) ?. metadata )
404
407
}
405
408
406
409
#[ cfg( test) ]
@@ -411,9 +414,7 @@ mod tests {
411
414
use std:: vec;
412
415
413
416
use iceberg:: io:: FileIOBuilder ;
414
- use iceberg:: spec:: {
415
- NestedField , NullOrder , PartitionSpec , PrimitiveType , Schema , SortOrder , Type ,
416
- } ;
417
+ use iceberg:: spec:: { NestedField , PartitionSpec , PrimitiveType , Schema , SortOrder , Type } ;
417
418
use iceberg:: transaction:: Transaction ;
418
419
use regex:: Regex ;
419
420
use tempfile:: TempDir ;
0 commit comments