Skip to content

Commit bddffa1

Browse files
ErigaraRoman Shanin
andauthored
fix(iceberg): add_files correctly check duplicates (#1395)
## Which issue does this PR close? - Closes #1394. ## What changes are included in this PR? - compare duplicates by loading manifest files and taking `file_path` from it - use direct calls instead of `scan` ## Are these changes tested? - work for me local experiments - fixed existing tests - added a new test to showcase behavior Co-authored-by: Roman Shanin <rshanin@bhft.com>
1 parent d8347c7 commit bddffa1

File tree

2 files changed

+31
-109
lines changed

2 files changed

+31
-109
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 31 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
use std::collections::{HashMap, HashSet};
1919

20-
use arrow_array::StringArray;
21-
use futures::TryStreamExt;
2220
use uuid::Uuid;
2321

2422
use crate::error::Result;
@@ -129,32 +127,19 @@ impl<'a> FastAppendAction<'a> {
129127
.map(|df| df.file_path.as_str())
130128
.collect();
131129

132-
let mut manifest_stream = self
133-
.snapshot_produce_action
134-
.tx
135-
.current_table
136-
.inspect()
137-
.manifests()
138-
.scan()
139-
.await?;
140130
let mut referenced_files = Vec::new();
141-
142-
while let Some(batch) = manifest_stream.try_next().await? {
143-
let file_path_array = batch
144-
.column(1)
145-
.as_any()
146-
.downcast_ref::<StringArray>()
147-
.ok_or_else(|| {
148-
Error::new(
149-
ErrorKind::DataInvalid,
150-
"Failed to downcast file_path column to StringArray",
151-
)
152-
})?;
153-
154-
for i in 0..batch.num_rows() {
155-
let file_path = file_path_array.value(i);
156-
if new_files.contains(file_path) {
157-
referenced_files.push(file_path.to_string());
131+
let table = &self.snapshot_produce_action.tx.current_table;
132+
if let Some(current_snapshot) = table.metadata().current_snapshot() {
133+
let manifest_list = current_snapshot
134+
.load_manifest_list(table.file_io(), &table.metadata_ref())
135+
.await?;
136+
for manifest_list_entry in manifest_list.entries() {
137+
let manifest = manifest_list_entry.load_manifest(table.file_io()).await?;
138+
for entry in manifest.entries() {
139+
let file_path = entry.file_path();
140+
if new_files.contains(file_path) && entry.is_alive() {
141+
referenced_files.push(file_path.to_string());
142+
}
158143
}
159144
}
160145
}
@@ -364,81 +349,39 @@ mod tests {
364349
}
365350

366351
#[tokio::test]
367-
async fn test_add_existing_parquet_files_to_unpartitioned_table() {
352+
async fn test_add_duplicated_parquet_files_to_unpartitioned_table() {
368353
let mut fixture = TableTestFixture::new_unpartitioned();
369354
fixture.setup_unpartitioned_manifest_files().await;
370355
let tx = crate::transaction::Transaction::new(&fixture.table);
371356

372357
let file_paths = vec![
373358
format!("{}/1.parquet", &fixture.table_location),
374-
format!("{}/2.parquet", &fixture.table_location),
375359
format!("{}/3.parquet", &fixture.table_location),
376360
];
377361

378362
let fast_append_action = tx.fast_append(None, vec![]).unwrap();
379363

380-
// Attempt to add the existing Parquet files with fast append.
381-
let new_tx = fast_append_action
382-
.add_parquet_files(file_paths.clone())
383-
.await
384-
.expect("Adding existing Parquet files should succeed");
385-
386-
let mut found_add_snapshot = false;
387-
let mut found_set_snapshot_ref = false;
388-
for update in new_tx.updates.iter() {
389-
match update {
390-
TableUpdate::AddSnapshot { .. } => {
391-
found_add_snapshot = true;
392-
}
393-
TableUpdate::SetSnapshotRef {
394-
ref_name,
395-
reference,
396-
} => {
397-
found_set_snapshot_ref = true;
398-
assert_eq!(ref_name, MAIN_BRANCH);
399-
assert!(reference.snapshot_id > 0);
400-
}
401-
_ => {}
402-
}
403-
}
404-
assert!(found_add_snapshot);
405-
assert!(found_set_snapshot_ref);
406-
407-
let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
408-
snapshot
409-
} else {
410-
panic!("Expected the first update to be an AddSnapshot update");
411-
};
412-
413-
let manifest_list = new_snapshot
414-
.load_manifest_list(fixture.table.file_io(), fixture.table.metadata())
415-
.await
416-
.expect("Failed to load manifest list");
417-
418-
assert_eq!(
419-
manifest_list.entries().len(),
420-
2,
421-
"Expected 2 manifest list entries, got {}",
422-
manifest_list.entries().len()
364+
// Attempt to add duplicated Parquet files with fast append.
365+
assert!(
366+
fast_append_action
367+
.add_parquet_files(file_paths.clone())
368+
.await
369+
.is_err(),
370+
"file already in table"
423371
);
424372

425-
// Load the manifest from the manifest list
426-
let manifest = manifest_list.entries()[0]
427-
.load_manifest(fixture.table.file_io())
428-
.await
429-
.expect("Failed to load manifest");
373+
let file_paths = vec![format!("{}/2.parquet", &fixture.table_location)];
430374

431-
// Check that the manifest contains three entries.
432-
assert_eq!(manifest.entries().len(), 3);
375+
let tx = crate::transaction::Transaction::new(&fixture.table);
376+
let fast_append_action = tx.fast_append(None, vec![]).unwrap();
433377

434-
// Verify each file path appears in manifest.
435-
let manifest_paths: Vec<String> = manifest
436-
.entries()
437-
.iter()
438-
.map(|entry| entry.data_file().file_path.clone())
439-
.collect();
440-
for path in file_paths {
441-
assert!(manifest_paths.contains(&path));
442-
}
378+
// Attempt to add Parquet file which was deleted from table.
379+
assert!(
380+
fast_append_action
381+
.add_parquet_files(file_paths.clone())
382+
.await
383+
.is_ok(),
384+
"file not in table"
385+
);
443386
}
444387
}

crates/integration_tests/tests/shared_tests/append_data_file_test.rs

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -129,25 +129,4 @@ async fn test_append_data_file() {
129129
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
130130
assert_eq!(batches.len(), 1);
131131
assert_eq!(batches[0], batch);
132-
133-
// commit result again
134-
let tx = Transaction::new(&table);
135-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
136-
append_action.add_data_files(data_file.clone()).unwrap();
137-
let tx = append_action.apply().await.unwrap();
138-
let table = tx.commit(&rest_catalog).await.unwrap();
139-
140-
// check result again
141-
let batch_stream = table
142-
.scan()
143-
.select_all()
144-
.build()
145-
.unwrap()
146-
.to_arrow()
147-
.await
148-
.unwrap();
149-
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
150-
assert_eq!(batches.len(), 2);
151-
assert_eq!(batches[0], batch);
152-
assert_eq!(batches[1], batch);
153132
}

0 commit comments

Comments
 (0)