diff --git a/Cargo.lock b/Cargo.lock
index 28dd83775..06f3782b2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1029,9 +1029,9 @@ dependencies = [
[[package]]
name = "backon"
-version = "1.3.0"
+version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7"
+checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7"
dependencies = [
"fastrand",
"gloo-timers",
@@ -3490,6 +3490,7 @@ dependencies = [
"arrow-string",
"async-std",
"async-trait",
+ "backon",
"base64 0.22.1",
"bimap",
"bytes",
@@ -3510,6 +3511,7 @@ dependencies = [
"parquet",
"pretty_assertions",
"rand 0.8.5",
+ "regex",
"reqwest",
"roaring",
"rust_decimal",
diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs
index 7454de126..370749dcf 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -283,6 +283,44 @@ impl Catalog for MemoryCatalog {
"MemoryCatalog does not currently support updating tables.",
))
}
+
+ // async fn commit_table(&self, base: &Table, current: Table) -> Result<Table> {
+ // if base.metadata() == current.metadata() {
+ // // no change
+ // return Ok(current);
+ // }
+ //
+ // let mut root_namespace_state = self.root_namespace_state.lock().await;
+ // // TODO: caller needs to retry on the error below
+ // let _ = root_namespace_state
+ // .check_metadata_location(base.identifier(), base.metadata_location())?;
+ //
+ // let next_metadata_version = if let Some(base_metadata_location) = base.metadata_location() {
+ // self.parse_metadata_version(base_metadata_location) + 1
+ // } else {
+ // 0
+ // };
+ //
+ // // write metadata
+ // let metadata_location = format!(
+ // "{}/metadata/{}-{}.metadata.json",
+ // current.metadata().location(),
+ // next_metadata_version,
+ // Uuid::new_v4()
+ // );
+ //
+ // // TODO instead of using current.metadata(), build a new metadata with some properties like last_updated_ms updated
+ // self.file_io
+ // .new_output(&metadata_location)?
+ // .write(serde_json::to_vec(current.metadata())?.into())
+ // .await?;
+ //
+ // root_namespace_state
+ // .update_existing_table_location(current.identifier(), current.metadata_location())?;
+ //
+ // // TODO same here, need to update the metadata location
+ // Ok(current)
+ // }
}
#[cfg(test)]
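The commented-out `commit_table` above encodes an optimistic compare-and-swap: under the namespace lock, verify the caller's base metadata location still matches the catalog's record, write the new metadata file, then swap the pointer. A minimal self-contained sketch of that pattern, with hypothetical names (`MetadataStore`, `commit_swap`) that are not part of the crate's API:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the locked namespace state; illustrative only.
struct MetadataStore {
    // table name -> current metadata file location
    locations: HashMap<String, String>,
}

impl MetadataStore {
    // Commit succeeds only if the caller's view of the current location
    // (`expected`) still matches the stored one; otherwise the base is
    // stale and the caller must refresh and retry.
    fn commit_swap(
        &mut self,
        table: &str,
        expected: Option<&str>,
        new_location: String,
    ) -> Result<(), String> {
        if self.locations.get(table).map(String::as_str) != expected {
            return Err(format!("stale base metadata for table {table}"));
        }
        self.locations.insert(table.to_string(), new_location);
        Ok(())
    }
}
```

On a mismatch the caller retries against the refreshed state, which is exactly what the transaction-level retry added in `transaction/mod.rs` below provides.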
diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs
index e324e7a3d..547c6bcd0 100644
--- a/crates/catalog/memory/src/namespace_state.rs
+++ b/crates/catalog/memory/src/namespace_state.rs
@@ -262,6 +262,45 @@ impl NamespaceState {
}
}
+ pub(crate) fn check_metadata_location(
+ &self,
+ table_ident: &TableIdent,
+ metadata_location: Option<&str>,
+ ) -> Result<()> {
+ let namespace = self.get_namespace(table_ident.namespace())?;
+
+ if namespace
+ .table_metadata_locations
+ .get(table_ident.name())
+ .map(|s| s.as_str())
+ != metadata_location
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Metadata location does not match for table: {table_ident}!"),
+ ));
+ }
+
+ Ok(())
+ }
+
+ pub(crate) fn update_existing_table_location(
+ &mut self,
+ table_ident: &TableIdent,
+ new_metadata_location: Option<&str>,
+ ) -> Result<()> {
+ let Some(new_metadata_location) = new_metadata_location else {
+ return Ok(());
+ };
+
+ let namespace = self.get_mut_namespace(table_ident.namespace())?;
+ namespace
+ .table_metadata_locations
+ .insert(table_ident.name().to_string(), new_metadata_location.to_string());
+ Ok(())
+ }
+
// Inserts the given table or returns an error if it already exists
pub(crate) fn insert_new_table(
&mut self,
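`check_metadata_location` and `update_existing_table_location` are the two halves of that swap. Under the namespace lock, a crate-internal caller (such as the `commit_table` sketched earlier) would combine them roughly as below; `state`, `ident`, `base_location`, and `new_location` are assumed to exist at the call site:

```rust
// Hedged, crate-internal sketch of the check-then-swap sequence.
// 1. Fail fast if another writer advanced the table past `base_location`.
state.check_metadata_location(&ident, base_location)?;
// 2. Point the table at the freshly written metadata file.
state.update_existing_table_location(&ident, Some(new_location))?;
```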
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 851819069..5d70b5fff 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -2125,10 +2125,10 @@ mod tests {
.unwrap()
};
- let table = Transaction::new(&table1)
+ let table = Transaction::new(table1)
.upgrade_table_version(FormatVersion::V2)
.unwrap()
- .commit(&catalog)
+ .commit(Arc::new(&catalog))
.await
.unwrap();
@@ -2250,10 +2250,10 @@ mod tests {
.unwrap()
};
- let table_result = Transaction::new(&table1)
+ let table_result = Transaction::new(table1)
.upgrade_table_version(FormatVersion::V2)
.unwrap()
- .commit(&catalog)
+ .commit(Arc::new(&catalog))
.await;
assert!(table_result.is_err());
diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs
index ab7ea3d62..ddf574d5d 100644
--- a/crates/catalog/rest/tests/rest_catalog_test.rs
+++ b/crates/catalog/rest/tests/rest_catalog_test.rs
@@ -19,7 +19,7 @@
use std::collections::HashMap;
use std::net::SocketAddr;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
use ctor::{ctor, dtor};
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
@@ -347,10 +347,10 @@ async fn test_update_table() {
);
// Update table by committing transaction
- let table2 = Transaction::new(&table)
+ let table2 = Transaction::new(table)
.set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())]))
.unwrap()
- .commit(&catalog)
+ .commit(Arc::new(&catalog))
.await
.unwrap();
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 3324a4e9e..cd0fedc70 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -55,6 +55,7 @@ arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-trait = { workspace = true }
+backon = { version = "1.5.1" }
base64 = { workspace = true }
bimap = { workspace = true }
bytes = { workspace = true }
@@ -86,6 +87,7 @@ typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
+regex = "1.11.1"
[dev-dependencies]
ctor = { workspace = true }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 3457f8361..7c30d29aa 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -21,6 +21,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::mem::take;
use std::ops::Deref;
+use std::sync::Arc;
use _serde::deserialize_snapshot;
use async_trait::async_trait;
@@ -300,6 +301,27 @@ impl TableCommit {
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
take(&mut self.updates)
}
+
+ /// Apply updates to a table
+ pub fn apply(&mut self, mut table: Table) -> Result<Table> {
+ // 1. check requirements
+ let requirements = self.take_requirements();
+ for requirement in requirements {
+ requirement.check(Some(table.metadata()))?;
+ }
+
+ // 2. Apply updates to metadata builder
+ let mut metadata_builder = table.metadata().clone().into_builder(None);
+
+ let updates = self.take_updates();
+ for update in updates {
+ metadata_builder = update.apply(metadata_builder)?;
+ }
+
+ table.with_metadata(Arc::new(metadata_builder.build()?.metadata));
+
+ Ok(table)
+ }
}
/// TableRequirement represents a requirement for a table in the catalog.
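`TableCommit::apply` centralizes the requirements-then-updates sequencing so each catalog does not reimplement it. A hedged sketch of how a catalog-side `update_table` might lean on it; `persist` is a hypothetical stand-in for catalog-specific storage:

```rust
// Sketch of a catalog internals method; only TableCommit::apply is from this diff.
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
    // Load the current table state this commit must be validated against.
    let table = self.load_table(commit.identifier()).await?;
    // Checks every requirement, then folds every update into new metadata.
    let updated = commit.apply(table)?;
    // Write the new metadata and swap the pointer (catalog-specific).
    self.persist(&updated).await?;
    Ok(updated)
}
```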
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 37529ee6f..07fd26413 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -39,7 +39,7 @@ pub enum ErrorKind {
/// Iceberg data is invalid.
///
/// This error is returned when we try to read a table from iceberg but
- /// failed to parse it's metadata or data file correctly.
+ /// failed to parse its metadata or data file correctly.
///
/// The table could be invalid or corrupted.
DataInvalid,
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index fcd510920..68aac252c 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -26,6 +26,7 @@ use std::sync::Arc;
use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
+use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
@@ -40,6 +41,7 @@ use crate::error::{Result, timestamp_ms_to_utc};
use crate::{Error, ErrorKind};
static MAIN_BRANCH: &str = "main";
+const TABLE_METADATA_FILE_NAME_REGEX: &str = r"(\d+)-([\w-]{36})(?:\.\w+)?\.metadata\.json";
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
@@ -432,6 +434,30 @@ impl TableMetadata {
self.encryption_keys.get(key_id)
}
+ /// Parse metadata version and uuid from metadata filename
+ fn parse_metadata_filename(metadata_location: &str) -> Result<(i32, Uuid)> {
+ if let Some(metadata_file_name) = metadata_location.split('/').last() {
+ let re = Regex::new(TABLE_METADATA_FILE_NAME_REGEX)
+ .expect("Failed to parse regex for metadata file!");
+ if let Some(caps) = re.captures(metadata_file_name) {
+ let metadata_version_str = &caps[1];
+ let uuid_str = &caps[2];
+
+ let metadata_version = metadata_version_str.parse().map_err(|_| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid metadata version: {metadata_version_str}"),
+ )
+ })?;
+ let uuid = Uuid::parse_str(uuid_str)?;
+
+ return Ok((metadata_version, uuid));
+ }
+ }
+
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("Unrecognizable metadata location: {metadata_location}"),
+ ))
+ }
+
/// Normalize this partition spec.
///
/// This is an internal method
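`parse_metadata_filename` expects the standard metadata naming scheme, `<version>-<uuid>.metadata.json`, where the `(?:\.\w+)?` group tolerates an extra suffix (e.g. a compression codec) before `.metadata.json`. Illustrative inputs, written as a hypothetical in-module test:

```rust
#[test]
fn parses_version_and_uuid_from_metadata_location() {
    // <version>-<uuid>.metadata.json
    let loc = "s3://bucket/db/tbl/metadata/00001-8a62495b-a19f-4b6f-9cd5-1c06b4b6b7e0.metadata.json";
    let (version, uuid) = TableMetadata::parse_metadata_filename(loc).unwrap();
    assert_eq!(version, 1);
    assert_eq!(uuid.to_string(), "8a62495b-a19f-4b6f-9cd5-1c06b4b6b7e0");

    // An extra suffix such as `.gz` before `.metadata.json` still matches.
    let gz = "s3://bucket/db/tbl/metadata/42-8a62495b-a19f-4b6f-9cd5-1c06b4b6b7e0.gz.metadata.json";
    assert!(TableMetadata::parse_metadata_filename(gz).is_ok());

    // Anything else is rejected.
    assert!(TableMetadata::parse_metadata_filename("metadata.json").is_err());
}
```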
diff --git a/crates/iceberg/src/transaction/action/mod.rs b/crates/iceberg/src/transaction/action/mod.rs
new file mode 100644
index 000000000..cda0c21a7
--- /dev/null
+++ b/crates/iceberg/src/transaction/action/mod.rs
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::collections::{HashMap, HashSet};
+use std::mem::take;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::TableUpdate::UpgradeFormatVersion;
+use crate::spec::FormatVersion;
+use crate::table::Table;
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+/// A boxed, shareable transaction action.
+pub type BoxedTransactionAction = Arc<dyn TransactionAction>;
+
+/// A unit of work that can be committed against a table as part of a transaction.
+#[async_trait]
+pub trait TransactionAction: Sync + Send {
+ /// Commit this action against the given table, producing the updates and
+ /// requirements to send to the catalog.
+ async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit>;
+}
+
+/// Helper trait for attaching a [`TransactionAction`] to a [`Transaction`].
+pub trait ApplyTransactionAction {
+ /// Adds this action to the given transaction and returns the transaction.
+ fn apply(self, tx: Transaction) -> Result<Transaction>;
+}
+
+impl<T: TransactionAction + 'static> ApplyTransactionAction for T {
+ fn apply(self, mut tx: Transaction) -> Result<Transaction>
+ where Self: Sized {
+ tx.actions.push(Arc::new(self));
+ Ok(tx)
+ }
+}
+
+/// The result of committing a [`TransactionAction`]: the table updates to
+/// apply and the requirements that must hold for the commit to succeed.
+pub struct ActionCommit {
+ updates: Vec<TableUpdate>,
+ requirements: Vec<TableRequirement>,
+}
+
+impl ActionCommit {
+ /// Creates a new `ActionCommit` from updates and requirements.
+ pub fn new(updates: Vec<TableUpdate>, requirements: Vec<TableRequirement>) -> Self {
+ Self {
+ updates,
+ requirements,
+ }
+ }
+
+ /// Takes ownership of the pending updates, leaving an empty list behind.
+ pub fn take_updates(&mut self) -> Vec<TableUpdate> {
+ take(&mut self.updates)
+ }
+
+ /// Takes ownership of the pending requirements, leaving an empty list behind.
+ pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
+ take(&mut self.requirements)
+ }
+}
+
+/// A transaction action that sets the table's location.
+pub struct UpdateLocationAction {
+ location: Option<String>,
+}
+
+impl UpdateLocationAction {
+ /// Creates a new `UpdateLocationAction`.
+ pub fn new() -> Self {
+ UpdateLocationAction { location: None }
+ }
+
+ /// Sets the new table location.
+ pub fn set_location(mut self, location: String) -> Self {
+ self.location = Some(location);
+ self
+ }
+}
+
+#[async_trait]
+impl TransactionAction for UpdateLocationAction {
+ async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
+ let updates: Vec<TableUpdate>;
+ let requirements: Vec<TableRequirement>;
+ if let Some(location) = self.location.clone() {
+ updates = vec![TableUpdate::SetLocation { location }];
+ requirements = vec![];
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Location is not set for UpdateLocationAction!",
+ ));
+ }
+
+ Ok(ActionCommit::new(updates, requirements))
+ }
+}
+
+/// A transaction action that upgrades the table's format version.
+pub struct UpgradeFormatVersionAction {
+ format_version: Option<FormatVersion>,
+}
+
+impl UpgradeFormatVersionAction {
+ /// Creates a new `UpgradeFormatVersionAction`.
+ pub fn new() -> Self {
+ UpgradeFormatVersionAction {
+ format_version: None,
+ }
+ }
+
+ /// Sets the target format version.
+ pub fn set_format_version(mut self, format_version: FormatVersion) -> Self {
+ self.format_version = Some(format_version);
+ self
+ }
+}
+
+#[async_trait]
+impl TransactionAction for UpgradeFormatVersionAction {
+ async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+ let current_version = table.metadata().format_version();
+ let updates: Vec<TableUpdate>;
+ let requirements: Vec<TableRequirement>;
+
+ if let Some(format_version) = self.format_version {
+ match current_version.cmp(&format_version) {
+ Ordering::Greater => {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot downgrade table version from {} to {}",
+ current_version, format_version
+ ),
+ ));
+ }
+ Ordering::Less => {
+ updates = vec![UpgradeFormatVersion { format_version }];
+ requirements = vec![];
+ }
+ Ordering::Equal => {
+ // do nothing
+ updates = vec![];
+ requirements = vec![];
+ }
+ }
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "FormatVersion is not set for UpgradeFormatVersionAction!",
+ ));
+ }
+
+ Ok(ActionCommit::new(updates, requirements))
+ }
+}
+
+/// A transaction action that sets and/or removes table properties.
+pub struct UpdatePropertiesAction {
+ updates: HashMap<String, String>,
+ removals: HashSet<String>,
+}
+
+impl UpdatePropertiesAction {
+ /// Creates a new `UpdatePropertiesAction`.
+ pub fn new() -> Self {
+ UpdatePropertiesAction {
+ updates: HashMap::default(),
+ removals: HashSet::default(),
+ }
+ }
+
+ /// Stages a property to set; the key must not also be staged for removal.
+ pub fn set(mut self, key: String, value: String) -> Self {
+ assert!(!self.removals.contains(&key));
+ self.updates.insert(key, value);
+ self
+ }
+
+ /// Stages a property to remove; the key must not also be staged for setting.
+ pub fn remove(mut self, key: String) -> Self {
+ assert!(!self.updates.contains_key(&key));
+ self.removals.insert(key);
+ self
+ }
+}
+
+#[async_trait]
+impl TransactionAction for UpdatePropertiesAction {
+ async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
+ let updates: Vec<TableUpdate> = vec![
+ TableUpdate::SetProperties {
+ updates: self.updates.clone(),
+ },
+ TableUpdate::RemoveProperties {
+ removals: self.removals.clone().into_iter().collect::<Vec<String>>(),
+ },
+ ];
+ let requirements: Vec<TableRequirement> = vec![];
+
+ Ok(ActionCommit::new(updates, requirements))
+ }
+}
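These actions are intended to be built fluently and staged on a transaction through `ApplyTransactionAction::apply`; nothing reaches the catalog until the transaction commits. A hedged usage sketch, where the `table` value and the enclosing `Result`-returning function are assumptions based on this diff:

```rust
use iceberg::transaction::Transaction;
use iceberg::transaction::action::{
    ApplyTransactionAction, UpdateLocationAction, UpdatePropertiesAction,
};

// Stage two independent actions; they are replayed in order at commit time.
let tx = Transaction::new(table);
let tx = UpdatePropertiesAction::new()
    .set("write.format.default".to_string(), "parquet".to_string())
    .remove("deprecated.prop".to_string())
    .apply(tx)?;
let tx = UpdateLocationAction::new()
    .set_location("s3://bucket/prefix/table".to_string())
    .apply(tx)?;
```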
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index d3b3cb2e3..1a930d9ca 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -16,14 +16,17 @@
// under the License.
use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
use arrow_array::StringArray;
+use async_trait::async_trait;
use futures::TryStreamExt;
use uuid::Uuid;
use crate::error::Result;
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
-use crate::transaction::Transaction;
+use crate::table::Table;
+use crate::transaction::action::{ActionCommit, TransactionAction};
use crate::transaction::snapshot::{
DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
};
@@ -31,15 +34,17 @@ use crate::writer::file_writer::ParquetWriter;
use crate::{Error, ErrorKind};
/// FastAppendAction is a transaction action for fast append data files to the table.
-pub struct FastAppendAction<'a> {
- snapshot_produce_action: SnapshotProduceAction<'a>,
+pub struct FastAppendAction {
+ snapshot_produce_action: SnapshotProduceAction,
check_duplicate: bool,
+ // Properties below are used to create the SnapshotProduceAction at commit time.
+ snapshot_properties: HashMap<String, String>,
+ pub added_data_files: Vec<DataFile>,
}
-impl<'a> FastAppendAction<'a> {
+impl FastAppendAction {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
- tx: Transaction<'a>,
snapshot_id: i64,
commit_uuid: Uuid,
key_metadata: Vec<u8>,
@@ -47,13 +52,14 @@ impl<'a> FastAppendAction<'a> {
) -> Result<Self> {
Ok(Self {
snapshot_produce_action: SnapshotProduceAction::new(
- tx,
snapshot_id,
key_metadata,
commit_uuid,
- snapshot_properties,
+ snapshot_properties.clone(),
)?,
check_duplicate: true,
+ snapshot_properties,
+ added_data_files: vec![],
})
}
@@ -68,7 +74,17 @@ impl<'a> FastAppendAction<'a> {
&mut self,
data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
- self.snapshot_produce_action.add_data_files(data_files)?;
+ let data_files: Vec<DataFile> = data_files.into_iter().collect();
+ for data_file in &data_files {
+ if data_file.content_type() != crate::spec::DataContentType::Data {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Only data content type is allowed for fast append",
+ ));
+ }
+ }
+ self.added_data_files.extend(data_files);
+
Ok(self)
}
@@ -77,8 +93,7 @@ impl<'a> FastAppendAction<'a> {
&mut self,
snapshot_properties: HashMap<String, String>,
) -> Result<&mut Self> {
- self.snapshot_produce_action
- .set_snapshot_properties(snapshot_properties)?;
+ self.snapshot_properties = snapshot_properties;
Ok(self)
}
@@ -89,54 +104,44 @@ impl<'a> FastAppendAction<'a> {
/// Specifically, schema compatibility checks and support for adding to partitioned tables
/// have not yet been implemented.
#[allow(dead_code)]
- async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction<'a>> {
- if !self
- .snapshot_produce_action
- .tx
- .current_table
- .metadata()
- .default_spec
- .is_unpartitioned()
- {
+ async fn add_parquet_files(mut self, table: &Table, file_path: Vec<String>) -> Result<()> {
+ if !table.metadata().default_spec.is_unpartitioned() {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Appending to partitioned tables is not supported",
));
}
- let table_metadata = self.snapshot_produce_action.tx.current_table.metadata();
+ let table_metadata = table.metadata();
- let data_files = ParquetWriter::parquet_files_to_data_files(
- self.snapshot_produce_action.tx.current_table.file_io(),
- file_path,
- table_metadata,
- )
- .await?;
+ let data_files =
+ ParquetWriter::parquet_files_to_data_files(table.file_io(), file_path, table_metadata)
+ .await?;
self.add_data_files(data_files)?;
- self.apply().await
+ Ok(())
}
+}
+
+#[async_trait]
+impl TransactionAction for FastAppendAction {
+ async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+ let mut snapshot_produce_action = self.snapshot_produce_action.clone();
+
+ let snapshot_produce_action = snapshot_produce_action
+ .add_data_files(table, self.added_data_files.clone())?
+ .set_snapshot_properties(self.snapshot_properties.clone())?;
- /// Finished building the action and apply it to the transaction.
- pub async fn apply(self) -> Result<Transaction<'a>> {
// Checks duplicate files
if self.check_duplicate {
- let new_files: HashSet<&str> = self
- .snapshot_produce_action
+ let new_files: HashSet<&str> = snapshot_produce_action
.added_data_files
.iter()
.map(|df| df.file_path.as_str())
.collect();
- let mut manifest_stream = self
- .snapshot_produce_action
- .tx
- .current_table
- .inspect()
- .manifests()
- .scan()
- .await?;
+ let mut manifest_stream = table.inspect().manifests().scan().await?;
let mut referenced_files = Vec::new();
while let Some(batch) = manifest_stream.try_next().await? {
@@ -170,8 +175,8 @@ impl<'a> FastAppendAction<'a> {
}
}
- self.snapshot_produce_action
- .apply(FastAppendOperation, DefaultManifestProcess)
+ snapshot_produce_action
+ .apply(table, FastAppendOperation, DefaultManifestProcess)
.await
}
}
@@ -185,29 +190,22 @@ impl SnapshotProduceOperation for FastAppendOperation {
async fn delete_entries(
&self,
- _snapshot_produce: &SnapshotProduceAction<'_>,
+ _snapshot_produce: &SnapshotProduceAction,
) -> Result<Vec<ManifestEntry>> {
Ok(vec![])
}
async fn existing_manifest(
&self,
- snapshot_produce: &SnapshotProduceAction<'_>,
+ table: &Table,
+ _snapshot_produce: &SnapshotProduceAction,
) -> Result<Vec<ManifestFile>> {
- let Some(snapshot) = snapshot_produce
- .tx
- .current_table
- .metadata()
- .current_snapshot()
- else {
+ let Some(snapshot) = table.metadata().current_snapshot() else {
return Ok(vec![]);
};
let manifest_list = snapshot
- .load_manifest_list(
- snapshot_produce.tx.current_table.file_io(),
- &snapshot_produce.tx.current_table.metadata_ref(),
- )
+ .load_manifest_list(table.file_io(), &table.metadata_ref())
.await?;
Ok(manifest_list
@@ -222,28 +220,30 @@ impl SnapshotProduceOperation for FastAppendOperation {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
+ use std::sync::Arc;
use crate::scan::tests::TableTestFixture;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
};
use crate::transaction::Transaction;
+ use crate::transaction::action::TransactionAction;
use crate::transaction::tests::make_v2_minimal_table;
use crate::{TableRequirement, TableUpdate};
#[tokio::test]
async fn test_empty_data_append_action() {
let table = make_v2_minimal_table();
- let tx = Transaction::new(&table);
+ let mut tx = Transaction::new(table.clone());
let mut action = tx.fast_append(None, vec![]).unwrap();
action.add_data_files(vec![]).unwrap();
- assert!(action.apply().await.is_err());
+ assert!(Arc::new(action).commit(&table).await.is_err());
}
#[tokio::test]
async fn test_set_snapshot_properties() {
let table = make_v2_minimal_table();
- let tx = Transaction::new(&table);
+ let mut tx = Transaction::new(table.clone());
let mut action = tx.fast_append(None, vec![]).unwrap();
let mut snapshot_properties = HashMap::new();
@@ -260,7 +260,7 @@ mod tests {
.build()
.unwrap();
action.add_data_files(vec![data_file]).unwrap();
- let tx = action.apply().await.unwrap();
+ let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+ let updates = action_commit.take_updates();
// Check customized properties is contained in snapshot summary properties.
- let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
+ let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
@@ -281,7 +281,7 @@ mod tests {
#[tokio::test]
async fn test_fast_append_action() {
let table = make_v2_minimal_table();
- let tx = Transaction::new(&table);
+ let mut tx = Transaction::new(table.clone());
let mut action = tx.fast_append(None, vec![]).unwrap();
// check add data file with incompatible partition value
@@ -308,7 +308,7 @@ mod tests {
.build()
.unwrap();
action.add_data_files(vec![data_file.clone()]).unwrap();
- let tx = action.apply().await.unwrap();
+ Arc::new(action).commit(&table).await.unwrap();
// check updates and requirements
assert!(
@@ -367,7 +367,7 @@ mod tests {
async fn test_add_existing_parquet_files_to_unpartitioned_table() {
let mut fixture = TableTestFixture::new_unpartitioned();
fixture.setup_unpartitioned_manifest_files().await;
- let tx = crate::transaction::Transaction::new(&fixture.table);
+ let mut tx = Transaction::new(fixture.table.clone());
let file_paths = vec![
format!("{}/1.parquet", &fixture.table_location),
@@ -378,14 +378,14 @@ mod tests {
let fast_append_action = tx.fast_append(None, vec![]).unwrap();
// Attempt to add the existing Parquet files with fast append.
- let new_tx = fast_append_action
- .add_parquet_files(file_paths.clone())
+ fast_append_action
+ .add_parquet_files(&tx.base_table, file_paths.clone())
.await
.expect("Adding existing Parquet files should succeed");
let mut found_add_snapshot = false;
let mut found_set_snapshot_ref = false;
- for update in new_tx.updates.iter() {
+ for update in tx.updates.iter() {
match update {
TableUpdate::AddSnapshot { .. } => {
found_add_snapshot = true;
@@ -404,7 +404,7 @@ mod tests {
assert!(found_add_snapshot);
assert!(found_set_snapshot_ref);
- let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
+ let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
snapshot
} else {
panic!("Expected the first update to be an AddSnapshot update");
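With the borrow on the transaction gone, `FastAppendAction` now buffers files locally and only resolves them against a concrete table when committed. A hedged sketch of the intended flow, assuming `table` and `data_files` exist in scope and the enclosing function returns `Result`:

```rust
use std::sync::Arc;

use iceberg::transaction::Transaction;
use iceberg::transaction::action::TransactionAction;

// Stage the append: files are validated (data content only) and buffered
// on the action rather than applied to the transaction immediately.
let mut tx = Transaction::new(table.clone());
let mut action = tx.fast_append(None, vec![])?;
action.add_data_files(data_files)?;

// Committing re-reads the table state, runs the duplicate-file check
// against existing manifests, and yields the resulting updates.
let mut action_commit = Arc::new(action).commit(&table).await?;
let updates = action_commit.take_updates();
```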
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index ba79d60bb..6a666d51a 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -17,79 +17,83 @@
//! This module contains transaction api.
+/// Transaction action types and helpers.
+pub mod action;
mod append;
mod snapshot;
mod sort_order;
-use std::cmp::Ordering;
use std::collections::HashMap;
use std::mem::discriminant;
use std::sync::Arc;
+use std::time::Duration;
+use backon::{BackoffBuilder, ExponentialBuilder, RetryableWithContext};
use uuid::Uuid;
-use crate::TableUpdate::UpgradeFormatVersion;
use crate::error::Result;
-use crate::spec::FormatVersion;
use crate::table::Table;
+use crate::transaction::action::{
+ ApplyTransactionAction, BoxedTransactionAction, UpdateLocationAction, UpdatePropertiesAction,
+ UpgradeFormatVersionAction,
+};
use crate::transaction::append::FastAppendAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
/// Table transaction.
-pub struct Transaction<'a> {
- base_table: &'a Table,
- current_table: Table,
- updates: Vec,
- requirements: Vec,
+#[derive(Clone)]
+pub struct Transaction {
+ base_table: Table,
+ actions: Vec<BoxedTransactionAction>,
}
-impl<'a> Transaction<'a> {
+impl Transaction {
/// Creates a new transaction.
- pub fn new(table: &'a Table) -> Self {
+ pub fn new(table: Table) -> Self {
Self {
- base_table: table,
- current_table: table.clone(),
- updates: vec![],
- requirements: vec![],
+ base_table: table,
+ actions: vec![],
}
}
- fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
- let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
+ fn update_table_metadata(&self, table: &mut Table, updates: &[TableUpdate]) -> Result<()> {
+ let mut metadata_builder = table.metadata().clone().into_builder(None);
for update in updates {
metadata_builder = update.clone().apply(metadata_builder)?;
}
- self.current_table
- .with_metadata(Arc::new(metadata_builder.build()?.metadata));
+ table.with_metadata(Arc::new(metadata_builder.build()?.metadata));
Ok(())
}
- fn apply(
- &mut self,
+ /// Checks the requirements against `table`, applies the updates to it,
+ /// and folds both into the running lists for the final commit.
+ pub fn apply(
+ &self,
+ table: &mut Table,
updates: Vec,
requirements: Vec,
+ existing_updates: &mut Vec,
+ existing_requirements: &mut Vec,
) -> Result<()> {
for requirement in &requirements {
- requirement.check(Some(self.current_table.metadata()))?;
+ requirement.check(Some(table.metadata()))?;
}
- self.update_table_metadata(&updates)?;
+ self.update_table_metadata(table, &updates)?;
- self.updates.extend(updates);
+ existing_updates.extend(updates);
// For the requirements, it does not make sense to add a requirement more than once
// For example, you cannot assert that the current schema has two different IDs
for new_requirement in requirements {
- if self
- .requirements
+ if existing_requirements
.iter()
.map(discriminant)
.all(|d| d != discriminant(&new_requirement))
{
- self.requirements.push(new_requirement);
+ existing_requirements.push(new_requirement);
}
}
@@ -100,32 +104,13 @@ impl<'a> Transaction<'a> {
}
/// Sets table to a new version.
- pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
- let current_version = self.current_table.metadata().format_version();
- match current_version.cmp(&format_version) {
- Ordering::Greater => {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Cannot downgrade table version from {} to {}",
- current_version, format_version
- ),
- ));
- }
- Ordering::Less => {
- self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?;
- }
- Ordering::Equal => {
- // Do nothing.
- }
- }
- Ok(self)
+ pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
+ UpgradeFormatVersionAction::new()
}
/// Update table's property.
- pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
- self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?;
- Ok(self)
+ pub fn update_properties(&self) -> UpdatePropertiesAction {
+ UpdatePropertiesAction::new()
}
fn generate_unique_snapshot_id(&self) -> i64 {
@@ -140,7 +125,7 @@ impl<'a> Transaction<'a> {
};
let mut snapshot_id = generate_random_id();
while self
- .current_table
+ .base_table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
@@ -152,13 +137,12 @@ impl<'a> Transaction<'a> {
/// Creates a fast append action.
pub fn fast_append(
- self,
+ &mut self,
commit_uuid: Option<Uuid>,
key_metadata: Vec<u8>,
- ) -> Result<FastAppendAction<'a>> {
+ ) -> Result<FastAppendAction> {
let snapshot_id = self.generate_unique_snapshot_id();
FastAppendAction::new(
- self,
snapshot_id,
commit_uuid.unwrap_or_else(Uuid::now_v7),
key_metadata,
@@ -167,34 +151,83 @@ impl<'a> Transaction<'a> {
}
/// Creates replace sort order action.
- pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
+ pub fn replace_sort_order(self) -> ReplaceSortOrderAction {
ReplaceSortOrderAction {
- tx: self,
sort_fields: vec![],
}
}
- /// Remove properties in table.
- pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
- self.apply(
- vec![TableUpdate::RemoveProperties { removals: keys }],
- vec![],
- )?;
- Ok(self)
- }
-
/// Set the location of table
- pub fn set_location(mut self, location: String) -> Result {
- self.apply(vec![TableUpdate::SetLocation { location }], vec![])?;
- Ok(self)
+ pub fn update_location(&mut self) -> UpdateLocationAction {
+ UpdateLocationAction::new()
}
/// Commit transaction.
- pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
+ pub async fn commit(&mut self, catalog: Arc<&dyn Catalog>) -> Result<Table> {
+ if self.actions.is_empty() {
+ // nothing to commit
+ return Ok(self.base_table.clone());
+ }
+
+ let tx = self.clone();
+ (|mut tx: Transaction| async {
+ let result = tx.do_commit(catalog.clone()).await;
+ (tx, result)
+ })
+ .retry(
+ ExponentialBuilder::new()
+ // TODO retry strategy should be configurable
+ .with_min_delay(Duration::from_millis(100))
+ .with_max_delay(Duration::from_millis(60 * 1000))
+ .with_total_delay(Some(Duration::from_millis(30 * 60 * 1000)))
+ .with_max_times(4)
+ .with_factor(2.0)
+ .build(),
+ )
+ .context(tx)
+ .sleep(tokio::time::sleep)
+ // todo use a specific commit failure
+ .when(|e| e.kind() == ErrorKind::DataInvalid)
+ .await
+ .1
+ }
+
+ async fn do_commit(&mut self, catalog: Arc<&dyn Catalog>) -> Result<Table> {
+ let base_table_identifier = self.base_table.identifier().to_owned();
+
+ let refreshed = catalog.load_table(&base_table_identifier).await?;
+
+ let mut existing_updates: Vec<TableUpdate> = vec![];
+ let mut existing_requirements: Vec<TableRequirement> = vec![];
+
+ if self.base_table.metadata() != refreshed.metadata()
+ || self.base_table.metadata_location() != refreshed.metadata_location()
+ {
+ // current base is stale, use refreshed as base and re-apply transaction actions
+ self.base_table = refreshed.clone();
+ }
+
+ let mut current_table = self.base_table.clone();
+
+ for action in self.actions.clone() {
+ let mut action_commit = action.commit(&current_table).await?;
+ // apply changes to current_table
+ self.apply(
+ &mut current_table,
+ action_commit.take_updates(),
+ action_commit.take_requirements(),
+ &mut existing_updates,
+ &mut existing_requirements,
+ )?;
+ }
+
let table_commit = TableCommit::builder()
- .ident(self.base_table.identifier().clone())
- .updates(self.updates)
- .requirements(self.requirements)
+ .ident(base_table_identifier.clone())
+ .updates(existing_updates)
+ .requirements(existing_requirements)
.build();
catalog.update_table(table_commit).await
@@ -206,11 +239,13 @@ mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
+ use std::sync::Arc;
use crate::io::FileIOBuilder;
use crate::spec::{FormatVersion, TableMetadata};
use crate::table::Table;
use crate::transaction::Transaction;
+ use crate::transaction::action::TransactionAction;
use crate::{TableIdent, TableUpdate};
fn make_v1_table() -> Table {
@@ -273,7 +308,7 @@ mod tests {
#[test]
fn test_upgrade_table_version_v1_to_v2() {
let table = make_v1_table();
- let tx = Transaction::new(&table);
+ let tx = Transaction::new(table);
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
assert_eq!(
@@ -287,7 +322,7 @@ mod tests {
#[test]
fn test_upgrade_table_version_v2_to_v2() {
let table = make_v2_table();
- let tx = Transaction::new(&table);
+ let tx = Transaction::new(table);
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
assert!(
@@ -303,7 +338,7 @@ mod tests {
#[test]
fn test_downgrade_table_version() {
let table = make_v2_table();
- let tx = Transaction::new(&table);
+ let tx = Transaction::new(table);
let tx = tx.upgrade_table_version(FormatVersion::V1);
assert!(tx.is_err(), "Downgrade table version should fail!");
@@ -312,7 +347,7 @@ mod tests {
#[test]
fn test_set_table_property() {
let table = make_v2_table();
- let tx = Transaction::new(&table);
+ let tx = Transaction::new(table);
let tx = tx
.set_properties(HashMap::from([("a".to_string(), "b".to_string())]))
.unwrap();
@@ -328,7 +363,7 @@ mod tests {
#[test]
fn test_remove_property() {
let table = make_v2_table();
- let tx = Transaction::new(&table);
+ let tx = Transaction::new(table);
let tx = tx
.remove_properties(vec!["a".to_string(), "b".to_string()])
.unwrap();
@@ -341,13 +376,16 @@ mod tests {
);
}
- #[test]
- fn test_set_location() {
+ #[tokio::test]
+ async fn test_set_location() {
let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx
- .set_location(String::from("s3://bucket/prefix/new_table"))
- .unwrap();
+ let mut tx = Transaction::new(table);
+ let update_location_action = tx
+ .update_location()
+ .set_location(String::from("s3://bucket/prefix/new_table"));
+
+ let _res = Arc::new(update_location_action).commit(&tx.base_table).await;
assert_eq!(
vec![TableUpdate::SetLocation {
@@ -360,7 +398,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_apply_upgrade() {
let table = make_v1_table();
- let tx = Transaction::new(&table);
+ let tx = Transaction::new(table);
// Upgrade v1 to v1, do nothing.
let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap();
// Upgrade v1 to v2, success.
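End to end, the new commit path refreshes the base table, replays the staged actions against it, and retries on conflict with exponential backoff, so a caller stages actions once and commits once. A hedged sketch of the intended call site, where `table`, `catalog`, and the enclosing `Result`-returning async function are assumptions:

```rust
use std::sync::Arc;

use iceberg::transaction::Transaction;
use iceberg::transaction::action::ApplyTransactionAction;

let mut tx = Transaction::new(table);
let action = tx
    .update_location()
    .set_location("s3://bucket/prefix/new_table".to_string());
let mut tx = action.apply(tx)?;

// commit() loops via backon: load the latest table, re-apply the staged
// actions, and send the updates; conflicts retry up to the configured cap.
let committed = tx.commit(Arc::new(&catalog)).await?;
```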
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 012fb52bb..a9217835b 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -29,7 +29,8 @@ use crate::spec::{
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
};
-use crate::transaction::Transaction;
+use crate::table::Table;
+use crate::transaction::action::ActionCommit;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
const META_ROOT_PATH: &str = "metadata";
@@ -43,6 +44,7 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
) -> impl Future