Skip to content

Commit f0c5d3d

Browse files
authored
feat(transaction): Implement TransactionAction for ReplaceSortOrderAction (#1441)
## Which issue does this PR close? Related Issues: - #1382 [EPIC] - #1386 - #1387 - #1388 - #1389 ## What changes are included in this PR? - Implement `TransactionAction` trait for `ReplaceSortOrderAction` ## Are these changes tested? Added unit tests
1 parent 1725a3b commit f0c5d3d

File tree

2 files changed

+114
-87
lines changed

2 files changed

+114
-87
lines changed

crates/iceberg/src/transaction/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,8 @@ impl Transaction {
158158
}
159159

160160
/// Creates replace sort order action.
161-
pub fn replace_sort_order(self) -> ReplaceSortOrderAction {
162-
ReplaceSortOrderAction {
163-
tx: self,
164-
sort_fields: vec![],
165-
}
161+
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
162+
ReplaceSortOrderAction::new()
166163
}
167164

168165
/// Set the location of table

crates/iceberg/src/transaction/sort_order.rs

Lines changed: 112 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -15,128 +15,158 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::sync::Arc;
19+
20+
use async_trait::async_trait;
21+
1822
use crate::error::Result;
19-
use crate::spec::{NullOrder, SortDirection, SortField, SortOrder, Transform};
20-
use crate::transaction::Transaction;
23+
use crate::spec::{NullOrder, SchemaRef, SortDirection, SortField, SortOrder, Transform};
24+
use crate::table::Table;
25+
use crate::transaction::{ActionCommit, TransactionAction};
2126
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
2227

28+
/// Represents a sort field whose construction and validation are deferred until commit time.
29+
/// This avoids the need to pass a `Table` reference into methods like `asc` or `desc` when
30+
/// adding sort orders.
31+
#[derive(Debug, PartialEq, Eq, Clone)]
32+
struct PendingSortField {
33+
name: String,
34+
direction: SortDirection,
35+
null_order: NullOrder,
36+
}
37+
38+
impl PendingSortField {
39+
fn to_sort_field(&self, schema: &SchemaRef) -> Result<SortField> {
40+
let field_id = schema.field_id_by_name(self.name.as_str()).ok_or_else(|| {
41+
Error::new(
42+
ErrorKind::DataInvalid,
43+
format!("Cannot find field {} in table schema", self.name),
44+
)
45+
})?;
46+
47+
Ok(SortField::builder()
48+
.source_id(field_id)
49+
.transform(Transform::Identity)
50+
.direction(self.direction)
51+
.null_order(self.null_order)
52+
.build())
53+
}
54+
}
55+
2356
/// Transaction action for replacing sort order.
2457
pub struct ReplaceSortOrderAction {
25-
pub tx: Transaction,
26-
pub sort_fields: Vec<SortField>,
58+
pending_sort_fields: Vec<PendingSortField>,
2759
}
2860

2961
impl ReplaceSortOrderAction {
62+
pub fn new() -> Self {
63+
ReplaceSortOrderAction {
64+
pending_sort_fields: vec![],
65+
}
66+
}
67+
3068
/// Adds a field for sorting in ascending order.
31-
pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
69+
pub fn asc(self, name: &str, null_order: NullOrder) -> Self {
3270
self.add_sort_field(name, SortDirection::Ascending, null_order)
3371
}
3472

3573
/// Adds a field for sorting in descending order.
36-
pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
74+
pub fn desc(self, name: &str, null_order: NullOrder) -> Self {
3775
self.add_sort_field(name, SortDirection::Descending, null_order)
3876
}
3977

40-
/// Finished building the action and apply it to the transaction.
41-
pub fn apply(mut self) -> Result<Transaction> {
42-
let unbound_sort_order = SortOrder::builder()
43-
.with_fields(self.sort_fields)
44-
.build_unbound()?;
78+
fn add_sort_field(
79+
mut self,
80+
name: &str,
81+
sort_direction: SortDirection,
82+
null_order: NullOrder,
83+
) -> Self {
84+
self.pending_sort_fields.push(PendingSortField {
85+
name: name.to_string(),
86+
direction: sort_direction,
87+
null_order,
88+
});
89+
90+
self
91+
}
92+
}
93+
94+
impl Default for ReplaceSortOrderAction {
95+
fn default() -> Self {
96+
Self::new()
97+
}
98+
}
99+
100+
#[async_trait]
101+
impl TransactionAction for ReplaceSortOrderAction {
102+
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
103+
let current_schema = table.metadata().current_schema();
104+
let sort_fields: Result<Vec<SortField>> = self
105+
.pending_sort_fields
106+
.iter()
107+
.map(|p| p.to_sort_field(current_schema))
108+
.collect();
109+
110+
let bound_sort_order = SortOrder::builder()
111+
.with_fields(sort_fields?)
112+
.build(current_schema)?;
45113

46114
let updates = vec![
47115
TableUpdate::AddSortOrder {
48-
sort_order: unbound_sort_order,
116+
sort_order: bound_sort_order,
49117
},
50118
TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
51119
];
52120

53121
let requirements = vec![
54122
TableRequirement::CurrentSchemaIdMatch {
55-
current_schema_id: self
56-
.tx
57-
.current_table
58-
.metadata()
59-
.current_schema()
60-
.schema_id(),
123+
current_schema_id: current_schema.schema_id(),
61124
},
62125
TableRequirement::DefaultSortOrderIdMatch {
63-
default_sort_order_id: self
64-
.tx
65-
.current_table
66-
.metadata()
67-
.default_sort_order()
68-
.order_id,
126+
default_sort_order_id: table.metadata().default_sort_order().order_id,
69127
},
70128
];
71129

72-
self.tx.apply(updates, requirements)?;
73-
74-
Ok(self.tx)
75-
}
76-
77-
fn add_sort_field(
78-
mut self,
79-
name: &str,
80-
sort_direction: SortDirection,
81-
null_order: NullOrder,
82-
) -> Result<Self> {
83-
let field_id = self
84-
.tx
85-
.current_table
86-
.metadata()
87-
.current_schema()
88-
.field_id_by_name(name)
89-
.ok_or_else(|| {
90-
Error::new(
91-
ErrorKind::DataInvalid,
92-
format!("Cannot find field {} in table schema", name),
93-
)
94-
})?;
95-
96-
let sort_field = SortField::builder()
97-
.source_id(field_id)
98-
.transform(Transform::Identity)
99-
.direction(sort_direction)
100-
.null_order(null_order)
101-
.build();
102-
103-
self.sort_fields.push(sort_field);
104-
Ok(self)
130+
Ok(ActionCommit::new(updates, requirements))
105131
}
106132
}
107133

108134
#[cfg(test)]
109135
mod tests {
110-
use crate::transaction::Transaction;
136+
use as_any::Downcast;
137+
138+
use crate::spec::{NullOrder, SortDirection};
139+
use crate::transaction::sort_order::{PendingSortField, ReplaceSortOrderAction};
111140
use crate::transaction::tests::make_v2_table;
112-
use crate::{TableRequirement, TableUpdate};
141+
use crate::transaction::{ApplyTransactionAction, Transaction};
113142

114143
#[test]
115144
fn test_replace_sort_order() {
116145
let table = make_v2_table();
117146
let tx = Transaction::new(&table);
118-
let tx = tx.replace_sort_order().apply().unwrap();
119-
120-
assert_eq!(
121-
vec![
122-
TableUpdate::AddSortOrder {
123-
sort_order: Default::default()
124-
},
125-
TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
126-
],
127-
tx.updates
128-
);
129-
130-
assert_eq!(
131-
vec![
132-
TableRequirement::CurrentSchemaIdMatch {
133-
current_schema_id: 1
134-
},
135-
TableRequirement::DefaultSortOrderIdMatch {
136-
default_sort_order_id: 3
137-
}
138-
],
139-
tx.requirements
140-
);
147+
let replace_sort_order = tx.replace_sort_order();
148+
149+
let tx = replace_sort_order
150+
.asc("x", NullOrder::First)
151+
.desc("y", NullOrder::Last)
152+
.apply(tx)
153+
.unwrap();
154+
155+
let replace_sort_order = (*tx.actions[0])
156+
.downcast_ref::<ReplaceSortOrderAction>()
157+
.unwrap();
158+
159+
assert_eq!(replace_sort_order.pending_sort_fields, vec![
160+
PendingSortField {
161+
name: String::from("x"),
162+
direction: SortDirection::Ascending,
163+
null_order: NullOrder::First,
164+
},
165+
PendingSortField {
166+
name: String::from("y"),
167+
direction: SortDirection::Descending,
168+
null_order: NullOrder::Last,
169+
}
170+
]);
141171
}
142172
}

0 commit comments

Comments
 (0)