Skip to content

Commit e43954a

Browse files
committed
Refactor update functions
Signed-off-by: DerGut <jannik.steinmann@gmx.de>
1 parent 13b4aa4 commit e43954a

File tree

2 files changed

+99
-37
lines changed

2 files changed

+99
-37
lines changed

crates/catalog/memory/src/catalog.rs

Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
2727
use iceberg::table::Table;
2828
use iceberg::{
2929
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
30-
TableIdent, TableUpdate,
30+
TableIdent, TableRequirement, TableUpdate,
3131
};
3232
use itertools::Itertools;
3333

@@ -55,7 +55,7 @@ impl MemoryCatalog {
5555
}
5656

5757
/// Loads a table from the locked namespace state.
58-
async fn load_table_from_locked_namespace_state(
58+
async fn load_table_from_locked_state(
5959
&self,
6060
table_ident: &TableIdent,
6161
root_namespace_state: &MutexGuard<'_, NamespaceState>,
@@ -71,24 +71,51 @@ impl MemoryCatalog {
7171
.build()
7272
}
7373

74-
async fn update_table(
74+
async fn update_table_in_locked_state(
7575
&self,
76-
table: &Table,
76+
mut commit: TableCommit,
77+
locked_state: &MutexGuard<'_, NamespaceState>,
78+
) -> Result<(Table, MetadataLocation)> {
79+
let current_table = self
80+
.load_table_from_locked_state(commit.identifier(), locked_state)
81+
.await?;
82+
83+
// Checks whether the commit's expectations are met by the current table state.
84+
let location =
85+
check_current_table_state(&current_table, commit.take_requirements()).await?;
86+
87+
self.apply_table_updates_and_write_metadata(
88+
&current_table,
89+
&location,
90+
commit.take_updates(),
91+
)
92+
.await
93+
}
94+
95+
async fn apply_table_updates_and_write_metadata(
96+
&self,
97+
current_table: &Table,
98+
current_location: &MetadataLocation,
7799
updates: Vec<TableUpdate>,
78100
) -> Result<(Table, MetadataLocation)> {
79-
let (new_metadata, new_metadata_location) = apply_table_updates(table, updates)?;
101+
let new_location = current_location.with_next_version();
80102

81-
self.write_metadata(&new_metadata, &new_metadata_location)
82-
.await?;
103+
// Build the new table metadata.
104+
let new_metadata =
105+
apply_table_updates(current_table, current_location, &new_location, updates)?;
83106

107+
// Write the updated metadata to it's new location.
108+
self.write_metadata(&new_metadata, &new_location).await?;
109+
110+
// Return a table representing the updated version.
84111
let new_table = Table::builder()
85-
.identifier(table.identifier().clone())
112+
.identifier(current_table.identifier().clone())
86113
.metadata(new_metadata)
87-
.metadata_location(new_metadata_location.to_string())
114+
.metadata_location(new_location.to_string())
88115
.file_io(self.file_io.clone())
89116
.build()?;
90117

91-
Ok((new_table, new_metadata_location))
118+
Ok((new_table, new_location))
92119
}
93120

94121
async fn read_metadata(&self, location: &MetadataLocation) -> Result<TableMetadata> {
@@ -271,7 +298,7 @@ impl Catalog for MemoryCatalog {
271298
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
272299
let root_namespace_state = self.root_namespace_state.lock().await;
273300

274-
self.load_table_from_locked_namespace_state(table_ident, &root_namespace_state)
301+
self.load_table_from_locked_state(table_ident, &root_namespace_state)
275302
.await
276303
}
277304

@@ -310,48 +337,70 @@ impl Catalog for MemoryCatalog {
310337
}
311338

312339
/// Update a table in the catalog.
313-
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
314-
let mut root_namespace_state = self.root_namespace_state.lock().await;
315-
316-
let current_table = self
317-
.load_table_from_locked_namespace_state(commit.identifier(), &root_namespace_state)
318-
.await?;
319-
320-
for requirement in commit.take_requirements() {
321-
requirement.check(Some(current_table.metadata()))?;
322-
}
340+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
341+
let mut locked_namespace_state = self.root_namespace_state.lock().await;
323342

343+
// Updates the current table version and writes a new metadata file.
324344
let (updated_table, new_metadata_location) = self
325-
.update_table(&current_table, commit.take_updates())
345+
.update_table_in_locked_state(commit, &locked_namespace_state)
326346
.await?;
327347

328-
root_namespace_state.update_table(updated_table.identifier(), new_metadata_location)?;
348+
// Flip the pointer to reference the new metadata file.
349+
locked_namespace_state
350+
.commit_table_update(updated_table.identifier(), new_metadata_location)?;
329351

330352
Ok(updated_table)
331353
}
332354
}
333355

356+
/// Verifies that the a TableCommit's requirements are met by the current table state.
357+
/// If not, there's a conflict and the client should retry the commit.
358+
async fn check_current_table_state(
359+
current_table: &Table,
360+
requirements: Vec<TableRequirement>,
361+
) -> Result<MetadataLocation> {
362+
let location =
363+
MetadataLocation::from_str(current_table.metadata_location().ok_or(Error::new(
364+
ErrorKind::DataInvalid,
365+
format!(
366+
"Table metadata location is not set: {}",
367+
current_table.identifier()
368+
),
369+
))?)?;
370+
371+
// Check that the commit's point of view is still reflected by the current state of the table.
372+
for requirement in requirements {
373+
requirement
374+
.check(Some(current_table.metadata()))
375+
.map_err(|e| {
376+
Error::new(
377+
ErrorKind::Unexpected,
378+
"Conflict: One or more requirements failed, the client my retry",
379+
)
380+
.with_source(e)
381+
})?;
382+
}
383+
384+
Ok(location)
385+
}
386+
334387
fn apply_table_updates(
335388
table: &Table,
389+
current_location: &MetadataLocation,
390+
new_location: &MetadataLocation,
336391
updates: Vec<TableUpdate>,
337-
) -> Result<(TableMetadata, MetadataLocation)> {
338-
let metadata_location = table.metadata_location().ok_or(Error::new(
339-
ErrorKind::DataInvalid,
340-
format!("Table metadata location is not set: {}", table.identifier()),
341-
))?;
342-
392+
) -> Result<TableMetadata> {
343393
let mut builder = TableMetadataBuilder::new_from_metadata(
344394
table.metadata().clone(),
345-
Some(metadata_location.to_string()),
346-
);
395+
Some(current_location.to_string()),
396+
)
397+
.set_location(new_location.to_string());
347398

348399
for update in updates {
349400
builder = update.apply(builder)?;
350401
}
351402

352-
let new_metadata_location = MetadataLocation::from_str(metadata_location)?.with_next_version();
353-
354-
Ok((builder.build()?.metadata, new_metadata_location))
403+
Ok(builder.build()?.metadata)
355404
}
356405

357406
#[cfg(test)]

crates/catalog/memory/src/namespace_state.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ impl NamespaceState {
306306
}
307307

308308
/// Updates the metadata location of the given table or returns an error if doesn't exist
309-
pub(crate) fn update_table(
309+
pub(crate) fn commit_table_update(
310310
&mut self,
311311
table_ident: &TableIdent,
312312
new_location: MetadataLocation,
@@ -317,7 +317,7 @@ impl NamespaceState {
317317
.table_metadata_locations
318318
.insert(table_ident.name().to_string(), new_location)
319319
.ok_or(Error::new(
320-
ErrorKind::Unexpected,
320+
ErrorKind::TableNotFound,
321321
format!("No such table: {:?}", table_ident),
322322
))?;
323323

@@ -379,7 +379,20 @@ impl MetadataLocation {
379379
format!("Invalid metadata file name format: {}", file_name),
380380
))?;
381381

382-
Ok((version.parse::<i32>()?, Uuid::parse_str(id)?))
382+
let version = version.parse::<i32>().map_err(|_| {
383+
Error::new(
384+
ErrorKind::Unexpected,
385+
format!("Metadata version not a number: {}", version),
386+
)
387+
})?;
388+
if version < 0 {
389+
return Err(Error::new(
390+
ErrorKind::Unexpected,
391+
format!("Negative metadata version: {}", version),
392+
));
393+
}
394+
395+
Ok((version, Uuid::parse_str(id)?))
383396
}
384397
}
385398

0 commit comments

Comments
 (0)