Skip to content

Commit d631b3c

Browse files
committed
Add logic to commit metadata changes (no retry(yet))
1 parent 8ed0ddc commit d631b3c

File tree

6 files changed

+137
-50
lines changed

6 files changed

+137
-50
lines changed

crates/catalog/memory/src/catalog.rs

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use iceberg::{
2929
TableIdent, TableRequirement, TableUpdate,
3030
};
3131
use itertools::Itertools;
32+
use regex::Regex;
3233
use uuid::Uuid;
3334

3435
use crate::namespace_state::NamespaceState;
@@ -278,32 +279,74 @@ impl Catalog for MemoryCatalog {
278279

279280
/// Update a table to the catalog.
280281
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
281-
// TODO persist the updated metadata
282-
// let mut root_namespace_state = self.root_namespace_state.lock().await;
283-
// let current_table = self.load_table(commit.identifier()).await?;
284-
// let updated_staged_table = update_and_stage_table(Some(&current_table), &commit)?;
285-
//
286-
// if current_table.metadata() == updated_staged_table.metadata() {
287-
// // no changes
288-
// return Ok(current_table);
289-
// }
290-
//
291-
// // write metadata
292-
// self.file_io
293-
// .new_output(&updated_staged_table.metadata_location())?
294-
// .write(serde_json::to_vec(updated_staged_table.metadata())?.into())
295-
// .await?;
296-
//
297-
// root_namespace_state.update_existing_table_location(
298-
// commit.identifier(),
299-
// updated_staged_table.metadata_location(),
300-
// )?;
301-
// Ok(updated_staged_table)
302282
Err(Error::new(
303283
ErrorKind::FeatureUnsupported,
304284
"MemoryCatalog does not currently support updating tables.",
305285
))
306286
}
287+
288+
async fn commit_table(&self, base: &Table, current: Table) -> Result<Table> {
289+
if base.metadata() == current.metadata() {
290+
// no change
291+
return Ok(current);
292+
}
293+
294+
let mut root_namespace_state = self.root_namespace_state.lock().await;
295+
// TODO: caller needs to retry on the error below
296+
let _ = root_namespace_state
297+
.check_metadata_location(base.identifier(), base.metadata_location())?;
298+
299+
let next_metadata_version = if let Some(base_metadata_location) = base.metadata_location() {
300+
self.parse_metadata_version(base_metadata_location) + 1
301+
} else {
302+
0
303+
};
304+
305+
// write metadata
306+
let metadata_location = format!(
307+
"{}/metadata/{}-{}.metadata.json",
308+
current.metadata().location(),
309+
next_metadata_version,
310+
Uuid::new_v4()
311+
);
312+
313+
// TODO instead of using current.metadata(), build a new metadata with some properties like last_updated_ms updated
314+
self.file_io
315+
.new_output(&metadata_location)?
316+
.write(serde_json::to_vec(current.metadata())?.into())
317+
.await?;
318+
319+
root_namespace_state
320+
.update_existing_table_location(current.identifier(), current.metadata_location())?;
321+
322+
// TODO same here, need to update the metadata location
323+
Ok(current)
324+
}
325+
}
326+
327+
// todo move this to metadata?
328+
fn parse_metadata_version(metadata_location: &str) -> Result<i32> {
329+
let pattern = r"(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json"; // todo make this constant
330+
331+
if let Some(metadata_file_name) = metadata_location.split('/').last() {
332+
let re = Regex::new(pattern).expect("Failed to parse regex for metadata file!");
333+
if let Some(caps) = re.captures(metadata_file_name) {
334+
let metadata_version_str = &caps[1];
335+
let uuid_str = &caps[2];
336+
337+
let metadata_version = metadata_version_str
338+
.parse()
339+
.expect(format!("Invalid metadata version: {metadata_version_str}").as_str());
340+
let uuid = Uuid::parse_str(uuid_str)?;
341+
342+
return Ok(metadata_version);
343+
}
344+
}
345+
346+
Err(Error::new(
347+
ErrorKind::Unexpected,
348+
format!("Unrecognizable metadata location: {metadata_location}"),
349+
))
307350
}
308351

309352
#[cfg(test)]

crates/catalog/memory/src/namespace_state.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,28 @@ impl NamespaceState {
262262
}
263263
}
264264

265-
/// TODO fix this
265+
pub(crate) fn check_metadata_location(
266+
&self,
267+
table_ident: &TableIdent,
268+
metadata_location: Option<&str>,
269+
) -> Result<()> {
270+
let namespace = self.get_namespace(table_ident.namespace())?;
271+
272+
if namespace
273+
.table_metadata_locations
274+
.get(table_ident.name())
275+
.map(|s| s.as_str())
276+
!= metadata_location
277+
{
278+
return Err(Error::new(
279+
ErrorKind::DataInvalid,
280+
format!("Metadata location does not match for table: {table_ident}!"),
281+
));
282+
}
283+
284+
Ok(())
285+
}
286+
266287
pub(crate) fn update_existing_table_location(
267288
&mut self,
268289
table_ident: &TableIdent,

crates/iceberg/src/catalog/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub trait Catalog: Debug + Sync + Send {
9494

9595
/// Update a table to the catalog.
9696
async fn update_table(&self, commit: TableCommit) -> Result<Table>;
97+
98+
async fn commit_table(&self, base: &Table, current: Table) -> Result<Table>;
9799
}
98100

99101
/// NamespaceIdent represents the identifier of a namespace in the catalog.

crates/iceberg/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub enum ErrorKind {
3939
/// Iceberg data is invalid.
4040
///
4141
/// This error is returned when we try to read a table from iceberg but
42-
/// failed to parse it's metadata or data file correctly.
42+
/// failed to parse its metadata or data file correctly.
4343
///
4444
/// The table could be invalid or corrupted.
4545
DataInvalid,

crates/iceberg/src/transaction/action/mod.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub(crate) trait TransactionAction<'a>: Sync {
2424
/// Apply the pending changes and return the uncommitted changes
2525
/// TODO is this even needed?
2626
fn apply(&mut self) -> Result<Option<TableUpdate>>;
27-
27+
2828
/// Commit the changes and apply the changes to the associated transaction
2929
fn commit(self) -> Result<Transaction<'a>>;
3030
}
@@ -36,12 +36,9 @@ pub struct SetLocation<'a> {
3636

3737
impl<'a> SetLocation<'a> {
3838
pub fn new(tx: Transaction<'a>) -> Self {
39-
SetLocation {
40-
tx,
41-
location: None
42-
}
39+
SetLocation { tx, location: None }
4340
}
44-
41+
4542
pub fn set_location(mut self, location: String) -> Self {
4643
self.location = Some(location);
4744
self
@@ -51,17 +48,19 @@ impl<'a> SetLocation<'a> {
5148
impl<'a> TransactionAction<'a> for SetLocation<'a> {
5249
fn apply(&mut self) -> Result<Option<TableUpdate>> {
5350
if self.location.is_none() {
54-
return Ok(None)
51+
return Ok(None);
5552
}
56-
Ok(Some(TableUpdate::SetLocation { location: self.location.clone().unwrap() }))
53+
Ok(Some(TableUpdate::SetLocation {
54+
location: self.location.clone().unwrap(),
55+
}))
5756
}
58-
57+
5958
fn commit(mut self) -> Result<Transaction<'a>> {
6059
let location = &mut self.apply()?;
6160
if location.is_none() {
62-
return Ok(self.tx)
61+
return Ok(self.tx);
6362
}
64-
63+
6564
self.tx.apply(vec![location.clone().unwrap()], vec![])?;
6665
Ok(self.tx)
6766
// self.tx.actions.push(Box::new(self));

crates/iceberg/src/transaction/mod.rs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ impl<'a> Transaction<'a> {
7171
Ok(())
7272
}
7373

74-
// TODO deprecate this and move the logic to TransactionAction
7574
fn apply(
7675
&mut self,
7776
updates: Vec<TableUpdate>,
@@ -193,29 +192,52 @@ impl<'a> Transaction<'a> {
193192
Ok(SetLocation::new(self))
194193
}
195194

195+
/// Points the transaction at `refreshed`: the base table becomes a borrow of
/// it and the working copy becomes a clone of it.
///
/// NOTE(review): `refreshed` is taken by value, so `&refreshed` borrows a
/// parameter that is dropped when this function returns. Storing that borrow
/// in `self.base_table` looks like it cannot outlive the call — confirm how
/// `base_table` is meant to be owned (e.g. an owned `Table`) before relying
/// on this.
fn refresh(&mut self, refreshed: Table) {
196+
self.base_table = &refreshed;
197+
self.current_table = refreshed.clone();
198+
}
199+
196200
/// Commit transaction.
197-
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
198-
let table_commit = TableCommit::builder()
199-
.ident(self.base_table.identifier().clone())
200-
.updates(self.updates)
201-
.requirements(self.requirements)
202-
.build();
201+
pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
202+
// let table_commit = TableCommit::builder()
203+
// .ident(self.base_table.identifier().clone())
204+
// .updates(self.updates)
205+
// .requirements(self.requirements)
206+
// .build();
203207
if self.base_table.metadata() == self.current_table.metadata() {
204208
return Ok(self.current_table);
205209
}
206210

207211
// TODO add refresh() in catalog?
208-
let refreshed_table = catalog
209-
.load_table(table_commit.identifier())
212+
let refreshed = catalog
213+
.load_table(self.base_table.identifier())
210214
.await
211-
.expect(format!("Failed to refresh table {}", table_commit.identifier()).as_str());
215+
.expect(format!("Failed to refresh table {}", self.base_table.identifier()).as_str());
212216

213-
if self.base_table.metadata() != refreshed_table.metadata() {
214-
// TODO raise a real error and retry
215-
panic!("Stale base table!")
217+
if self.base_table.metadata() != refreshed.metadata()
218+
|| self.base_table.metadata_location() != refreshed.metadata_location()
219+
{
220+
self.refresh(refreshed);
221+
self.apply(self.updates, self.requirements) // TODO need create new requirements based on the refreshed table
222+
.expect("Failed to re-apply updates"); // re-apply updates
223+
// TODO retry on this error
224+
return Err(Error::new(
225+
ErrorKind::DataInvalid,
226+
"Cannot commit: stale base table metadata".to_string(),
227+
));
216228
}
217229

218-
catalog.update_table(table_commit).await
230+
if self.base_table.metadata() == self.current_table.metadata()
231+
&& self.base_table.metadata_location() == self.current_table.metadata_location()
232+
{
233+
// nothing to commit, return current table
234+
return Ok(self.current_table);
235+
}
236+
237+
catalog
238+
.commit_table(self.base_table, self.current_table)
239+
.await
240+
// catalog.update_table(table_commit).await
219241
}
220242
}
221243

@@ -229,8 +251,8 @@ mod tests {
229251
use crate::spec::{FormatVersion, TableMetadata};
230252
use crate::table::Table;
231253
use crate::transaction::Transaction;
232-
use crate::{TableIdent, TableUpdate};
233254
use crate::transaction::action::TransactionAction;
255+
use crate::{TableIdent, TableUpdate};
234256

235257
fn make_v1_table() -> Table {
236258
let file = File::open(format!(
@@ -368,7 +390,7 @@ mod tests {
368390
.set_location()
369391
.unwrap()
370392
.set_location(String::from("s3://bucket/prefix/new_table"));
371-
393+
372394
let tx = set_location.commit().unwrap();
373395

374396
assert_eq!(

0 commit comments

Comments
 (0)