18
18
//! This module contains memory catalog implementation.
19
19
20
20
use std:: collections:: HashMap ;
21
+ use std:: str:: FromStr ;
21
22
22
23
use async_trait:: async_trait;
23
24
use futures:: lock:: { Mutex , MutexGuard } ;
@@ -29,9 +30,8 @@ use iceberg::{
29
30
TableIdent , TableUpdate ,
30
31
} ;
31
32
use itertools:: Itertools ;
32
- use uuid:: Uuid ;
33
33
34
- use crate :: namespace_state:: NamespaceState ;
34
+ use crate :: namespace_state:: { MetadataLocation , NamespaceState } ;
35
35
36
36
/// namespace `location` property
37
37
const LOCATION : & str = "location" ;
@@ -71,7 +71,11 @@ impl MemoryCatalog {
71
71
. build ( )
72
72
}
73
73
74
- async fn update_table ( & self , table : & Table , updates : Vec < TableUpdate > ) -> Result < Table > {
74
+ async fn update_table (
75
+ & self ,
76
+ table : & Table ,
77
+ updates : Vec < TableUpdate > ,
78
+ ) -> Result < ( Table , MetadataLocation ) > {
75
79
let ( new_metadata, new_metadata_location) = apply_table_updates ( table, updates) ?;
76
80
77
81
self . write_metadata ( & new_metadata, & new_metadata_location)
@@ -84,11 +88,11 @@ impl MemoryCatalog {
84
88
. file_io ( self . file_io . clone ( ) )
85
89
. build ( ) ?;
86
90
87
- Ok ( new_table)
91
+ Ok ( ( new_table, new_metadata_location ) )
88
92
}
89
93
90
- async fn read_metadata ( & self , location : & str ) -> Result < TableMetadata > {
91
- let input_file = self . file_io . new_input ( location) ?;
94
+ async fn read_metadata ( & self , location : & MetadataLocation ) -> Result < TableMetadata > {
95
+ let input_file = self . file_io . new_input ( location. to_string ( ) ) ?;
92
96
let metadata_content = input_file. read ( ) . await ?;
93
97
let metadata = serde_json:: from_slice :: < TableMetadata > ( & metadata_content) ?;
94
98
@@ -98,10 +102,10 @@ impl MemoryCatalog {
98
102
async fn write_metadata (
99
103
& self ,
100
104
metadata : & TableMetadata ,
101
- metadata_location : & str ,
105
+ metadata_location : & MetadataLocation ,
102
106
) -> Result < ( ) > {
103
107
self . file_io
104
- . new_output ( metadata_location) ?
108
+ . new_output ( metadata_location. to_string ( ) ) ?
105
109
. write ( serde_json:: to_vec ( metadata) ?. into ( ) )
106
110
. await
107
111
}
@@ -249,20 +253,15 @@ impl Catalog for MemoryCatalog {
249
253
let metadata = TableMetadataBuilder :: from_table_creation ( table_creation) ?
250
254
. build ( ) ?
251
255
. metadata ;
252
- let metadata_location = format ! (
253
- "{}/metadata/{}-{}.metadata.json" ,
254
- & location,
255
- 0 ,
256
- Uuid :: new_v4( )
257
- ) ;
256
+ let metadata_location = MetadataLocation :: new ( & location) ;
258
257
259
258
self . write_metadata ( & metadata, & metadata_location) . await ?;
260
259
261
260
root_namespace_state. insert_new_table ( & table_ident, metadata_location. clone ( ) ) ?;
262
261
263
262
Table :: builder ( )
264
263
. file_io ( self . file_io . clone ( ) )
265
- . metadata_location ( metadata_location)
264
+ . metadata_location ( metadata_location. to_string ( ) )
266
265
. metadata ( metadata)
267
266
. identifier ( table_ident)
268
267
. build ( )
@@ -281,7 +280,7 @@ impl Catalog for MemoryCatalog {
281
280
let mut root_namespace_state = self . root_namespace_state . lock ( ) . await ;
282
281
283
282
let metadata_location = root_namespace_state. remove_existing_table ( table_ident) ?;
284
- self . file_io . delete ( & metadata_location) . await
283
+ self . file_io . delete ( metadata_location. to_string ( ) ) . await
285
284
}
286
285
287
286
/// Check if a table exists in the catalog.
@@ -322,17 +321,11 @@ impl Catalog for MemoryCatalog {
322
321
requirement. check ( Some ( current_table. metadata ( ) ) ) ?;
323
322
}
324
323
325
- let updated_table = self
324
+ let ( updated_table, new_metadata_location ) = self
326
325
. update_table ( & current_table, commit. take_updates ( ) )
327
326
. await ?;
328
327
329
- root_namespace_state. update_table (
330
- updated_table. identifier ( ) ,
331
- updated_table
332
- . metadata_location ( )
333
- . ok_or ( empty_metadata_location_err ( & updated_table) ) ?
334
- . to_string ( ) ,
335
- ) ?;
328
+ root_namespace_state. update_table ( updated_table. identifier ( ) , new_metadata_location) ?;
336
329
337
330
Ok ( updated_table)
338
331
}
@@ -341,10 +334,11 @@ impl Catalog for MemoryCatalog {
341
334
fn apply_table_updates (
342
335
table : & Table ,
343
336
updates : Vec < TableUpdate > ,
344
- ) -> Result < ( TableMetadata , String ) > {
345
- let metadata_location = table
346
- . metadata_location ( )
347
- . ok_or ( empty_metadata_location_err ( table) ) ?;
337
+ ) -> Result < ( TableMetadata , MetadataLocation ) > {
338
+ let metadata_location = table. metadata_location ( ) . ok_or ( Error :: new (
339
+ ErrorKind :: DataInvalid ,
340
+ format ! ( "Table metadata location is not set: {}" , table. identifier( ) ) ,
341
+ ) ) ?;
348
342
349
343
let mut builder = TableMetadataBuilder :: new_from_metadata (
350
344
table. metadata ( ) . clone ( ) ,
@@ -355,55 +349,11 @@ fn apply_table_updates(
355
349
builder = update. apply ( builder) ?;
356
350
}
357
351
358
- let new_metadata_location = bump_metadata_version ( metadata_location) ?;
352
+ let new_metadata_location = MetadataLocation :: from_str ( metadata_location) ?. with_next_version ( ) ;
359
353
360
354
Ok ( ( builder. build ( ) ?. metadata , new_metadata_location) )
361
355
}
362
356
363
- fn empty_metadata_location_err ( table : & Table ) -> Error {
364
- Error :: new (
365
- ErrorKind :: DataInvalid ,
366
- format ! ( "Table metadata location is not set: {}" , table. identifier( ) ) ,
367
- )
368
- }
369
- /// Parses a metadata location of format `<prefix>/metadata/<version>-<uuid>.metadata.json`,
370
- /// increments the version and generates a new UUID.
371
- /// It returns an error if the format is invalid.
372
- fn bump_metadata_version ( metadata_location : & str ) -> Result < String > {
373
- let ( path, file_name) = metadata_location. rsplit_once ( '/' ) . ok_or ( Error :: new (
374
- ErrorKind :: Unexpected ,
375
- format ! ( "Invalid metadata location: {}" , metadata_location) ,
376
- ) ) ?;
377
-
378
- let prefix = path. strip_suffix ( "/metadata" ) . ok_or ( Error :: new (
379
- ErrorKind :: Unexpected ,
380
- format ! (
381
- "Metadata location not under /metadata/ subdirectory: {}" ,
382
- metadata_location
383
- ) ,
384
- ) ) ?;
385
-
386
- let ( version, _id) = file_name
387
- . strip_suffix ( ".metadata.json" )
388
- . ok_or ( Error :: new (
389
- ErrorKind :: Unexpected ,
390
- format ! ( "Invalid metadata file ending: {}" , file_name) ,
391
- ) ) ?
392
- . split_once ( '-' )
393
- . ok_or ( Error :: new (
394
- ErrorKind :: Unexpected ,
395
- format ! ( "Invalid metadata file name: {}" , file_name) ,
396
- ) ) ?;
397
-
398
- let new_version = version. parse :: < i32 > ( ) ? + 1 ;
399
- let new_id = Uuid :: new_v4 ( ) ;
400
-
401
- Ok ( format ! (
402
- "{}/metadata/{}-{}.metadata.json" ,
403
- prefix, new_version, new_id
404
- ) )
405
- }
406
-
407
357
#[ cfg( test) ]
408
358
mod tests {
409
359
use std:: collections:: HashSet ;
0 commit comments