Skip to content

Commit bcd1033

Browse files
CTTYliurenjie1024
andauthored
feat(transaction): Implement TransactionAction for updata_loc, update_props, and upgrade_format (#1433)
## Which issue does this PR close? Related Issues: - #1382 [EPIC] - #1386 - #1387 - #1388 - #1389 ## What changes are included in this PR? - Added implementations of `TransactionAction` - `UpdateLocationAction` - `UpdatePropertiesAction` - `UpgradeFormatVersionAction` - Added `as_any` to `TransactionAction` trait - Added `do_commit` in `Transaction` to commit applied actions We will also need to implement `TransactionAction` for `FastAppendAction` and `ReplaceSortOrderAction`, and I'm planning to do that in a separate PR. ## Are these changes tested? Added unit tests --------- Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
1 parent 44cd718 commit bcd1033

File tree

10 files changed

+475
-165
lines changed

10 files changed

+475
-165
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ arrow-ord = { version = "55" }
5050
arrow-schema = { version = "55" }
5151
arrow-select = { version = "55" }
5252
arrow-string = { version = "55" }
53+
as-any = "0.3.2"
5354
async-std = "1.12"
5455
async-trait = "0.1.88"
5556
aws-config = "1.6.1"

crates/catalog/rest/src/catalog.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ mod tests {
802802
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
803803
UnboundPartitionField, UnboundPartitionSpec,
804804
};
805-
use iceberg::transaction::Transaction;
805+
use iceberg::transaction::{ApplyTransactionAction, Transaction};
806806
use mockito::{Mock, Server, ServerGuard};
807807
use serde_json::json;
808808
use uuid::uuid;
@@ -2093,6 +2093,17 @@ mod tests {
20932093

20942094
let config_mock = create_config_mock(&mut server).await;
20952095

2096+
let load_table_mock = server
2097+
.mock("GET", "/v1/namespaces/ns1/tables/test1")
2098+
.with_status(200)
2099+
.with_body_from_file(format!(
2100+
"{}/testdata/{}",
2101+
env!("CARGO_MANIFEST_DIR"),
2102+
"load_table_response.json"
2103+
))
2104+
.create_async()
2105+
.await;
2106+
20962107
let update_table_mock = server
20972108
.mock("POST", "/v1/namespaces/ns1/tables/test1")
20982109
.with_status(200)
@@ -2125,8 +2136,11 @@ mod tests {
21252136
.unwrap()
21262137
};
21272138

2128-
let table = Transaction::new(&table1)
2129-
.upgrade_table_version(FormatVersion::V2)
2139+
let tx = Transaction::new(&table1);
2140+
let table = tx
2141+
.upgrade_table_version()
2142+
.set_format_version(FormatVersion::V2)
2143+
.apply(tx)
21302144
.unwrap()
21312145
.commit(&catalog)
21322146
.await
@@ -2204,6 +2218,7 @@ mod tests {
22042218

22052219
config_mock.assert_async().await;
22062220
update_table_mock.assert_async().await;
2221+
load_table_mock.assert_async().await
22072222
}
22082223

22092224
#[tokio::test]
@@ -2212,6 +2227,17 @@ mod tests {
22122227

22132228
let config_mock = create_config_mock(&mut server).await;
22142229

2230+
let load_table_mock = server
2231+
.mock("GET", "/v1/namespaces/ns1/tables/test1")
2232+
.with_status(200)
2233+
.with_body_from_file(format!(
2234+
"{}/testdata/{}",
2235+
env!("CARGO_MANIFEST_DIR"),
2236+
"load_table_response.json"
2237+
))
2238+
.create_async()
2239+
.await;
2240+
22152241
let update_table_mock = server
22162242
.mock("POST", "/v1/namespaces/ns1/tables/test1")
22172243
.with_status(404)
@@ -2250,8 +2276,11 @@ mod tests {
22502276
.unwrap()
22512277
};
22522278

2253-
let table_result = Transaction::new(&table1)
2254-
.upgrade_table_version(FormatVersion::V2)
2279+
let tx = Transaction::new(&table1);
2280+
let table_result = tx
2281+
.upgrade_table_version()
2282+
.set_format_version(FormatVersion::V2)
2283+
.apply(tx)
22552284
.unwrap()
22562285
.commit(&catalog)
22572286
.await;
@@ -2267,5 +2296,6 @@ mod tests {
22672296

22682297
config_mock.assert_async().await;
22692298
update_table_mock.assert_async().await;
2299+
load_table_mock.assert_async().await;
22702300
}
22712301
}

crates/catalog/rest/tests/rest_catalog_test.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::RwLock;
2323

2424
use ctor::{ctor, dtor};
2525
use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type};
26-
use iceberg::transaction::Transaction;
26+
use iceberg::transaction::{ApplyTransactionAction, Transaction};
2727
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
2828
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
2929
use iceberg_test_utils::docker::DockerCompose;
@@ -346,9 +346,12 @@ async fn test_update_table() {
346346
&TableIdent::new(ns.name().clone(), "t1".to_string())
347347
);
348348

349+
let tx = Transaction::new(&table);
349350
// Update table by committing transaction
350-
let table2 = Transaction::new(&table)
351-
.set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())]))
351+
let table2 = tx
352+
.update_table_properties()
353+
.set("prop1".to_string(), "v1".to_string())
354+
.apply(tx)
352355
.unwrap()
353356
.commit(&catalog)
354357
.await

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ arrow-ord = { workspace = true }
5454
arrow-schema = { workspace = true }
5555
arrow-select = { workspace = true }
5656
arrow-string = { workspace = true }
57+
as-any = { workspace = true }
5758
async-std = { workspace = true, optional = true, features = ["attributes"] }
5859
async-trait = { workspace = true }
5960
base64 = { workspace = true }

crates/iceberg/src/transaction/action.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,26 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#![allow(dead_code)]
1918
use std::mem::take;
2019
use std::sync::Arc;
2120

21+
use as_any::AsAny;
2222
use async_trait::async_trait;
2323

2424
use crate::table::Table;
2525
use crate::transaction::Transaction;
2626
use crate::{Result, TableRequirement, TableUpdate};
2727

2828
/// A boxed, thread-safe reference to a `TransactionAction`.
29-
pub type BoxedTransactionAction = Arc<dyn TransactionAction>;
29+
pub(crate) type BoxedTransactionAction = Arc<dyn TransactionAction>;
3030

3131
/// A trait representing an atomic action that can be part of a transaction.
3232
///
3333
/// Implementors of this trait define how a specific action is committed to a table.
3434
/// Each action is responsible for generating the updates and requirements needed
3535
/// to modify the table metadata.
3636
#[async_trait]
37-
pub(crate) trait TransactionAction: Sync + Send {
37+
pub(crate) trait TransactionAction: AsAny + Sync + Send {
3838
/// Commits this action against the provided table and returns the resulting updates.
3939
/// NOTE: This function is intended for internal use only and should not be called directly by users.
4040
///
@@ -108,6 +108,7 @@ mod tests {
108108
use std::str::FromStr;
109109
use std::sync::Arc;
110110

111+
use as_any::Downcast;
111112
use async_trait::async_trait;
112113
use uuid::Uuid;
113114

@@ -158,9 +159,12 @@ mod tests {
158159
let tx = Transaction::new(&table);
159160

160161
let updated_tx = action.apply(tx).unwrap();
161-
162162
// There should be one action in the transaction now
163163
assert_eq!(updated_tx.actions.len(), 1);
164+
165+
(*updated_tx.actions[0])
166+
.downcast_ref::<TestAction>()
167+
.expect("TestAction was not applied to Transaction!");
164168
}
165169

166170
#[test]

0 commit comments

Comments
 (0)