Skip to content

Commit d30ff23

Browse files
author
ZENOTME
committed
refine validate_added_data_files && validate_duplicate_files
1 parent fd1ac4f commit d30ff23

File tree

2 files changed

+19
-18
lines changed

2 files changed

+19
-18
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,6 @@ impl FastAppendAction {
8484
#[async_trait]
8585
impl TransactionAction for FastAppendAction {
8686
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
87-
// validate added files
88-
SnapshotProducer::validate_added_data_files(table, &self.added_data_files)?;
89-
90-
// Checks duplicate files
91-
if self.check_duplicate {
92-
SnapshotProducer::validate_duplicate_files(table, &self.added_data_files).await?;
93-
}
94-
9587
let snapshot_producer = SnapshotProducer::new(
9688
table,
9789
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
@@ -100,6 +92,16 @@ impl TransactionAction for FastAppendAction {
10092
self.added_data_files.clone(),
10193
);
10294

95+
// validate added files
96+
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
97+
98+
// Checks duplicate files
99+
if self.check_duplicate {
100+
snapshot_producer
101+
.validate_duplicate_files(&self.added_data_files)
102+
.await?;
103+
}
104+
103105
snapshot_producer
104106
.commit(FastAppendOperation, DefaultManifestProcess)
105107
.await

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,7 @@ impl<'a> SnapshotProducer<'a> {
100100
}
101101
}
102102

103-
pub(crate) fn validate_added_data_files(
104-
table: &Table,
105-
added_data_files: &[DataFile],
106-
) -> Result<()> {
103+
pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> {
107104
for data_file in added_data_files {
108105
if data_file.content_type() != crate::spec::DataContentType::Data {
109106
return Err(Error::new(
@@ -112,23 +109,23 @@ impl<'a> SnapshotProducer<'a> {
112109
));
113110
}
114111
// Check if the data file partition spec id matches the table default partition spec id.
115-
if table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
112+
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
116113
return Err(Error::new(
117114
ErrorKind::DataInvalid,
118115
"Data file partition spec id does not match table default partition spec id",
119116
));
120117
}
121118
Self::validate_partition_value(
122119
data_file.partition(),
123-
table.metadata().default_partition_type(),
120+
self.table.metadata().default_partition_type(),
124121
)?;
125122
}
126123

127124
Ok(())
128125
}
129126

130127
pub(crate) async fn validate_duplicate_files(
131-
table: &Table,
128+
&self,
132129
added_data_files: &[DataFile],
133130
) -> Result<()> {
134131
let new_files: HashSet<&str> = added_data_files
@@ -137,12 +134,14 @@ impl<'a> SnapshotProducer<'a> {
137134
.collect();
138135

139136
let mut referenced_files = Vec::new();
140-
if let Some(current_snapshot) = table.metadata().current_snapshot() {
137+
if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
141138
let manifest_list = current_snapshot
142-
.load_manifest_list(table.file_io(), &table.metadata_ref())
139+
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
143140
.await?;
144141
for manifest_list_entry in manifest_list.entries() {
145-
let manifest = manifest_list_entry.load_manifest(table.file_io()).await?;
142+
let manifest = manifest_list_entry
143+
.load_manifest(self.table.file_io())
144+
.await?;
146145
for entry in manifest.entries() {
147146
let file_path = entry.file_path();
148147
if new_files.contains(file_path) && entry.is_alive() {

0 commit comments

Comments
 (0)