Skip to content

Commit cd103fb

Browse files
CTTYliurenjie1024
andauthored
feat(transaction): Make Transaction own base_table (#1421)
## Which issue does this PR close? This is a part of the effort to refactor transaction commit path and enable retry for write operations. Please find the POC here: #1400 Related Issues: - #1382 [EPIC] - #1386 - #1387 - #1388 - #1389 ## What changes are included in this PR? - Make Transaction own base_table instead of a reference, so we can refresh base_table using catalog within the Transaction when retry ## Are these changes tested? Using the existing unit tests --------- Co-authored-by: Renjie Liu <liurenjie2008@gmail.com>
1 parent b10d48e commit cd103fb

File tree

4 files changed

+24
-24
lines changed

4 files changed

+24
-24
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ use crate::writer::file_writer::ParquetWriter;
2929
use crate::{Error, ErrorKind};
3030

3131
/// FastAppendAction is a transaction action for fast append data files to the table.
32-
pub struct FastAppendAction<'a> {
33-
snapshot_produce_action: SnapshotProduceAction<'a>,
32+
pub struct FastAppendAction {
33+
snapshot_produce_action: SnapshotProduceAction,
3434
check_duplicate: bool,
3535
}
3636

37-
impl<'a> FastAppendAction<'a> {
37+
impl FastAppendAction {
3838
#[allow(clippy::too_many_arguments)]
3939
pub(crate) fn new(
40-
tx: Transaction<'a>,
40+
tx: Transaction,
4141
snapshot_id: i64,
4242
commit_uuid: Uuid,
4343
key_metadata: Vec<u8>,
@@ -87,7 +87,7 @@ impl<'a> FastAppendAction<'a> {
8787
/// Specifically, schema compatibility checks and support for adding to partitioned tables
8888
/// have not yet been implemented.
8989
#[allow(dead_code)]
90-
async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction<'a>> {
90+
async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction> {
9191
if !self
9292
.snapshot_produce_action
9393
.tx
@@ -117,7 +117,7 @@ impl<'a> FastAppendAction<'a> {
117117
}
118118

119119
/// Finished building the action and apply it to the transaction.
120-
pub async fn apply(self) -> Result<Transaction<'a>> {
120+
pub async fn apply(self) -> Result<Transaction> {
121121
// Checks duplicate files
122122
if self.check_duplicate {
123123
let new_files: HashSet<&str> = self
@@ -170,14 +170,14 @@ impl SnapshotProduceOperation for FastAppendOperation {
170170

171171
async fn delete_entries(
172172
&self,
173-
_snapshot_produce: &SnapshotProduceAction<'_>,
173+
_snapshot_produce: &SnapshotProduceAction,
174174
) -> Result<Vec<ManifestEntry>> {
175175
Ok(vec![])
176176
}
177177

178178
async fn existing_manifest(
179179
&self,
180-
snapshot_produce: &SnapshotProduceAction<'_>,
180+
snapshot_produce: &SnapshotProduceAction,
181181
) -> Result<Vec<ManifestFile>> {
182182
let Some(snapshot) = snapshot_produce
183183
.tx

crates/iceberg/src/transaction/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,19 @@ use crate::transaction::sort_order::ReplaceSortOrderAction;
3939
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
4040

4141
/// Table transaction.
42-
pub struct Transaction<'a> {
43-
base_table: &'a Table,
42+
pub struct Transaction {
43+
base_table: Table,
4444
current_table: Table,
4545
actions: Vec<BoxedTransactionAction>,
4646
updates: Vec<TableUpdate>,
4747
requirements: Vec<TableRequirement>,
4848
}
4949

50-
impl<'a> Transaction<'a> {
50+
impl Transaction {
5151
/// Creates a new transaction.
52-
pub fn new(table: &'a Table) -> Self {
52+
pub fn new(table: &Table) -> Self {
5353
Self {
54-
base_table: table,
54+
base_table: table.clone(),
5555
current_table: table.clone(),
5656
actions: vec![],
5757
updates: vec![],
@@ -159,7 +159,7 @@ impl<'a> Transaction<'a> {
159159
self,
160160
commit_uuid: Option<Uuid>,
161161
key_metadata: Vec<u8>,
162-
) -> Result<FastAppendAction<'a>> {
162+
) -> Result<FastAppendAction> {
163163
let snapshot_id = self.generate_unique_snapshot_id();
164164
FastAppendAction::new(
165165
self,
@@ -171,7 +171,7 @@ impl<'a> Transaction<'a> {
171171
}
172172

173173
/// Creates replace sort order action.
174-
pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
174+
pub fn replace_sort_order(self) -> ReplaceSortOrderAction {
175175
ReplaceSortOrderAction {
176176
tx: self,
177177
sort_fields: vec![],

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ pub(crate) trait ManifestProcess: Send + Sync {
5959
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
6060
}
6161

62-
pub(crate) struct SnapshotProduceAction<'a> {
63-
pub tx: Transaction<'a>,
62+
pub(crate) struct SnapshotProduceAction {
63+
pub tx: Transaction,
6464
snapshot_id: i64,
6565
key_metadata: Vec<u8>,
6666
commit_uuid: Uuid,
@@ -72,9 +72,9 @@ pub(crate) struct SnapshotProduceAction<'a> {
7272
manifest_counter: RangeFrom<u64>,
7373
}
7474

75-
impl<'a> SnapshotProduceAction<'a> {
75+
impl SnapshotProduceAction {
7676
pub(crate) fn new(
77-
tx: Transaction<'a>,
77+
tx: Transaction,
7878
snapshot_id: i64,
7979
key_metadata: Vec<u8>,
8080
commit_uuid: Uuid,
@@ -310,7 +310,7 @@ impl<'a> SnapshotProduceAction<'a> {
310310
mut self,
311311
snapshot_produce_operation: OP,
312312
process: MP,
313-
) -> Result<Transaction<'a>> {
313+
) -> Result<Transaction> {
314314
let new_manifests = self
315315
.manifest_file(&snapshot_produce_operation, &process)
316316
.await?;

crates/iceberg/src/transaction/sort_order.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ use crate::transaction::Transaction;
2121
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
2222

2323
/// Transaction action for replacing sort order.
24-
pub struct ReplaceSortOrderAction<'a> {
25-
pub tx: Transaction<'a>,
24+
pub struct ReplaceSortOrderAction {
25+
pub tx: Transaction,
2626
pub sort_fields: Vec<SortField>,
2727
}
2828

29-
impl<'a> ReplaceSortOrderAction<'a> {
29+
impl ReplaceSortOrderAction {
3030
/// Adds a field for sorting in ascending order.
3131
pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
3232
self.add_sort_field(name, SortDirection::Ascending, null_order)
@@ -38,7 +38,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
3838
}
3939

4040
/// Finished building the action and apply it to the transaction.
41-
pub fn apply(mut self) -> Result<Transaction<'a>> {
41+
pub fn apply(mut self) -> Result<Transaction> {
4242
let unbound_sort_order = SortOrder::builder()
4343
.with_fields(self.sort_fields)
4444
.build_unbound()?;

0 commit comments

Comments
 (0)