@@ -26,7 +26,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
26
26
use iceberg:: table:: Table ;
27
27
use iceberg:: {
28
28
Catalog , Error , ErrorKind , Namespace , NamespaceIdent , Result , TableCommit , TableCreation ,
29
- TableIdent ,
29
+ TableIdent , TableUpdate ,
30
30
} ;
31
31
use itertools:: Itertools ;
32
32
use uuid:: Uuid ;
@@ -61,7 +61,7 @@ impl MemoryCatalog {
61
61
root_namespace_state : & MutexGuard < ' _ , NamespaceState > ,
62
62
) -> Result < Table > {
63
63
let metadata_location = root_namespace_state. get_existing_table_location ( table_ident) ?;
64
- let metadata = self . read_metadata ( & metadata_location) . await ?;
64
+ let metadata = self . read_metadata ( metadata_location) . await ?;
65
65
66
66
Table :: builder ( )
67
67
. identifier ( table_ident. clone ( ) )
@@ -71,6 +71,30 @@ impl MemoryCatalog {
71
71
. build ( )
72
72
}
73
73
74
+ async fn update_table ( & self , table : & Table , updates : Vec < TableUpdate > ) -> Result < Table > {
75
+ let ( new_metadata, new_metadata_location) = apply_table_updates ( table, updates) ?;
76
+
77
+ self . write_metadata ( & new_metadata, & new_metadata_location)
78
+ . await ?;
79
+
80
+ let new_table = Table :: builder ( )
81
+ . identifier ( table. identifier ( ) . clone ( ) )
82
+ . metadata ( new_metadata)
83
+ . metadata_location ( new_metadata_location. to_string ( ) )
84
+ . file_io ( self . file_io . clone ( ) )
85
+ . build ( ) ?;
86
+
87
+ Ok ( new_table)
88
+ }
89
+
90
+ async fn read_metadata ( & self , location : & str ) -> Result < TableMetadata > {
91
+ let input_file = self . file_io . new_input ( location) ?;
92
+ let metadata_content = input_file. read ( ) . await ?;
93
+ let metadata = serde_json:: from_slice :: < TableMetadata > ( & metadata_content) ?;
94
+
95
+ Ok ( metadata)
96
+ }
97
+
74
98
async fn write_metadata (
75
99
& self ,
76
100
metadata : & TableMetadata ,
@@ -286,23 +310,110 @@ impl Catalog for MemoryCatalog {
286
310
Ok ( ( ) )
287
311
}
288
312
289
- /// Update a table to the catalog.
290
- async fn update_table ( & self , _commit : TableCommit ) -> Result < Table > {
291
- Err ( Error :: new (
292
- ErrorKind :: FeatureUnsupported ,
293
- "MemoryCatalog does not currently support updating tables." ,
294
- ) )
313
+ /// Update a table in the catalog.
314
+ async fn update_table ( & self , mut commit : TableCommit ) -> Result < Table > {
315
+ let mut root_namespace_state = self . root_namespace_state . lock ( ) . await ;
316
+
317
+ let current_table = self
318
+ . load_table_from_locked_namespace_state ( commit. identifier ( ) , & root_namespace_state)
319
+ . await ?;
320
+
321
+ for requirement in commit. take_requirements ( ) {
322
+ requirement. check ( Some ( current_table. metadata ( ) ) ) ?;
323
+ }
324
+
325
+ let updated_table = self
326
+ . update_table ( & current_table, commit. take_updates ( ) )
327
+ . await ?;
328
+
329
+ root_namespace_state. update_table (
330
+ updated_table. identifier ( ) ,
331
+ updated_table
332
+ . metadata_location ( )
333
+ . ok_or ( empty_metadata_location_err ( & updated_table) ) ?
334
+ . to_string ( ) ,
335
+ ) ?;
336
+
337
+ Ok ( updated_table)
295
338
}
296
339
}
297
340
341
+ fn apply_table_updates (
342
+ table : & Table ,
343
+ updates : Vec < TableUpdate > ,
344
+ ) -> Result < ( TableMetadata , String ) > {
345
+ let metadata_location = table
346
+ . metadata_location ( )
347
+ . ok_or ( empty_metadata_location_err ( table) ) ?;
348
+
349
+ let mut builder = TableMetadataBuilder :: new_from_metadata (
350
+ table. metadata ( ) . clone ( ) ,
351
+ Some ( metadata_location. to_string ( ) ) ,
352
+ ) ;
353
+
354
+ for update in updates {
355
+ builder = update. apply ( builder) ?;
356
+ }
357
+
358
+ let new_metadata_location = bump_metadata_version ( metadata_location) ?;
359
+
360
+ Ok ( ( builder. build ( ) ?. metadata , new_metadata_location) )
361
+ }
362
+
363
+ fn empty_metadata_location_err ( table : & Table ) -> Error {
364
+ Error :: new (
365
+ ErrorKind :: DataInvalid ,
366
+ format ! ( "Table metadata location is not set: {}" , table. identifier( ) ) ,
367
+ )
368
+ }
369
+ /// Parses a metadata location of format `<prefix>/metadata/<version>-<uuid>.metadata.json`,
370
+ /// increments the version and generates a new UUID.
371
+ /// It returns an error if the format is invalid.
372
+ fn bump_metadata_version ( metadata_location : & str ) -> Result < String > {
373
+ let ( path, file_name) = metadata_location. rsplit_once ( '/' ) . ok_or ( Error :: new (
374
+ ErrorKind :: Unexpected ,
375
+ format ! ( "Invalid metadata location: {}" , metadata_location) ,
376
+ ) ) ?;
377
+
378
+ let prefix = path. strip_suffix ( "/metadata" ) . ok_or ( Error :: new (
379
+ ErrorKind :: Unexpected ,
380
+ format ! (
381
+ "Metadata location not under /metadata/ subdirectory: {}" ,
382
+ metadata_location
383
+ ) ,
384
+ ) ) ?;
385
+
386
+ let ( version, _id) = file_name
387
+ . strip_suffix ( ".metadata.json" )
388
+ . ok_or ( Error :: new (
389
+ ErrorKind :: Unexpected ,
390
+ format ! ( "Invalid metadata file ending: {}" , file_name) ,
391
+ ) ) ?
392
+ . split_once ( '-' )
393
+ . ok_or ( Error :: new (
394
+ ErrorKind :: Unexpected ,
395
+ format ! ( "Invalid metadata file name: {}" , file_name) ,
396
+ ) ) ?;
397
+
398
+ let new_version = version. parse :: < i32 > ( ) ? + 1 ;
399
+ let new_id = Uuid :: new_v4 ( ) ;
400
+
401
+ Ok ( format ! (
402
+ "{}/metadata/{}-{}.metadata.json" ,
403
+ prefix, new_version, new_id
404
+ ) )
405
+ }
406
+
298
407
#[ cfg( test) ]
299
408
mod tests {
300
409
use std:: collections:: HashSet ;
301
410
use std:: hash:: Hash ;
302
411
use std:: iter:: FromIterator ;
412
+ use std:: vec;
303
413
304
414
use iceberg:: io:: FileIOBuilder ;
305
415
use iceberg:: spec:: { NestedField , PartitionSpec , PrimitiveType , Schema , SortOrder , Type } ;
416
+ use iceberg:: transaction:: Transaction ;
306
417
use regex:: Regex ;
307
418
use tempfile:: TempDir ;
308
419
@@ -348,8 +459,8 @@ mod tests {
348
459
. unwrap ( )
349
460
}
350
461
351
- async fn create_table < C : Catalog > ( catalog : & C , table_ident : & TableIdent ) {
352
- let _ = catalog
462
+ async fn create_table < C : Catalog > ( catalog : & C , table_ident : & TableIdent ) -> Table {
463
+ catalog
353
464
. create_table (
354
465
& table_ident. namespace ,
355
466
TableCreation :: builder ( )
@@ -358,7 +469,7 @@ mod tests {
358
469
. build ( ) ,
359
470
)
360
471
. await
361
- . unwrap ( ) ;
472
+ . unwrap ( )
362
473
}
363
474
364
475
async fn create_tables < C : Catalog > ( catalog : & C , table_idents : Vec < & TableIdent > ) {
@@ -1694,4 +1805,30 @@ mod tests {
1694
1805
) ,
1695
1806
) ;
1696
1807
}
1808
+
1809
+ #[ tokio:: test]
1810
+ async fn test_update_table ( ) {
1811
+ let catalog = new_memory_catalog ( ) ;
1812
+
1813
+ let namespace_ident = NamespaceIdent :: new ( "a" . into ( ) ) ;
1814
+ create_namespace ( & catalog, & namespace_ident) . await ;
1815
+ let table_ident = TableIdent :: new ( namespace_ident, "test" . to_string ( ) ) ;
1816
+ let table = create_table ( & catalog, & table_ident) . await ;
1817
+
1818
+ // Assert the table doesn't contain the update yet
1819
+ assert ! ( !table. metadata( ) . properties( ) . contains_key( "key" ) ) ;
1820
+
1821
+ // Update table metadata
1822
+ let updated_table = Transaction :: new ( & table)
1823
+ . set_properties ( HashMap :: from ( [ ( "key" . to_string ( ) , "value" . to_string ( ) ) ] ) )
1824
+ . unwrap ( )
1825
+ . commit ( & catalog)
1826
+ . await
1827
+ . unwrap ( ) ;
1828
+
1829
+ assert_eq ! (
1830
+ updated_table. metadata( ) . properties( ) . get( "key" ) . unwrap( ) ,
1831
+ "value"
1832
+ ) ;
1833
+ }
1697
1834
}
0 commit comments