diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 703cbd01a..554f386db 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::sync::Arc; use futures::channel::mpsc::Sender; use futures::{SinkExt, TryFutureExt}; +use itertools::Itertools; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -28,11 +30,12 @@ use crate::scan::{ PartitionFilterCache, }; use crate::spec::{ - ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef, - TableMetadataRef, + DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, + ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::{Error, ErrorKind, Result}; +type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync; /// Wraps a [`ManifestFile`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { @@ -45,7 +48,11 @@ pub(crate) struct ManifestFileContext { object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, - delete_file_index: DeleteFileIndex, + delete_file_index: Option, + + /// filter manifest entries. + /// Used for different kind of scans, e.g., only scan newly added files without delete files. + filter_fn: Option>, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -58,7 +65,7 @@ pub(crate) struct ManifestEntryContext { pub bound_predicates: Option>, pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, - pub delete_file_index: DeleteFileIndex, + pub delete_file_index: Option, } impl ManifestFileContext { @@ -74,12 +81,13 @@ impl ManifestFileContext { mut sender, expression_evaluator_cache, delete_file_index, - .. + filter_fn, } = self; + let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true)); let manifest = object_cache.get_manifest(&manifest_file).await?; - for manifest_entry in manifest.entries() { + for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) { let manifest_entry_context = ManifestEntryContext { // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), @@ -105,13 +113,16 @@ impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it pub(crate) async fn into_file_scan_task(self) -> Result { - let deletes = self - .delete_file_index - .get_deletes_for_data_file( - self.manifest_entry.data_file(), - self.manifest_entry.sequence_number(), - ) - .await; + let deletes = if let Some(delete_file_index) = self.delete_file_index { + delete_file_index + .get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ) + .await + } else { + vec![] + }; Ok(FileScanTask { start: 0, @@ -150,6 +161,11 @@ pub(crate) struct PlanContext { pub partition_filter_cache: Arc, pub manifest_evaluator_cache: Arc, pub expression_evaluator_cache: Arc, + + // for incremental scan. + // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`. + pub from_snapshot_id: Option, + pub to_snapshot_id: Option, } impl PlanContext { @@ -181,23 +197,89 @@ impl PlanContext { Ok(partition_filter) } - pub(crate) fn build_manifest_file_contexts( + pub(crate) async fn build_manifest_file_contexts( &self, - manifest_list: Arc, tx_data: Sender, - delete_file_idx: DeleteFileIndex, - delete_file_tx: Sender, + delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender)>, ) -> Result> + 'static>> { - let manifest_files = manifest_list.entries().iter(); + let mut filter_fn: Option> = None; + let manifest_files = { + if let Some(to_snapshot_id) = self.to_snapshot_id { + // Incremental scan mode: + // Get all added files between two snapshots. + // - data files in `Append` and `Overwrite` snapshots are included. + // - delete files are ignored + // - `Replace` snapshots (e.g., compaction) are ignored. + // + // `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive. + + // prevent misuse + assert!( + delete_file_idx_and_tx.is_none(), + "delete file is not supported in incremental scan mode" + ); + + let snapshots = + ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id) + .filter(|snapshot| { + matches!( + snapshot.summary().operation, + Operation::Append | Operation::Overwrite + ) + }) + .collect_vec(); + let snapshot_ids: HashSet = snapshots + .iter() + .map(|snapshot| snapshot.snapshot_id()) + .collect(); + + let mut manifest_files = vec![]; + for snapshot in snapshots { + let manifest_list = self + .object_cache + .get_manifest_list(&snapshot, &self.table_metadata) + .await?; + for entry in manifest_list.entries() { + if !snapshot_ids.contains(&entry.added_snapshot_id) { + continue; + } + manifest_files.push(entry.clone()); + } + } + + filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| { + matches!(entry.status(), ManifestStatus::Added) + && matches!(entry.data_file().content_type(), DataContentType::Data) + && ( + entry + .snapshot_id() + .map(|id| snapshot_ids.contains(&id)) + .unwrap_or(true) + // Include entries without snapshot_id + ) + })); + + manifest_files + } else { + let manifest_list = self.get_manifest_list().await?; + manifest_list.entries().to_vec() + } + }; // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; - for manifest_file in manifest_files { - let tx = if manifest_file.content == ManifestContentType::Deletes { - delete_file_tx.clone() + for manifest_file in &manifest_files { + let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes { + let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else { + continue; + }; + (Some(delete_file_idx.clone()), tx.clone()) } else { - tx_data.clone() + ( + delete_file_idx_and_tx.as_ref().map(|(idx, _)| idx.clone()), + tx_data.clone(), + ) }; let partition_bound_predicate = if self.predicate.is_some() { @@ -225,7 +307,8 @@ impl PlanContext { manifest_file, partition_bound_predicate, tx, - delete_file_idx.clone(), + delete_file_idx, + filter_fn.clone(), ); filtered_mfcs.push(Ok(mfc)); @@ -239,7 +322,8 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, - delete_file_index: DeleteFileIndex, + delete_file_index: Option, + filter_fn: Option>, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -262,6 +346,61 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, + filter_fn, } } } + +struct Ancestors { + next: Option, + get_snapshot: Box Option + Send>, +} + +impl Iterator for Ancestors { + type Item = SnapshotRef; + + fn next(&mut self) -> Option { + let snapshot = self.next.take()?; + let result = snapshot.clone(); + self.next = snapshot + .parent_snapshot_id() + .and_then(|id| (self.get_snapshot)(id)); + Some(result) + } +} + +/// Iterate starting from `snapshot` (inclusive) to the root snapshot. +fn ancestors_of( + table_metadata: &TableMetadataRef, + snapshot: i64, +) -> Box + Send> { + if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) { + let table_metadata = table_metadata.clone(); + Box::new(Ancestors { + next: Some(snapshot.clone()), + get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()), + }) + } else { + Box::new(std::iter::empty()) + } +} + +/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive). +fn ancestors_between( + table_metadata: &TableMetadataRef, + latest_snapshot_id: i64, + oldest_snapshot_id: Option, +) -> Box + Send> { + let Some(oldest_snapshot_id) = oldest_snapshot_id else { + return Box::new(ancestors_of(table_metadata, latest_snapshot_id)); + }; + + if latest_snapshot_id == oldest_snapshot_id { + return Box::new(std::iter::empty()); + } + + Box::new( + ancestors_of(table_metadata, latest_snapshot_id) + .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id), + ) +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 27dc1af98..251ef5166 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -51,6 +51,10 @@ pub struct TableScanBuilder<'a> { // Defaults to none which means select all columns column_names: Option>, snapshot_id: Option, + /// Exclusive. Used for incremental scan. + from_snapshot_id: Option, + /// Inclusive. Used for incremental scan. + to_snapshot_id: Option, batch_size: Option, case_sensitive: bool, filter: Option, @@ -69,6 +73,8 @@ impl<'a> TableScanBuilder<'a> { table, column_names: None, snapshot_id: None, + from_snapshot_id: None, + to_snapshot_id: None, batch_size: None, case_sensitive: true, filter: None, @@ -130,6 +136,24 @@ impl<'a> TableScanBuilder<'a> { self } + /// Set the starting snapshot id (exclusive) for incremental scan. + /// + /// # Behavior + /// - Only includes files from Append and Overwrite operations + /// - Excludes Replace operations (e.g., compaction) + /// - Only returns Added manifest entries with Data content + /// - Delete files are not supported in incremental scans + pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self { + self.from_snapshot_id = Some(from_snapshot_id); + self + } + + /// Set the ending snapshot id (inclusive) for incremental scan (See [`Self::from_snapshot_id`]). + pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self { + self.to_snapshot_id = Some(to_snapshot_id); + self + } + /// Sets the concurrency limit for both manifest files and manifest /// entries for this scan pub fn with_concurrency_limit(mut self, limit: usize) -> Self { @@ -185,6 +209,59 @@ impl<'a> TableScanBuilder<'a> { /// Build the table scan. pub fn build(self) -> Result { + // Validate that we have either a snapshot scan or an incremental scan configuration + if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() { + // For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional. + if self.to_snapshot_id.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Incremental scan requires to_snapshot_id to be set. Use from_snapshot_id (exclusive) and to_snapshot_id (inclusive) to specify the range.", + )); + } + + // snapshot_id should not be set for incremental scan + if self.snapshot_id.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + "snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.", + )); + } + + let to_snapshot_id = self.to_snapshot_id.unwrap(); + + // Validate to_snapshot_id exists + if self + .table + .metadata() + .snapshot_by_id(to_snapshot_id) + .is_none() + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("to_snapshot_id {} not found", to_snapshot_id), + )); + } + + // Validate from_snapshot_id if provided + if let Some(from_id) = self.from_snapshot_id { + // Validate from_snapshot_id exists + if self.table.metadata().snapshot_by_id(from_id).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("from_snapshot_id {} not found", from_id), + )); + } + + // Validate snapshot order + if to_snapshot_id <= from_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "to_snapshot_id must be greater than from_snapshot_id", + )); + } + } + } + let snapshot = match self.snapshot_id { Some(snapshot_id) => self .table @@ -214,7 +291,6 @@ impl<'a> TableScanBuilder<'a> { current_snapshot_id.clone() } }; - let schema = snapshot.schema(self.table.metadata())?; // Check that all column names exist in the schema. @@ -284,6 +360,8 @@ impl<'a> TableScanBuilder<'a> { snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new), object_cache: self.table.object_cache(), field_ids: Arc::new(field_ids), + from_snapshot_id: self.from_snapshot_id, + to_snapshot_id: self.to_snapshot_id, partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), @@ -348,19 +426,25 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); - - let manifest_list = plan_context.get_manifest_list().await?; + // For incremental scan, disable delete file processing + let delete_file_idx_and_tx = if plan_context.to_snapshot_id.is_some() { + None + } else { + let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + Some((delete_file_idx.clone(), delete_file_tx.clone())) + }; // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any // whose partitions cannot match this // scan's filter - let manifest_file_contexts = plan_context.build_manifest_file_contexts( - manifest_list, - manifest_entry_data_ctx_tx, - delete_file_idx.clone(), - manifest_entry_delete_ctx_tx, - )?; + let manifest_file_contexts = plan_context + .build_manifest_file_contexts( + manifest_entry_data_ctx_tx, + delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| { + (delete_file_idx.clone(), manifest_entry_delete_ctx_tx) + }), + ) + .await?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -377,32 +461,34 @@ impl TableScan { } }); - let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - - // Process the delete file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_delete_ctx_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; - - if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) + if let Some((_, delete_file_tx)) = delete_file_idx_and_tx { + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + // Process the delete file [`ManifestEntry`] stream in parallel + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(manifest_entry_context, tx) + .await + }) + .await + }, + ) .await; - } - }) - .await; + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }); + } + + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); // Process the data file [`ManifestEntry`] stream in parallel spawn(async move { let result = manifest_entry_data_ctx_rx @@ -1804,4 +1890,228 @@ pub mod tests { }; test_fn(task); } + + #[tokio::test] + async fn test_incremental_scan() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Get the two snapshots in the table + let snapshots = fixture.table.metadata().snapshots().collect::>(); + + assert_eq!(snapshots.len(), 2, "Test fixture should have two snapshots"); + + // Determine the correct order by snapshot IDs (since validation requires to_snapshot_id > from_snapshot_id) + // Sort snapshots by snapshot_id to ensure consistent ordering + let mut snapshot_ids: Vec = snapshots.iter().map(|s| s.snapshot_id()).collect(); + snapshot_ids.sort(); + + let first_snapshot_id = snapshot_ids[0]; + let second_snapshot_id = snapshot_ids[1]; + + // Create an incremental scan from first to second snapshot + let table_scan = fixture + .table + .scan() + .from_snapshot_id(first_snapshot_id) + .to_snapshot_id(second_snapshot_id) + .build() + .unwrap(); + + // Plan files and verify we get the expected files + let tasks = table_scan + .plan_files() + .await + .unwrap() + .try_fold(vec![], |mut acc, task| async move { + acc.push(task); + Ok(acc) + }) + .await + .unwrap(); + + // Only files added between first and second snapshot should be included + // The way our test fixture is set up, the added files should be in the second snapshot + // For our test fixture, only one file should be returned by incremental scan + assert_eq!( + tasks.len(), + 1, + "Incremental scan should return only added files between snapshots" + ); + + // Verify this is the expected file (file 1.parquet which was added in the second snapshot) + assert_eq!( + tasks[0].data_file_path, + format!("{}/1.parquet", &fixture.table_location), + "Incremental scan should return the file added in the second snapshot" + ); + + // Verify we can read the data + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify data contents from 1.parquet + assert_eq!(batches.len(), 1, "Should have one record batch"); + assert_eq!(batches[0].num_rows(), 1024, "Should have 1024 rows"); + + // Verify content of some columns + let col_x = batches[0].column_by_name("x").unwrap(); + let int64_arr = col_x.as_any().downcast_ref::().unwrap(); + assert_eq!( + int64_arr.value(0), + 1, + "First value of column 'x' should be 1" + ); + + let col_a = batches[0].column_by_name("a").unwrap(); + let string_arr = col_a.as_any().downcast_ref::().unwrap(); + assert!( + string_arr.value(0) == "Apache" || string_arr.value(0) == "Iceberg", + "First value of column 'a' should be either 'Apache' or 'Iceberg'" + ); + } + + #[test] + fn test_invalid_incremental_scan_configurations() { + let table = TableTestFixture::new().table; + + // Test case 1: to_snapshot_id is required for incremental scan + let result = table + .scan() + .from_snapshot_id(1234) // Only providing from_snapshot_id + .build(); + + assert!( + result.is_err(), + "Should fail when to_snapshot_id is not set" + ); + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => Incremental scan requires to_snapshot_id to be set. Use from_snapshot_id (exclusive) and to_snapshot_id (inclusive) to specify the range.", + "Should have correct error message for missing to_snapshot_id" + ); + + // Test case 2: snapshot_id should not be set with incremental scan + let result = table + .scan() + .snapshot_id(1234) + .from_snapshot_id(1234) + .to_snapshot_id(5678) + .build(); + + assert!( + result.is_err(), + "Should fail when snapshot_id is set with incremental scan" + ); + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.", + "Should have correct error message for setting both snapshot_id and incremental scan parameters" + ); + } + + #[test] + fn test_incremental_scan_edge_cases() { + let fixture = TableTestFixture::new(); + let table = &fixture.table; + + // Test case 1: Non-existent to_snapshot_id + let result = table + .scan() + .to_snapshot_id(999999) // Non-existent snapshot ID + .build(); + + assert!( + result.is_err(), + "Should fail when to_snapshot_id does not exist" + ); + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => to_snapshot_id 999999 not found", + "Should have correct error message for non-existent to_snapshot_id" + ); + + // Test case 2: Non-existent from_snapshot_id + let result = table + .scan() + .from_snapshot_id(999998) // Non-existent snapshot ID + .to_snapshot_id(999999) // Non-existent snapshot ID + .build(); + + assert!( + result.is_err(), + "Should fail when to_snapshot_id does not exist" + ); + // This should fail on to_snapshot_id first since that's checked first + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => to_snapshot_id 999999 not found", + "Should fail on to_snapshot_id check first" + ); + + // We need to set up snapshots for the remaining tests + let snapshots = table.metadata().snapshots().collect::>(); + assert_eq!(snapshots.len(), 2, "Test fixture should have two snapshots"); + + // Sort snapshots by snapshot_id to ensure consistent ordering + let mut snapshot_ids: Vec = snapshots.iter().map(|s| s.snapshot_id()).collect(); + snapshot_ids.sort(); + + let first_snapshot_id = snapshot_ids[0]; + let second_snapshot_id = snapshot_ids[1]; + + // Test case 3: from_snapshot_id doesn't exist but to_snapshot_id does + let result = table + .scan() + .from_snapshot_id(999998) // Non-existent + .to_snapshot_id(second_snapshot_id) // Existent + .build(); + + assert!( + result.is_err(), + "Should fail when from_snapshot_id does not exist" + ); + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => from_snapshot_id 999998 not found", + "Should have correct error message for non-existent from_snapshot_id" + ); + + // Determine which snapshot is newer based on snapshot IDs + let (older_snapshot_id, newer_snapshot_id) = (first_snapshot_id, second_snapshot_id); + + // Test case 4: Reversed snapshot order (to_snapshot_id <= from_snapshot_id) + let result = table + .scan() + .from_snapshot_id(newer_snapshot_id) // Later snapshot + .to_snapshot_id(older_snapshot_id) // Earlier snapshot + .build(); + + assert!( + result.is_err(), + "Should fail when to_snapshot_id <= from_snapshot_id" + ); + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => to_snapshot_id must be greater than from_snapshot_id", + "Should have correct error message for reversed snapshot order" + ); + + // Test case 5: Equal snapshot IDs (empty range) + let result = table + .scan() + .from_snapshot_id(older_snapshot_id) + .to_snapshot_id(older_snapshot_id) + .build(); + + assert!( + result.is_err(), + "Should fail when from_snapshot_id == to_snapshot_id" + ); + assert_eq!( + result.unwrap_err().to_string(), + "DataInvalid => to_snapshot_id must be greater than from_snapshot_id", + "Should have correct error message for equal snapshot IDs" + ); + } }