Skip to content

Commit 8ed0ddc

Browse files
committed
add tx action-ish for set_location (does not work well :(
1 parent 40b055a commit 8ed0ddc

File tree

4 files changed

+138
-8
lines changed

4 files changed

+138
-8
lines changed

crates/catalog/memory/src/catalog.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder};
2626
use iceberg::table::Table;
2727
use iceberg::{
2828
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
29-
TableIdent,
29+
TableIdent, TableRequirement, TableUpdate,
3030
};
3131
use itertools::Itertools;
3232
use uuid::Uuid;
@@ -277,7 +277,28 @@ impl Catalog for MemoryCatalog {
277277
}
278278

279279
/// Update a table to the catalog.
280-
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
280+
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
281+
// TODO persist the updated metadata
282+
// let mut root_namespace_state = self.root_namespace_state.lock().await;
283+
// let current_table = self.load_table(commit.identifier()).await?;
284+
// let updated_staged_table = update_and_stage_table(Some(&current_table), &commit)?;
285+
//
286+
// if current_table.metadata() == updated_staged_table.metadata() {
287+
// // no changes
288+
// return Ok(current_table);
289+
// }
290+
//
291+
// // write metadata
292+
// self.file_io
293+
// .new_output(&updated_staged_table.metadata_location())?
294+
// .write(serde_json::to_vec(updated_staged_table.metadata())?.into())
295+
// .await?;
296+
//
297+
// root_namespace_state.update_existing_table_location(
298+
// commit.identifier(),
299+
// updated_staged_table.metadata_location(),
300+
// )?;
301+
// Ok(updated_staged_table)
281302
Err(Error::new(
282303
ErrorKind::FeatureUnsupported,
283304
"MemoryCatalog does not currently support updating tables.",

crates/catalog/memory/src/namespace_state.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,24 @@ impl NamespaceState {
262262
}
263263
}
264264

265+
/// TODO fix this
266+
pub(crate) fn update_existing_table_location(
267+
&mut self,
268+
table_ident: &TableIdent,
269+
new_metadata_location: Option<&str>,
270+
) -> Result<()> {
271+
if new_metadata_location.is_none() {
272+
return Ok(());
273+
}
274+
275+
let mut namespace = self.get_mut_namespace(table_ident.namespace())?;
276+
namespace
277+
.table_metadata_locations
278+
.entry(table_ident.name().to_string())
279+
.insert_entry(new_metadata_location.unwrap().into_string());
280+
Ok(())
281+
}
282+
265283
// Inserts the given table or returns an error if it already exists
266284
pub(crate) fn insert_new_table(
267285
&mut self,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::transaction::Transaction;
19+
use crate::{Result, TableUpdate};
20+
21+
pub type ActionElement<'a> = Box<dyn TransactionAction<'a>>;
22+
23+
pub(crate) trait TransactionAction<'a>: Sync {
24+
/// Apply the pending changes and return the uncommitted changes
25+
/// TODO is this even needed?
26+
fn apply(&mut self) -> Result<Option<TableUpdate>>;
27+
28+
/// Commit the changes and apply the changes to the associated transaction
29+
fn commit(self) -> Result<Transaction<'a>>;
30+
}
31+
32+
pub struct SetLocation<'a> {
33+
pub tx: Transaction<'a>,
34+
location: Option<String>,
35+
}
36+
37+
impl<'a> SetLocation<'a> {
38+
pub fn new(tx: Transaction<'a>) -> Self {
39+
SetLocation {
40+
tx,
41+
location: None
42+
}
43+
}
44+
45+
pub fn set_location(mut self, location: String) -> Self {
46+
self.location = Some(location);
47+
self
48+
}
49+
}
50+
51+
impl<'a> TransactionAction<'a> for SetLocation<'a> {
52+
fn apply(&mut self) -> Result<Option<TableUpdate>> {
53+
if self.location.is_none() {
54+
return Ok(None)
55+
}
56+
Ok(Some(TableUpdate::SetLocation { location: self.location.clone().unwrap() }))
57+
}
58+
59+
fn commit(mut self) -> Result<Transaction<'a>> {
60+
let location = &mut self.apply()?;
61+
if location.is_none() {
62+
return Ok(self.tx)
63+
}
64+
65+
self.tx.apply(vec![location.clone().unwrap()], vec![])?;
66+
Ok(self.tx)
67+
// self.tx.actions.push(Box::new(self));
68+
}
69+
}

crates/iceberg/src/transaction/mod.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! This module contains transaction api.
1919
20+
mod action;
2021
mod append;
2122
mod snapshot;
2223
mod sort_order;
@@ -32,6 +33,7 @@ use crate::TableUpdate::UpgradeFormatVersion;
3233
use crate::error::Result;
3334
use crate::spec::FormatVersion;
3435
use crate::table::Table;
36+
use crate::transaction::action::{ActionElement, SetLocation};
3537
use crate::transaction::append::FastAppendAction;
3638
use crate::transaction::sort_order::ReplaceSortOrderAction;
3739
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
@@ -40,6 +42,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
4042
pub struct Transaction<'a> {
4143
base_table: &'a Table,
4244
current_table: Table,
45+
_actions: Vec<ActionElement<'a>>, // TODO unused for now, should we use this to reapply actions?
4346
updates: Vec<TableUpdate>,
4447
requirements: Vec<TableRequirement>,
4548
}
@@ -50,6 +53,7 @@ impl<'a> Transaction<'a> {
5053
Self {
5154
base_table: table,
5255
current_table: table.clone(),
56+
_actions: vec![],
5357
updates: vec![],
5458
requirements: vec![],
5559
}
@@ -67,6 +71,7 @@ impl<'a> Transaction<'a> {
6771
Ok(())
6872
}
6973

74+
// TODO deprecate this and move the logic to TransactionAction
7075
fn apply(
7176
&mut self,
7277
updates: Vec<TableUpdate>,
@@ -184,9 +189,8 @@ impl<'a> Transaction<'a> {
184189
}
185190

186191
/// Set the location of table
187-
pub fn set_location(mut self, location: String) -> Result<Self> {
188-
self.apply(vec![TableUpdate::SetLocation { location }], vec![])?;
189-
Ok(self)
192+
pub fn set_location(self) -> Result<SetLocation<'a>> {
193+
Ok(SetLocation::new(self))
190194
}
191195

192196
/// Commit transaction.
@@ -196,6 +200,20 @@ impl<'a> Transaction<'a> {
196200
.updates(self.updates)
197201
.requirements(self.requirements)
198202
.build();
203+
if self.base_table.metadata() == self.current_table.metadata() {
204+
return Ok(self.current_table);
205+
}
206+
207+
// TODO add refresh() in catalog?
208+
let refreshed_table = catalog
209+
.load_table(table_commit.identifier())
210+
.await
211+
.expect(format!("Failed to refresh table {}", table_commit.identifier()).as_str());
212+
213+
if self.base_table.metadata() != refreshed_table.metadata() {
214+
// TODO raise a real error and retry
215+
panic!("Stale base table!")
216+
}
199217

200218
catalog.update_table(table_commit).await
201219
}
@@ -212,6 +230,7 @@ mod tests {
212230
use crate::table::Table;
213231
use crate::transaction::Transaction;
214232
use crate::{TableIdent, TableUpdate};
233+
use crate::transaction::action::TransactionAction;
215234

216235
fn make_v1_table() -> Table {
217236
let file = File::open(format!(
@@ -345,9 +364,12 @@ mod tests {
345364
fn test_set_location() {
346365
let table = make_v2_table();
347366
let tx = Transaction::new(&table);
348-
let tx = tx
349-
.set_location(String::from("s3://bucket/prefix/new_table"))
350-
.unwrap();
367+
let set_location = tx
368+
.set_location()
369+
.unwrap()
370+
.set_location(String::from("s3://bucket/prefix/new_table"));
371+
372+
let tx = set_location.commit().unwrap();
351373

352374
assert_eq!(
353375
vec![TableUpdate::SetLocation {

0 commit comments

Comments
 (0)