diff --git a/Cargo.lock b/Cargo.lock index a356bb387..effd90610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -593,6 +593,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1534,7 +1556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -3499,6 +3521,7 @@ dependencies = [ "arrow-string", "as-any", "async-std", + "async-stream", "async-trait", "base64 0.22.1", "bimap", @@ -4202,7 +4225,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -7718,7 +7741,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fd83fd55c..d813742bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ arrow-select = { version = "55" } arrow-string = { version = "55" } as-any = "0.3.2" async-std = "1.12" +async-stream = "0.3" async-trait = "0.1.88" aws-config = "1.6.1" aws-sdk-glue = "1.39" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 52f5b4d8b..c20179718 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -56,6 +56,7 @@ arrow-select = { workspace = true } arrow-string = { workspace = true } as-any = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } +async-stream = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } bimap = { workspace = true } diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index d8f7a872e..5d5d48055 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -16,131 +16,45 @@ // under the License. use std::collections::HashMap; -use std::ops::Deref; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; -use futures::StreamExt; -use futures::channel::mpsc::{Sender, channel}; -use tokio::sync::Notify; - -use crate::runtime::spawn; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; /// Index of delete files -#[derive(Debug, Clone)] +#[derive(Debug, Default)] pub(crate) struct DeleteFileIndex { - state: Arc>, -} - -#[derive(Debug)] -enum DeleteFileIndexState { - Populating(Arc), - Populated(PopulatedDeleteFileIndex), -} - -#[derive(Debug)] -struct PopulatedDeleteFileIndex { #[allow(dead_code)] global_deletes: Vec>, eq_deletes_by_partition: HashMap>>, pos_deletes_by_partition: HashMap>>, - // TODO: do we need this? - // pos_deletes_by_path: HashMap>>, - // TODO: Deletion Vector support } -impl DeleteFileIndex { - /// create a new `DeleteFileIndex` along with the sender that populates it with delete files - pub(crate) fn new() -> (DeleteFileIndex, Sender) { - // TODO: what should the channel limit be? - let (tx, rx) = channel(10); - let notify = Arc::new(Notify::new()); - let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating( - notify.clone(), - ))); - let delete_file_stream = rx.boxed(); - - spawn({ - let state = state.clone(); - async move { - let delete_files = delete_file_stream.collect::>().await; - - let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); - - { - let mut guard = state.write().unwrap(); - *guard = DeleteFileIndexState::Populated(populated_delete_file_index); - } - notify.notify_waiters(); - } - }); - - (DeleteFileIndex { state }, tx) - } - - /// Gets all the delete files that apply to the specified data file. - pub(crate) async fn get_deletes_for_data_file( - &self, - data_file: &DataFile, - seq_num: Option, - ) -> Vec { - let notifier = { - let guard = self.state.read().unwrap(); - match *guard { - DeleteFileIndexState::Populating(ref notifier) => notifier.clone(), - DeleteFileIndexState::Populated(ref index) => { - return index.get_deletes_for_data_file(data_file, seq_num); - } - } - }; - - notifier.notified().await; - - let guard = self.state.read().unwrap(); - match guard.deref() { - DeleteFileIndexState::Populated(index) => { - index.get_deletes_for_data_file(data_file, seq_num) - } - _ => unreachable!("Cannot be any other state than loaded"), - } - } -} - -impl PopulatedDeleteFileIndex { - /// Creates a new populated delete file index from a list of delete file contexts, which - /// allows for fast lookup when determining which delete files apply to a given data file. - /// - /// 1. The partition information is extracted from each delete file's manifest entry. - /// 2. If the partition is empty and the delete file is not a positional delete, - /// it is added to the `global_deletes` vector - /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type. - fn new(files: Vec) -> PopulatedDeleteFileIndex { - let mut eq_deletes_by_partition: HashMap>> = - HashMap::default(); - let mut pos_deletes_by_partition: HashMap>> = - HashMap::default(); - - let mut global_deletes: Vec> = vec![]; - - files.into_iter().for_each(|ctx| { +impl Extend for DeleteFileIndex { + fn extend>(&mut self, iter: T) { + // 1. The partition information is extracted from each delete file's manifest entry. + // 2. If the partition is empty and the delete file is not a positional delete, + // it is added to the `global_deletes` vector + // 3. Otherwise, the delete file is added to one of two hash maps based on its content type. + for ctx in iter { let arc_ctx = Arc::new(ctx); let partition = arc_ctx.manifest_entry.data_file().partition(); - // The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes". + // The spec states that "Equality delete files stored with an unpartitioned spec + // are applied as global deletes". if partition.fields().is_empty() { // TODO: confirm we're good to skip here if we encounter a pos del if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes { - global_deletes.push(arc_ctx); - return; + self.global_deletes.push(arc_ctx); + continue; } } let destination_map = match arc_ctx.manifest_entry.content_type() { - DataContentType::PositionDeletes => &mut pos_deletes_by_partition, - DataContentType::EqualityDeletes => &mut eq_deletes_by_partition, + DataContentType::PositionDeletes => &mut self.pos_deletes_by_partition, + DataContentType::EqualityDeletes => &mut self.eq_deletes_by_partition, _ => unreachable!(), }; @@ -150,17 +64,13 @@ impl PopulatedDeleteFileIndex { entry.push(arc_ctx.clone()); }) .or_insert(vec![arc_ctx.clone()]); - }); - - PopulatedDeleteFileIndex { - global_deletes, - eq_deletes_by_partition, - pos_deletes_by_partition, } } +} +impl DeleteFileIndex { /// Determine all the delete files that apply to the provided `DataFile`. - fn get_deletes_for_data_file( + pub(crate) fn get_deletes_for_data_file( &self, data_file: &DataFile, seq_num: Option, diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 1e4ef41b2..26b4c0241 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -17,8 +17,8 @@ use std::sync::Arc; -use futures::channel::mpsc::Sender; -use futures::{SinkExt, TryFutureExt}; +use futures::StreamExt; +use futures::stream::BoxStream; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; @@ -37,81 +37,73 @@ use crate::{Error, ErrorKind, Result}; /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { manifest_file: ManifestFile, - - sender: Sender, - field_ids: Arc>, bound_predicates: Option>, object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, - delete_file_index: DeleteFileIndex, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestEntryContext { pub manifest_entry: ManifestEntryRef, - pub expression_evaluator_cache: Arc, pub field_ids: Arc>, pub bound_predicates: Option>, pub partition_spec_id: i32, pub snapshot_schema: SchemaRef, - pub delete_file_index: DeleteFileIndex, } impl ManifestFileContext { /// Consumes this [`ManifestFileContext`], fetching its Manifest from FileIO and then - /// streaming its constituent [`ManifestEntries`] to the channel provided in the context - pub(crate) async fn fetch_manifest_and_stream_manifest_entries(self) -> Result<()> { + /// streaming its constituent [`ManifestEntries`] + pub(crate) async fn fetch_manifest_and_stream_entries( + self, + ) -> Result>> { let ManifestFileContext { object_cache, manifest_file, bound_predicates, snapshot_schema, field_ids, - mut sender, expression_evaluator_cache, - delete_file_index, .. } = self; let manifest = object_cache.get_manifest(&manifest_file).await?; - for manifest_entry in manifest.entries() { - let manifest_entry_context = ManifestEntryContext { - // TODO: refactor to avoid the expensive ManifestEntry clone - manifest_entry: manifest_entry.clone(), - expression_evaluator_cache: expression_evaluator_cache.clone(), - field_ids: field_ids.clone(), - partition_spec_id: manifest_file.partition_spec_id, - bound_predicates: bound_predicates.clone(), - snapshot_schema: snapshot_schema.clone(), - delete_file_index: delete_file_index.clone(), - }; - - sender - .send(manifest_entry_context) - .map_err(|_| Error::new(ErrorKind::Unexpected, "mpsc channel SendError")) - .await?; + Ok(async_stream::stream! { + for manifest_entry in manifest.entries() { + yield Ok(ManifestEntryContext { + manifest_entry: manifest_entry.clone(), + expression_evaluator_cache: expression_evaluator_cache.clone(), + field_ids: field_ids.clone(), + partition_spec_id: manifest_file.partition_spec_id, + bound_predicates: bound_predicates.clone(), + snapshot_schema: snapshot_schema.clone(), + }); + } } + .boxed()) + } - Ok(()) + pub(crate) fn is_delete(&self) -> bool { + self.manifest_file.content == ManifestContentType::Deletes } } impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it - pub(crate) async fn into_file_scan_task(self) -> Result { - let deletes = self - .delete_file_index - .get_deletes_for_data_file( - self.manifest_entry.data_file(), - self.manifest_entry.sequence_number(), - ) - .await; + pub(crate) fn into_file_scan_task( + self, + delete_file_index: Arc, + ) -> Result { + let deletes = delete_file_index.get_deletes_for_data_file( + self.manifest_entry.data_file(), + self.manifest_entry.sequence_number(), + ); Ok(FileScanTask { start: 0, @@ -134,7 +126,7 @@ impl ManifestEntryContext { /// PlanContext wraps a [`SnapshotRef`] alongside all the other /// objects that are required to perform a scan file plan. -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) struct PlanContext { pub snapshot: SnapshotRef, @@ -180,66 +172,46 @@ impl PlanContext { Ok(partition_filter) } - pub(crate) fn build_manifest_file_contexts( + pub(crate) fn build_manifest_file_context_iter( &self, manifest_list: Arc, - tx_data: Sender, - delete_file_idx: DeleteFileIndex, - delete_file_tx: Sender, - ) -> Result> + 'static>> { - let manifest_files = manifest_list.entries().iter(); - - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. - let mut filtered_mfcs = vec![]; - - for manifest_file in manifest_files { - let tx = if manifest_file.content == ManifestContentType::Deletes { - delete_file_tx.clone() - } else { - tx_data.clone() - }; - - let partition_bound_predicate = if self.predicate.is_some() { - let partition_bound_predicate = self.get_partition_filter(manifest_file)?; - - // evaluate the ManifestFile against the partition filter. Skip - // if it cannot contain any matching rows - if !self - .manifest_evaluator_cache - .get( - manifest_file.partition_spec_id, - partition_bound_predicate.clone(), - ) - .eval(manifest_file)? - { - continue; - } - - Some(partition_bound_predicate) - } else { - None - }; - - let mfc = self.create_manifest_file_context( - manifest_file, - partition_bound_predicate, - tx, - delete_file_idx.clone(), - ); - - filtered_mfcs.push(Ok(mfc)); - } - - Ok(Box::new(filtered_mfcs.into_iter())) + ) -> impl Iterator> { + let has_predicate = self.predicate.is_some(); + + (0..manifest_list.entries().len()) + .map(move |i| manifest_list.entries()[i].clone()) + .filter_map(move |manifest_file| { + // TODO: replace closure when `try_blocks` stabilizes + (|| { + let partition_bound_predicate = if has_predicate { + let predicate = self.get_partition_filter(&manifest_file)?; + + if !self + .manifest_evaluator_cache + .get(manifest_file.partition_spec_id, predicate.clone()) + .eval(&manifest_file)? + { + return Ok(None); // Skip this file. + } + Some(predicate) + } else { + None + }; + + let context = self + .create_manifest_file_context(manifest_file, partition_bound_predicate)?; + + Ok(Some(context)) + })() + .transpose() + }) } fn create_manifest_file_context( &self, - manifest_file: &ManifestFile, + manifest_file: ManifestFile, partition_filter: Option>, - sender: Sender, - delete_file_index: DeleteFileIndex, - ) -> ManifestFileContext { + ) -> Result { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = (partition_filter, &self.snapshot_bound_predicate) @@ -252,15 +224,13 @@ impl PlanContext { None }; - ManifestFileContext { - manifest_file: manifest_file.clone(), + Ok(ManifestFileContext { + manifest_file, bound_predicates, - sender, object_cache: self.object_cache.clone(), snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), - delete_file_index, - } + }) } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e987de859..43cc5b635 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -26,9 +26,8 @@ mod task; use std::sync::Arc; use arrow_array::RecordBatch; -use futures::channel::mpsc::{Sender, channel}; use futures::stream::BoxStream; -use futures::{SinkExt, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; pub use task::*; use crate::arrow::ArrowReaderBuilder; @@ -36,7 +35,6 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; @@ -336,94 +334,33 @@ impl TableScan { return Ok(Box::pin(futures::stream::empty())); }; - let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; - let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; - - // used to stream ManifestEntryContexts between stages of the file plan operation - let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = - channel(concurrency_limit_manifest_files); - let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = - channel(concurrency_limit_manifest_files); - - // used to stream the results back to the caller - let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - - let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); - let manifest_list = plan_context.get_manifest_list().await?; - // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any - // whose partitions cannot match this - // scan's filter - let manifest_file_contexts = plan_context.build_manifest_file_contexts( - manifest_list, - manifest_entry_data_ctx_tx, - delete_file_idx.clone(), - manifest_entry_delete_ctx_tx, - )?; - - let mut channel_for_manifest_error = file_scan_task_tx.clone(); - - // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s - spawn(async move { - let result = futures::stream::iter(manifest_file_contexts) - .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { - ctx.fetch_manifest_and_stream_manifest_entries().await - }) - .await; - - if let Err(error) = result { - let _ = channel_for_manifest_error.send(Err(error)).await; - } - }); - - let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - - // Process the delete file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_delete_ctx_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; - - if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) - .await; - } - }) - .await; - - // Process the data file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_data_ctx_rx - .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_data_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; - - if let Err(error) = result { - let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; - } - }); - - Ok(file_scan_task_rx.boxed()) + let (delete_contexts, data_contexts): (Vec<_>, Vec<_>) = plan_context + .build_manifest_file_context_iter(manifest_list) + .partition(|ctx| ctx.as_ref().map_or(true, |ctx| ctx.is_delete())); + + let delete_file_index: DeleteFileIndex = TableScan::process_manifest_contexts( + delete_contexts, + self.concurrency_limit_manifest_files, + self.concurrency_limit_manifest_entries, + |ctx| async move { Self::process_delete_manifest_entry(ctx) }, + ) + .try_collect() + .await?; + + let delete_file_index = Arc::new(delete_file_index); + + Ok(TableScan::process_manifest_contexts( + data_contexts, + self.concurrency_limit_manifest_files, + self.concurrency_limit_manifest_entries, + move |ctx| { + let delete_file_index = delete_file_index.clone(); + async move { Self::process_data_manifest_entry(ctx, delete_file_index) } + }, + ) + .boxed()) } /// Returns an [`ArrowRecordBatchStream`]. @@ -453,13 +390,44 @@ impl TableScan { self.plan_context.as_ref().map(|x| &x.snapshot) } - async fn process_data_manifest_entry( - manifest_entry_context: ManifestEntryContext, - mut file_scan_task_tx: Sender>, - ) -> Result<()> { + /// Helper method to process manifest file contexts into a stream of results + fn process_manifest_contexts( + contexts: Vec>, + concurrency_limit_manifest_files: usize, + concurrency_limit_manifest_entries: usize, + processor: F, + ) -> impl Stream> + where + F: Fn(Result) -> Fut + Send + Sync + 'static + Clone, + Fut: Future>> + Send + 'static, + T: Send + 'static, + { + futures::stream::iter(contexts) + .map(|ctx: Result| async move { + match ctx { + Ok(ctx) => ctx.fetch_manifest_and_stream_entries().await, + Err(error) => Err(error), + } + }) + .buffer_unordered(concurrency_limit_manifest_files) + .try_flatten_unordered(None) + .map(move |ctx| { + let processor = processor.clone(); + async move { processor(ctx).await } + }) + .buffer_unordered(concurrency_limit_manifest_entries) + .try_filter_map(|opt_task| async move { Ok(opt_task) }) + } + + fn process_data_manifest_entry( + manifest_entry_context: Result, + delete_file_index: Arc, + ) -> Result> { + let manifest_entry_context = manifest_entry_context?; + // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { - return Ok(()); + return Ok(None); } // abort the plan if we encounter a manifest entry for a delete file @@ -487,7 +455,7 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { - return Ok(()); + return Ok(None); } // skip any data file whose metrics don't match this scan's filter @@ -496,27 +464,26 @@ impl TableScan { manifest_entry_context.manifest_entry.data_file(), false, )? { - return Ok(()); + return Ok(None); } } // congratulations! the manifest entry has made its way through the // entire plan without getting filtered out. Create a corresponding // FileScanTask and push it to the result stream - file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task().await?)) - .await?; - - Ok(()) + Ok(Some( + manifest_entry_context.into_file_scan_task(delete_file_index)?, + )) } - async fn process_delete_manifest_entry( - manifest_entry_context: ManifestEntryContext, - mut delete_file_ctx_tx: Sender, - ) -> Result<()> { + fn process_delete_manifest_entry( + manifest_entry_context: Result, + ) -> Result> { + let manifest_entry_context = manifest_entry_context?; + // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { - return Ok(()); + return Ok(None); } // abort the plan if we encounter a manifest entry that is not for a delete file @@ -539,18 +506,14 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { - return Ok(()); + return Ok(None); } } - delete_file_ctx_tx - .send(DeleteFileContext { - manifest_entry: manifest_entry_context.manifest_entry.clone(), - partition_spec_id: manifest_entry_context.partition_spec_id, - }) - .await?; - - Ok(()) + Ok(Some(DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), + partition_spec_id: manifest_entry_context.partition_spec_id, + })) } }