diff --git a/Cargo.lock b/Cargo.lock
index 786b1b726..3ab1495bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3534,6 +3534,7 @@ dependencies = [
  "tera",
  "thrift",
  "tokio",
+ "tracing",
  "typed-builder 0.20.1",
  "url",
  "uuid",
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 52f5b4d8b..e3425efb5 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -90,6 +90,7 @@ typed-builder = { workspace = true }
 url = { workspace = true }
 uuid = { workspace = true }
 zstd = { workspace = true }
+tracing = { workspace = true }
 
 [dev-dependencies]
 ctor = { workspace = true }
diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
index d8f7a872e..535b6dd22 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -21,9 +21,9 @@ use std::sync::{Arc, RwLock};
 
 use futures::StreamExt;
 use futures::channel::mpsc::{Sender, channel};
 use tokio::sync::Notify;
 
-use crate::runtime::spawn;
+use crate::runtime::{JoinHandle, spawn};
 use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
 use crate::spec::{DataContentType, DataFile, Struct};
@@ -51,33 +52,52 @@ struct PopulatedDeleteFileIndex {
     // TODO: Deletion Vector support
 }
 
+#[derive(Debug)]
+pub(crate) struct DeleteIndexMetrics {
+    pub(crate) indexed_delete_files: u32,
+    pub(crate) equality_delete_files: u32,
+    pub(crate) positional_delete_files: u32,
+}
+
 impl DeleteFileIndex {
-    /// create a new `DeleteFileIndex` along with the sender that populates it with delete files
-    pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
+    /// Create a new `DeleteFileIndex` along with the sender that populates it
+    /// with delete files.
+    ///
+    /// It will asynchronously wait for all delete files to come in before it
+    /// starts indexing.
+    pub(crate) fn new() -> (
+        DeleteFileIndex,
+        Sender<DeleteFileContext>,
+        JoinHandle<DeleteIndexMetrics>,
+    ) {
         // TODO: what should the channel limit be?
-        let (tx, rx) = channel(10);
+        let (delete_file_tx, delete_file_rx) = channel(10);
         let notify = Arc::new(Notify::new());
         let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
             notify.clone(),
         )));
-        let delete_file_stream = rx.boxed();
+        let delete_file_stream = delete_file_rx.boxed();
 
-        spawn({
+        let metrics_handle = spawn({
             let state = state.clone();
             async move {
                 let delete_files = delete_file_stream.collect::<Vec<_>>().await;
 
                 let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);
+                let metrics = populated_delete_file_index.metrics();
 
+                {
                     let mut guard = state.write().unwrap();
                     *guard = DeleteFileIndexState::Populated(populated_delete_file_index);
                 }
                 notify.notify_waiters();
+
+                metrics
             }
         });
 
-        (DeleteFileIndex { state }, tx)
+        (DeleteFileIndex { state }, delete_file_tx, metrics_handle)
     }
 
     /// Gets all the delete files that apply to the specified data file.
@@ -207,4 +227,21 @@ impl PopulatedDeleteFileIndex {
 
         results
     }
+
+    fn metrics(&self) -> DeleteIndexMetrics {
+        // We count both partitioned and globally applied equality deletes.
+        let equality_delete_files =
+            flattened_len(&self.eq_deletes_by_partition) + self.global_deletes.len() as u32;
+        let positional_delete_files = flattened_len(&self.pos_deletes_by_partition);
+
+        DeleteIndexMetrics {
+            indexed_delete_files: equality_delete_files + positional_delete_files,
+            equality_delete_files,
+            positional_delete_files,
+        }
+    }
+}
+
+// Sum the per-partition vector lengths directly. This is exact, whereas
+// going through `flatten()` and a size-hint-based helper such as itertools'
+// `try_len` yields no exact size hint for a non-empty map and would fall
+// back to 0.
+fn flattened_len(map: &HashMap<Struct, Vec<Arc<DeleteFileContext>>>) -> u32 {
+    map.values().map(|deletes| deletes.len() as u32).sum()
+}
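For orientation, a minimal crate-internal sketch of how the new three-part return value is meant to be consumed; `index_deletes` and its body are illustrative, not part of the patch. Dropping the sender terminates the stream that `collect()` awaits, which lets the spawned indexing task finish and resolve the metrics handle.

```rust
use futures::SinkExt;

use crate::delete_file_index::{DeleteFileIndex, DeleteIndexMetrics};
use crate::scan::DeleteFileContext;

// Hypothetical helper, for illustration only.
async fn index_deletes(contexts: Vec<DeleteFileContext>) -> DeleteIndexMetrics {
    let (index, mut tx, metrics_handle) = DeleteFileIndex::new();
    for ctx in contexts {
        // send() applies backpressure when the channel (limit 10) is full.
        tx.send(ctx).await.expect("delete file index task hung up");
    }
    drop(tx); // close the channel so the indexing task can complete
    let _ = index; // the index itself is handed to the scan's data-file tasks
    metrics_handle.await
}
```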
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 63af25e9a..b2deccf60 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -70,11 +70,11 @@ pub mod table;
 mod avro;
 pub mod cache;
-pub mod io;
-pub mod spec;
-
 pub mod inspect;
+pub mod io;
+pub mod metrics;
 pub mod scan;
+pub mod spec;
 
 pub mod expr;
 pub mod transaction;
diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs
new file mode 100644
index 000000000..6f37038a5
--- /dev/null
+++ b/crates/iceberg/src/metrics.rs
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains the metrics reporting API for Iceberg.
+//!
+//! It is used to report table operations in a pluggable way. See the [docs]
+//! for more details.
+//!
+//! [docs]: https://iceberg.apache.org/docs/latest/metrics-reporting
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tracing::info;
+
+use crate::TableIdent;
+use crate::expr::Predicate;
+use crate::spec::SchemaId;
+
+/// This trait defines the API for reporting metrics of table operations.
+///
+/// Refer to the [Iceberg docs] for details.
+///
+/// [Iceberg docs]: https://iceberg.apache.org/docs/latest/metrics-reporting/
+#[async_trait]
+pub(crate) trait MetricsReporter: Debug + Send + Sync {
+    /// Indicates that an operation is done by reporting a MetricsReport.
+    ///
+    /// Any errors are expected to be handled internally.
+    async fn report(&self, report: MetricsReport);
+}
+
+/// An enum of all metrics reports.
+#[derive(Debug)]
+pub(crate) enum MetricsReport {
+    /// A Table Scan report that contains all relevant information from a Table Scan.
+    Scan {
+        table: TableIdent,
+        snapshot_id: i64,
+        schema_id: SchemaId,
+
+        /// If None, the scan is an unfiltered full table scan.
+        filter: Option<Arc<Predicate>>,
+
+        /// If None, the scan projects all fields.
+        // TODO: We could default to listing all field names in those cases: check what Java is doing.
+        projected_field_names: Option<Vec<String>>,
+        projected_field_ids: Arc<Vec<i32>>,
+
+        metrics: Arc<ScanMetrics>,
+        metadata: HashMap<String, String>,
+    },
+}
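As a sketch of the pluggability the trait aims for: a crate-internal reporter that forwards each report into a channel so a background task can ship it to an external system. `ChannelMetricsReporter` is hypothetical and assumes only the trait and enum defined above.

```rust
use async_trait::async_trait;
use futures::channel::mpsc::UnboundedSender;

// Hypothetical reporter, illustrative only.
#[derive(Debug)]
struct ChannelMetricsReporter {
    sink: UnboundedSender<MetricsReport>,
}

#[async_trait]
impl MetricsReporter for ChannelMetricsReporter {
    async fn report(&self, report: MetricsReport) {
        // Errors are swallowed by design: reporting must never fail a scan.
        let _ = self.sink.unbounded_send(report);
    }
}
```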
+
+/// Carries all metrics for a particular scan.
+#[derive(Debug)]
+pub(crate) struct ScanMetrics {
+    pub(crate) total_planning_duration: Duration,
+
+    // Manifest-level metrics, computed by walking the snapshot's manifest list
+    // file entries and checking which manifests match the scan's predicates.
+    pub(crate) total_data_manifests: u32,
+    pub(crate) total_delete_manifests: u32,
+    pub(crate) skipped_data_manifests: u32,
+    pub(crate) skipped_delete_manifests: u32,
+    pub(crate) scanned_data_manifests: u32,
+    pub(crate) scanned_delete_manifests: u32,
+
+    // Data file-level metrics.
+    pub(crate) result_data_files: u32,
+    pub(crate) skipped_data_files: u32,
+    pub(crate) total_file_size_in_bytes: u64,
+
+    // Delete file-level metrics.
+    pub(crate) result_delete_files: u32,
+    pub(crate) skipped_delete_files: u32,
+    pub(crate) total_delete_file_size_in_bytes: u64,
+
+    pub(crate) indexed_delete_files: u32,
+    pub(crate) equality_delete_files: u32,
+    pub(crate) positional_delete_files: u32,
+}
+
+/// A reporter that logs the metrics to the console.
+#[derive(Clone, Debug)]
+pub(crate) struct LoggingMetricsReporter {}
+
+impl LoggingMetricsReporter {
+    pub(crate) fn new() -> Self {
+        Self {}
+    }
+}
+
+#[async_trait]
+impl MetricsReporter for LoggingMetricsReporter {
+    async fn report(&self, report: MetricsReport) {
+        match report {
+            MetricsReport::Scan {
+                table,
+                snapshot_id,
+                schema_id,
+                filter,
+                projected_field_names,
+                projected_field_ids,
+                metrics,
+                metadata,
+            } => {
+                info!(
+                    table = %table,
+                    snapshot_id = snapshot_id,
+                    schema_id = schema_id,
+                    filter = ?filter,
+                    projected_field_names = ?projected_field_names,
+                    projected_field_ids = ?projected_field_ids,
+                    scan_metrics.total_planning_duration = ?metrics.total_planning_duration,
+                    scan_metrics.total_data_manifests = metrics.total_data_manifests,
+                    scan_metrics.total_delete_manifests = metrics.total_delete_manifests,
+                    scan_metrics.scanned_data_manifests = metrics.scanned_data_manifests,
+                    scan_metrics.scanned_delete_manifests = metrics.scanned_delete_manifests,
+                    scan_metrics.skipped_data_manifests = metrics.skipped_data_manifests,
+                    scan_metrics.skipped_delete_manifests = metrics.skipped_delete_manifests,
+                    scan_metrics.result_data_files = metrics.result_data_files,
+                    scan_metrics.result_delete_files = metrics.result_delete_files,
+                    scan_metrics.skipped_data_files = metrics.skipped_data_files,
+                    scan_metrics.skipped_delete_files = metrics.skipped_delete_files,
+                    scan_metrics.total_file_size_in_bytes = metrics.total_file_size_in_bytes,
+                    scan_metrics.total_delete_file_size_in_bytes = metrics.total_delete_file_size_in_bytes,
+                    scan_metrics.indexed_delete_files = metrics.indexed_delete_files,
+                    scan_metrics.equality_delete_files = metrics.equality_delete_files,
+                    scan_metrics.positional_delete_files = metrics.positional_delete_files,
+                    metadata = ?metadata,
+                    "Received metrics report"
+                );
+            }
+        }
+    }
+}
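Note that `LoggingMetricsReporter` emits through `tracing`, so reports are only visible if the embedding application installs a subscriber. A minimal sketch, assuming the `tracing-subscriber` crate (which this change does not add as a dependency):

```rust
// Install a subscriber that prints INFO-level events, including the
// "Received metrics report" event emitted above.
fn init_metrics_logging() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();
}
```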
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 1e4ef41b2..8ccbcd2c4 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -23,6 +23,7 @@ use futures::{SinkExt, TryFutureExt};
 use crate::delete_file_index::DeleteFileIndex;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::object_cache::ObjectCache;
+use crate::scan::metrics::ManifestMetrics;
 use crate::scan::{
     BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
     PartitionFilterCache,
@@ -186,16 +187,25 @@ impl PlanContext {
         tx_data: Sender<ManifestEntryContext>,
         delete_file_idx: DeleteFileIndex,
         delete_file_tx: Sender<ManifestEntryContext>,
-    ) -> Result<Box<dyn Iterator<Item = Result<ManifestFileContext>> + 'static>> {
+    ) -> Result<(Vec<Result<ManifestFileContext>>, ManifestMetrics)> {
         let manifest_files = manifest_list.entries().iter();
 
-        // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
+        // TODO: Ideally we could ditch this intermediate Vec and return an
+        // iterator over the results. Updates to the manifest metrics somewhat
+        // complicate this: they need to be accumulated somewhere, and an
+        // iterator can't easily take ownership of the metrics.
+        // A Vec lets us apply the mutations within this function. It also
+        // implements Send and Sync (since its elements do), so we can pass it
+        // around more easily in the concurrent planning step.
         let mut filtered_mfcs = vec![];
+        let mut metrics = ManifestMetrics::default();
 
         for manifest_file in manifest_files {
             let tx = if manifest_file.content == ManifestContentType::Deletes {
+                metrics.total_delete_manifests += 1;
                 delete_file_tx.clone()
             } else {
+                metrics.total_data_manifests += 1;
                 tx_data.clone()
             };
 
@@ -212,6 +222,10 @@ impl PlanContext {
             )
             .eval(manifest_file)?
             {
+                match manifest_file.content {
+                    ManifestContentType::Data => metrics.skipped_data_manifests += 1,
+                    ManifestContentType::Deletes => metrics.skipped_delete_manifests += 1,
+                }
                 continue;
             }
 
@@ -230,7 +244,14 @@ impl PlanContext {
             filtered_mfcs.push(Ok(mfc));
         }
 
-        Ok(Box::new(filtered_mfcs.into_iter()))
+        // They're not yet scanned, but they will be scanned concurrently in
+        // the next processing step.
+        metrics.scanned_data_manifests =
+            metrics.total_data_manifests - metrics.skipped_data_manifests;
+        metrics.scanned_delete_manifests =
+            metrics.total_delete_manifests - metrics.skipped_delete_manifests;
+
+        Ok((filtered_mfcs, metrics))
     }
 
     fn create_manifest_file_context(
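The accounting above maintains a simple invariant: per content type, every manifest in the list is counted as either skipped or scanned. A hypothetical in-crate test sketch (the struct and its fields are `pub(crate)`, see scan/metrics.rs below):

```rust
#[test]
fn manifest_metrics_add_up() {
    // Values are arbitrary; only the relationship matters.
    let metrics = ManifestMetrics {
        total_data_manifests: 4,
        skipped_data_manifests: 1,
        scanned_data_manifests: 3,
        total_delete_manifests: 2,
        skipped_delete_manifests: 2,
        scanned_delete_manifests: 0,
    };

    assert_eq!(
        metrics.total_data_manifests,
        metrics.skipped_data_manifests + metrics.scanned_data_manifests
    );
    assert_eq!(
        metrics.total_delete_manifests,
        metrics.skipped_delete_manifests + metrics.scanned_delete_manifests
    );
}
```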
diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs
new file mode 100644
index 000000000..ac4b19ffa
--- /dev/null
+++ b/crates/iceberg/src/scan/metrics.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::time::Instant;
+
+use futures::channel::mpsc::Receiver;
+use futures::{StreamExt, join};
+
+use crate::delete_file_index::DeleteIndexMetrics;
+use crate::metrics::ScanMetrics;
+use crate::runtime::JoinHandle;
+
+/// Awaits metrics updates from different sources and combines them into the
+/// [ScanMetrics] struct used for reporting.
+pub(crate) async fn aggregate_metrics(
+    planning_start: Instant,
+    manifest_metrics: ManifestMetrics,
+    data_file_metrics_handle: JoinHandle<FileMetrics>,
+    delete_file_metrics_handle: JoinHandle<FileMetrics>,
+    index_metrics_handle: JoinHandle<DeleteIndexMetrics>,
+) -> ScanMetrics {
+    let (data_file_metrics, delete_file_metrics, index_metrics) = join!(
+        data_file_metrics_handle,
+        delete_file_metrics_handle,
+        index_metrics_handle
+    );
+
+    // Only now (after consuming all metrics updates) do we know that
+    // all concurrent work is finished and we can stop timing the
+    // planning phase.
+    let total_planning_duration = planning_start.elapsed();
+
+    ScanMetrics {
+        total_planning_duration,
+
+        total_data_manifests: manifest_metrics.total_data_manifests,
+        total_delete_manifests: manifest_metrics.total_delete_manifests,
+        skipped_data_manifests: manifest_metrics.skipped_data_manifests,
+        skipped_delete_manifests: manifest_metrics.skipped_delete_manifests,
+        scanned_data_manifests: manifest_metrics.scanned_data_manifests,
+        scanned_delete_manifests: manifest_metrics.scanned_delete_manifests,
+
+        result_data_files: data_file_metrics.result_files,
+        skipped_data_files: data_file_metrics.skipped_files,
+        total_file_size_in_bytes: data_file_metrics.total_file_size_in_bytes,
+
+        result_delete_files: delete_file_metrics.result_files,
+        skipped_delete_files: delete_file_metrics.skipped_files,
+        total_delete_file_size_in_bytes: delete_file_metrics.total_file_size_in_bytes,
+
+        indexed_delete_files: index_metrics.indexed_delete_files,
+        equality_delete_files: index_metrics.equality_delete_files,
+        positional_delete_files: index_metrics.positional_delete_files,
+    }
+}
+
+/// Subset of [ScanMetrics] produced by manifest-handling functions.
+#[derive(Default)]
+pub(crate) struct ManifestMetrics {
+    pub(crate) total_data_manifests: u32,
+    pub(crate) total_delete_manifests: u32,
+    pub(crate) skipped_data_manifests: u32,
+    pub(crate) skipped_delete_manifests: u32,
+    pub(crate) scanned_data_manifests: u32,
+    pub(crate) scanned_delete_manifests: u32,
+}
+
+/// Subset of [ScanMetrics] produced by file-handling functions.
+#[derive(Default)]
+pub(crate) struct FileMetrics {
+    result_files: u32,
+    skipped_files: u32,
+    total_file_size_in_bytes: u64,
+}
+
+impl FileMetrics {
+    pub(crate) async fn accumulate(mut updates: Receiver<FileMetricsUpdate>) -> Self {
+        let mut accumulator = Self::default();
+        while let Some(update) = updates.next().await {
+            match update {
+                FileMetricsUpdate::Skipped => accumulator.skipped_files += 1,
+                FileMetricsUpdate::Scanned { size_in_bytes } => {
+                    accumulator.total_file_size_in_bytes += size_in_bytes;
+                    accumulator.result_files += 1;
+                }
+            }
+        }
+        accumulator
+    }
+}
+
+/// Represents an update to a single data or delete file.
+pub(crate) enum FileMetricsUpdate {
+    Skipped,
+    Scanned { size_in_bytes: u64 },
+}
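A sketch of how the accumulator is meant to be driven (crate-internal; names mirror the callers added to scan/mod.rs below): updates flow through a bounded channel, and dropping the sender terminates the stream so `accumulate` can return its totals.

```rust
use futures::SinkExt;
use futures::channel::mpsc::channel;

use crate::runtime::spawn;
use crate::scan::metrics::{FileMetrics, FileMetricsUpdate};

// Illustrative driver, not part of the patch.
async fn demo_accumulate() -> FileMetrics {
    let (mut tx, rx) = channel(1);
    let handle = spawn(FileMetrics::accumulate(rx));

    tx.send(FileMetricsUpdate::Scanned { size_in_bytes: 128 })
        .await
        .unwrap();
    tx.send(FileMetricsUpdate::Skipped).await.unwrap();
    drop(tx); // close the stream; accumulate() exits its while-let loop

    // Totals: result_files == 1, skipped_files == 1, 128 bytes scanned.
    handle.await
}
```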
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index e987de859..a43936e4e 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -21,26 +21,31 @@ mod cache;
 use cache::*;
 mod context;
 use context::*;
+mod metrics;
 mod task;
 
+use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Instant;
 
 use arrow_array::RecordBatch;
-use futures::channel::mpsc::{Sender, channel};
+use futures::channel::mpsc::{Receiver, Sender, channel};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryStreamExt};
 pub use task::*;
 
 use crate::arrow::ArrowReaderBuilder;
-use crate::delete_file_index::DeleteFileIndex;
+use crate::delete_file_index::{DeleteFileIndex, DeleteIndexMetrics};
 use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
-use crate::runtime::spawn;
+use crate::metrics::{LoggingMetricsReporter, MetricsReport, MetricsReporter};
+use crate::runtime::{JoinHandle, spawn};
+use crate::scan::metrics::{FileMetrics, FileMetricsUpdate, ManifestMetrics, aggregate_metrics};
 use crate::spec::{DataContentType, SnapshotRef};
 use crate::table::Table;
 use crate::utils::available_parallelism;
-use crate::{Error, ErrorKind, Result};
+use crate::{Error, ErrorKind, Result, TableIdent};
 
 /// A stream of arrow [`RecordBatch`]es.
 pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
@@ -59,6 +64,9 @@ pub struct TableScanBuilder<'a> {
     concurrency_limit_manifest_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+
+    /// If None, we default to a LoggingMetricsReporter.
+    metrics_reporter: Option<Arc<dyn MetricsReporter>>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -77,6 +85,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            metrics_reporter: None,
         }
     }
 
@@ -101,6 +110,17 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the metrics reporter to use for this scan.
+    ///
+    /// If unset, we default to a LoggingMetricsReporter.
+    pub(crate) fn with_metrics_reporter(
+        mut self,
+        metrics_reporter: Arc<dyn MetricsReporter>,
+    ) -> Self {
+        self.metrics_reporter = Some(metrics_reporter);
+        self
+    }
+
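Since `with_metrics_reporter` is `pub(crate)`, this wiring is only reachable from inside the crate; external callers go through `TableBuilder` (see table.rs below). A minimal in-crate sketch:

```rust
use std::sync::Arc;

// Assumes an existing `table: Table`; illustrative only.
fn scan_with_logging(table: &Table) -> Result<TableScan> {
    table
        .scan()
        .with_metrics_reporter(Arc::new(LoggingMetricsReporter::new()))
        .select(["x", "y"])
        .build()
}
```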
     /// Select all columns.
     pub fn select_all(mut self) -> Self {
         self.column_names = None;
@@ -185,6 +205,11 @@ impl<'a> TableScanBuilder<'a> {
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
+        let metrics_reporter = match self.metrics_reporter {
+            Some(metrics_reporter) => metrics_reporter,
+            None => Arc::new(LoggingMetricsReporter::new()),
+        };
+
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
@@ -200,6 +225,7 @@ impl<'a> TableScanBuilder<'a> {
             None => {
                 let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else {
                     return Ok(TableScan {
+                        table: self.table.identifier().clone(),
                         batch_size: self.batch_size,
                         column_names: self.column_names,
                         file_io: self.table.file_io().clone(),
@@ -209,6 +235,7 @@ impl<'a> TableScanBuilder<'a> {
                         concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                         row_group_filtering_enabled: self.row_group_filtering_enabled,
                         row_selection_enabled: self.row_selection_enabled,
+                        metrics_reporter,
                     });
                 };
                 current_snapshot_id.clone()
@@ -290,6 +317,7 @@ impl<'a> TableScanBuilder<'a> {
         };
 
         Ok(TableScan {
+            table: self.table.identifier().clone(),
             batch_size: self.batch_size,
             column_names: self.column_names,
             file_io: self.table.file_io().clone(),
@@ -299,6 +327,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            metrics_reporter,
         })
     }
 }
@@ -306,6 +335,8 @@ impl<'a> TableScanBuilder<'a> {
 /// Table scan.
 #[derive(Debug)]
 pub struct TableScan {
+    table: TableIdent,
+
     /// A [PlanContext], if this table has at least one snapshot, otherwise None.
     ///
     /// If this is None, then the scan contains no rows.
@@ -327,6 +358,8 @@ pub struct TableScan {
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+
+    metrics_reporter: Arc<dyn MetricsReporter>,
 }
 
 impl TableScan {
@@ -336,59 +369,108 @@ impl TableScan {
             return Ok(Box::pin(futures::stream::empty()));
         };
 
-        let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files;
-        let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries;
+        // Start the planning phase timer.
+        let plan_start_time = Instant::now();
 
         // used to stream ManifestEntryContexts between stages of the file plan operation
         let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) =
-            channel(concurrency_limit_manifest_files);
+            channel(self.concurrency_limit_manifest_files);
         let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
-            channel(concurrency_limit_manifest_files);
-
-        // used to stream the results back to the caller
-        let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
+            channel(self.concurrency_limit_manifest_files);
 
-        let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
+        let (delete_file_idx, delete_file_tx, index_metrics_handle) = DeleteFileIndex::new();
 
         let manifest_list = plan_context.get_manifest_list().await?;
 
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
-        // whose partitions cannot match this
-        // scan's filter
-        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
-            manifest_list,
-            manifest_entry_data_ctx_tx,
-            delete_file_idx.clone(),
-            manifest_entry_delete_ctx_tx,
-        )?;
-
-        let mut channel_for_manifest_error = file_scan_task_tx.clone();
-
-        // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s
+        // whose partitions cannot match this scan's filter
+        let (manifest_file_contexts, manifest_metrics) = plan_context
+            .build_manifest_file_contexts(
+                manifest_list,
+                manifest_entry_data_ctx_tx,
+                delete_file_idx,
+                manifest_entry_delete_ctx_tx,
+            )?;
+
+        // used to stream the results back to the caller
+        let (result_tx, file_scan_task_rx) = channel(self.concurrency_limit_manifest_entries);
+
+        let _handle = self.spawn_fetch_manifests(manifest_file_contexts, result_tx.clone());
+
+        let (delete_manifests_handle, delete_file_metrics_handle) = self
+            .spawn_process_delete_manifest_entries(
+                manifest_entry_delete_ctx_rx,
+                delete_file_tx,
+                result_tx.clone(),
+            );
+        delete_manifests_handle.await;
+
+        let (_handle, data_file_metrics_handle) =
+            self.spawn_process_manifest_entries(manifest_entry_data_ctx_rx, result_tx);
+
+        let _handle = self.report_metrics(
+            plan_start_time,
+            plan_context,
+            data_file_metrics_handle,
+            delete_file_metrics_handle,
+            index_metrics_handle,
+            manifest_metrics,
+        );
+
+        Ok(file_scan_task_rx.boxed())
+    }
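For context, a consumer-side sketch of `plan_files`: the returned stream yields `Result<FileScanTask>`, so errors forwarded through `result_tx` above surface as ordinary stream items.

```rust
use futures::TryStreamExt;

// Illustrative consumer; field names follow the public FileScanTask struct.
async fn print_plan(scan: &TableScan) -> Result<()> {
    let mut tasks = scan.plan_files().await?;
    while let Some(task) = tasks.try_next().await? {
        println!("will scan {} bytes of {}", task.length, task.data_file_path);
    }
    Ok(())
}
```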
+
+    fn spawn_fetch_manifests(
+        &self,
+        manifest_files: Vec<Result<ManifestFileContext>>,
+        mut error_tx: Sender<Result<FileScanTask>>,
+    ) -> JoinHandle<()> {
+        let concurrency_limit = self.concurrency_limit_manifest_files;
         spawn(async move {
-            let result = futures::stream::iter(manifest_file_contexts)
-                .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move {
+            let result = futures::stream::iter(manifest_files)
+                .try_for_each_concurrent(concurrency_limit, |ctx| async move {
                     ctx.fetch_manifest_and_stream_manifest_entries().await
                 })
                 .await;
 
             if let Err(error) = result {
-                let _ = channel_for_manifest_error.send(Err(error)).await;
+                let _ = error_tx.send(Err(error)).await;
             }
-        });
+        })
+    }
 
-        let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
-        let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
+    fn spawn_process_delete_manifest_entries(
+        &self,
+        manifest_entry_rx: Receiver<ManifestEntryContext>,
+        delete_file_tx: Sender<DeleteFileContext>,
+        mut error_tx: Sender<Result<FileScanTask>>,
+    ) -> (JoinHandle<()>, JoinHandle<FileMetrics>) {
+        let concurrency_limit = self.concurrency_limit_manifest_entries;
 
-        // Process the delete file [`ManifestEntry`] stream in parallel
-        spawn(async move {
-            let result = manifest_entry_delete_ctx_rx
-                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
+        let (metrics_update_tx, metrics_update_rx) = channel(1);
+        let metrics_handle = spawn(FileMetrics::accumulate(metrics_update_rx));
+
+        let handle = spawn(async move {
+            let result = manifest_entry_rx
+                .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone(), metrics_update_tx.clone())))
                 .try_for_each_concurrent(
-                    concurrency_limit_manifest_entries,
-                    |(manifest_entry_context, tx)| async move {
+                    concurrency_limit,
+                    |(manifest_entry, mut file_tx, mut metrics_tx)| async move {
                         spawn(async move {
-                            Self::process_delete_manifest_entry(manifest_entry_context, tx).await
+                            let delete_file = Self::filter_delete_manifest_entry(manifest_entry)?;
+
+                            let metrics_update = if let Some(delete_file) = delete_file {
+                                let size_in_bytes = delete_file.manifest_entry.file_size_in_bytes();
+                                file_tx.send(delete_file).await?;
+
+                                FileMetricsUpdate::Scanned { size_in_bytes }
+                            } else {
+                                FileMetricsUpdate::Skipped
+                            };
+
+                            metrics_tx.send(metrics_update).await?;
+
+                            Ok(())
                         })
                         .await
                     },
                 )
@@ -396,22 +478,44 @@ impl TableScan {
                 .await;
 
             if let Err(error) = result {
-                let _ = channel_for_delete_manifest_entry_error
-                    .send(Err(error))
-                    .await;
+                let _ = error_tx.send(Err(error)).await;
             }
-        })
-        .await;
+        });
+
+        (handle, metrics_handle)
+    }
+
+    fn spawn_process_manifest_entries(
+        &self,
+        manifest_entry_rx: Receiver<ManifestEntryContext>,
+        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
+    ) -> (JoinHandle<()>, JoinHandle<FileMetrics>) {
+        let concurrency_limit = self.concurrency_limit_manifest_entries;
+
+        let (metrics_update_tx, metrics_update_rx) = channel(1);
+        let metrics_handle = spawn(FileMetrics::accumulate(metrics_update_rx));
 
-        // Process the data file [`ManifestEntry`] stream in parallel
-        spawn(async move {
-            let result = manifest_entry_data_ctx_rx
-                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone())))
+        let handle = spawn(async move {
+            let result = manifest_entry_rx
+                .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone(), metrics_update_tx.clone())))
                 .try_for_each_concurrent(
-                    concurrency_limit_manifest_entries,
-                    |(manifest_entry_context, tx)| async move {
+                    concurrency_limit,
+                    |(manifest_entry, mut task_tx, mut metrics_tx)| async move {
                         spawn(async move {
-                            Self::process_data_manifest_entry(manifest_entry_context, tx).await
+                            let file_task =
+                                Self::filter_data_manifest_entry(manifest_entry).await?;
+
+                            let metrics_update = if let Some(file_task) = file_task {
+                                let size_in_bytes = file_task.length;
+                                task_tx.send(Ok(file_task)).await?;
+
+                                FileMetricsUpdate::Scanned { size_in_bytes }
+                            } else {
+                                FileMetricsUpdate::Skipped
+                            };
+
+                            metrics_tx.send(metrics_update).await?;
+                            Ok(())
                         })
                         .await
                     },
                 )
@@ -419,11 +523,53 @@ impl TableScan {
                 .await;
 
             if let Err(error) = result {
-                let _ = channel_for_data_manifest_entry_error.send(Err(error)).await;
+                let _ = file_scan_task_tx.send(Err(error)).await;
             }
         });
 
-        Ok(file_scan_task_rx.boxed())
+        (handle, metrics_handle)
+    }
+
+    fn report_metrics(
+        &self,
+        plan_start_time: Instant,
+        plan_context: &PlanContext,
+        data_file_metrics_handle: JoinHandle<FileMetrics>,
+        delete_file_metrics_handle: JoinHandle<FileMetrics>,
+        index_metrics_handle: JoinHandle<DeleteIndexMetrics>,
+        manifest_metrics: ManifestMetrics,
+    ) -> JoinHandle<()> {
+        let table = self.table.clone();
+        let snapshot_id = plan_context.snapshot.snapshot_id();
+        let filter = plan_context.predicate.clone();
+        let schema_id = plan_context.snapshot_schema.schema_id();
+        let projected_field_ids = plan_context.field_ids.clone();
+        let projected_field_names = self.column_names.clone();
+
+        let metrics_reporter = Arc::clone(&self.metrics_reporter);
+        spawn(async move {
+            let metrics = aggregate_metrics(
+                plan_start_time,
+                manifest_metrics,
+                data_file_metrics_handle,
+                delete_file_metrics_handle,
+                index_metrics_handle,
+            )
+            .await;
+
+            let report = MetricsReport::Scan {
+                table,
+                snapshot_id,
+                filter,
+                schema_id,
+                projected_field_ids,
+                projected_field_names,
+                metadata: HashMap::new(),
+                metrics: Arc::new(metrics),
+            };
+
+            metrics_reporter.report(report).await;
+        })
     }
 
     /// Returns an [`ArrowRecordBatchStream`].
@@ -453,13 +599,12 @@ impl TableScan {
         self.plan_context.as_ref().map(|x| &x.snapshot)
     }
 
-    async fn process_data_manifest_entry(
+    async fn filter_data_manifest_entry(
         manifest_entry_context: ManifestEntryContext,
-        mut file_scan_task_tx: Sender<Result<FileScanTask>>,
-    ) -> Result<()> {
+    ) -> Result<Option<FileScanTask>> {
         // skip processing this manifest entry if it has been marked as deleted
         if !manifest_entry_context.manifest_entry.is_alive() {
-            return Ok(());
+            return Ok(None);
         }
 
         // abort the plan if we encounter a manifest entry for a delete file
@@ -487,7 +632,7 @@ impl TableScan {
         // skip any data file whose partition data indicates that it can't contain
         // any data that matches this scan's filter
         if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
-            return Ok(());
+            return Ok(None);
         }
 
         // skip any data file whose metrics don't match this scan's filter
         if !InclusiveMetricsEvaluator::eval(
             manifest_entry_context.manifest_entry.data_file(),
             false,
         )? {
-            return Ok(());
+            return Ok(None);
         }
         }
 
         // congratulations! the manifest entry has made its way through the
         // entire plan without getting filtered out. Create a corresponding
         // FileScanTask and push it to the result stream
-        file_scan_task_tx
-            .send(Ok(manifest_entry_context.into_file_scan_task().await?))
-            .await?;
+        let file_scan_task = manifest_entry_context.into_file_scan_task().await?;
 
-        Ok(())
+        Ok(Some(file_scan_task))
     }
 
-    async fn process_delete_manifest_entry(
+    fn filter_delete_manifest_entry(
         manifest_entry_context: ManifestEntryContext,
-        mut delete_file_ctx_tx: Sender<DeleteFileContext>,
-    ) -> Result<()> {
+    ) -> Result<Option<DeleteFileContext>> {
         // skip processing this manifest entry if it has been marked as deleted
         if !manifest_entry_context.manifest_entry.is_alive() {
-            return Ok(());
+            return Ok(None);
         }
 
         // abort the plan if we encounter a manifest entry that is not for a delete file
@@ -539,18 +681,18 @@ impl TableScan {
             // skip any data file whose partition data indicates that it can't contain
             // any data that matches this scan's filter
             if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
-                return Ok(());
+                return Ok(None);
             }
         }
 
-        delete_file_ctx_tx
-            .send(DeleteFileContext {
-                manifest_entry: manifest_entry_context.manifest_entry.clone(),
-                partition_spec_id: manifest_entry_context.partition_spec_id,
-            })
-            .await?;
+        let delete_file_ctx = DeleteFileContext {
+            manifest_entry: manifest_entry_context.manifest_entry.clone(),
+            partition_spec_id: manifest_entry_context.partition_spec_id,
+        };
 
-        Ok(())
+        Ok(Some(delete_file_ctx))
     }
 }
 
@@ -567,11 +709,12 @@ pub mod tests {
     use std::collections::HashMap;
     use std::fs;
     use std::fs::File;
-    use std::sync::Arc;
+    use std::sync::{Arc, Mutex};
 
     use arrow_array::{
         ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
     };
+    use async_trait::async_trait;
     use futures::{TryStreamExt, stream};
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
     use parquet::basic::Compression;
@@ -584,6 +727,7 @@ pub mod tests {
     use crate::arrow::ArrowReaderBuilder;
     use crate::expr::{BoundPredicate, Reference};
     use crate::io::{FileIO, OutputFile};
+    use crate::metrics::{MetricsReport, MetricsReporter, ScanMetrics};
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
@@ -595,6 +739,7 @@ pub mod tests {
     pub struct TableTestFixture {
         pub table_location: String,
         pub table: Table,
+        metrics_reporter: Arc<TestMetricsReporter>,
     }
 
     impl TableTestFixture {
@@ -627,17 +772,21 @@ pub mod tests {
             serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
         };
 
+        let metrics_reporter = Arc::new(TestMetricsReporter::new());
+
         let table = Table::builder()
             .metadata(table_metadata)
             .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
             .file_io(file_io.clone())
            .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+            .metrics_reporter(Arc::clone(&metrics_reporter) as Arc<dyn MetricsReporter>)
             .build()
             .unwrap();
 
         Self {
             table_location: table_location.to_str().unwrap().to_string(),
             table,
+            metrics_reporter,
         }
     }
 
@@ -666,17 +815,21 @@ pub mod tests {
             serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
         };
 
+        let metrics_reporter = Arc::new(TestMetricsReporter::new());
+
         let table = Table::builder()
             .metadata(table_metadata)
             .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
             .file_io(file_io.clone())
             .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
+            .metrics_reporter(Arc::clone(&metrics_reporter) as Arc<dyn MetricsReporter>)
             .build()
             .unwrap();
 
         Self {
             table_location: table_location.to_str().unwrap().to_string(),
             table,
+            metrics_reporter,
         }
     }
 
@@ -715,17 +868,21 @@ pub mod tests {
             .partition_specs
             .insert(0, table_metadata.default_spec.clone());
 
+        let metrics_reporter = Arc::new(TestMetricsReporter::new());
+
         let table = Table::builder()
             .metadata(table_metadata)
             .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
             .file_io(file_io.clone())
             .metadata_location(table_metadata1_location.to_str().unwrap())
+            .metrics_reporter(Arc::clone(&metrics_reporter) as Arc<dyn MetricsReporter>)
             .build()
             .unwrap();
 
         Self {
             table_location: table_location.to_str().unwrap().to_string(),
             table,
+            metrics_reporter,
         }
     }
 
@@ -1164,6 +1321,27 @@ pub mod tests {
         }
     }
 
+    #[derive(Debug)]
+    struct TestMetricsReporter {
+        last_report: Mutex<Option<MetricsReport>>,
+    }
+
+    #[async_trait]
+    impl MetricsReporter for TestMetricsReporter {
+        async fn report(&self, report: MetricsReport) {
+            let mut guard = self.last_report.lock().unwrap();
+            *guard = Some(report);
+        }
+    }
+
+    impl TestMetricsReporter {
+        fn new() -> Self {
+            Self {
+                last_report: Mutex::new(None),
+            }
+        }
+    }
+
     #[test]
     fn test_table_scan_columns() {
         let table = TableTestFixture::new().table;
@@ -1802,4 +1980,78 @@ pub mod tests {
         };
         test_fn(task);
     }
+
+    #[tokio::test]
+    async fn test_metrics_reporter() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let table_scan = fixture.table.scan().select(["x", "y"]).build().unwrap();
+
+        // Consume the table scan's results to finish the planning process and
+        // send a report.
+        let _batches: Vec<_> = table_scan
+            .to_arrow()
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+
+        let report_guard = fixture.metrics_reporter.last_report.lock().unwrap();
+        assert!(report_guard.is_some());
+        let report = report_guard.as_ref().unwrap();
+
+        match report {
+            MetricsReport::Scan {
+                table,
+                snapshot_id,
+                filter,
+                schema_id,
+                projected_field_ids,
+                projected_field_names,
+                metrics,
+                metadata,
+            } => {
+                assert_eq!(table, fixture.table.identifier());
+                assert_eq!(
+                    snapshot_id,
+                    &fixture.table.metadata().current_snapshot_id().unwrap()
+                );
+                assert!(filter.is_none());
+                assert_eq!(schema_id, &fixture.table.metadata().current_schema_id);
+                assert_eq!(projected_field_ids, &Arc::new(vec![1, 2]));
+                assert_eq!(
+                    projected_field_names,
+                    &Some(vec!["x".to_string(), "y".to_string()])
+                );
+
+                assert_metrics(metrics);
+
+                assert!(metadata.is_empty())
+            }
+        }
+    }
+
+    fn assert_metrics(metrics: &Arc<ScanMetrics>) {
+        assert!(!metrics.total_planning_duration.is_zero());
+
+        assert_eq!(metrics.total_data_manifests, 1);
+        assert_eq!(metrics.total_delete_manifests, 0);
+        assert_eq!(metrics.skipped_data_manifests, 0);
+        assert_eq!(metrics.skipped_delete_manifests, 0);
+        assert_eq!(metrics.scanned_data_manifests, 1);
+        assert_eq!(metrics.scanned_delete_manifests, 0);
+
+        assert_eq!(metrics.result_data_files, 2);
+        assert_eq!(metrics.skipped_data_files, 1);
+        assert_eq!(metrics.total_file_size_in_bytes, 200);
+
+        assert_eq!(metrics.result_delete_files, 0);
+        assert_eq!(metrics.skipped_delete_files, 0);
+        assert_eq!(metrics.total_delete_file_size_in_bytes, 0);
+
+        assert_eq!(metrics.indexed_delete_files, 0);
+        assert_eq!(metrics.equality_delete_files, 0);
+        assert_eq!(metrics.positional_delete_files, 0);
+    }
 }
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index f601dacbc..4c93252ee 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -23,6 +23,7 @@ use crate::arrow::ArrowReaderBuilder;
 use crate::inspect::MetadataTable;
 use crate::io::FileIO;
 use crate::io::object_cache::ObjectCache;
+use crate::metrics::MetricsReporter;
 use crate::scan::TableScanBuilder;
 use crate::spec::{TableMetadata, TableMetadataRef};
 use crate::{Error, ErrorKind, Result, TableIdent};
@@ -36,6 +37,7 @@ pub struct TableBuilder {
     readonly: bool,
     disable_cache: bool,
     cache_size_bytes: Option<u64>,
+    metrics_reporter: Option<Arc<dyn MetricsReporter>>,
 }
 
 impl TableBuilder {
@@ -48,6 +50,7 @@ impl TableBuilder {
             readonly: false,
             disable_cache: false,
             cache_size_bytes: None,
+            metrics_reporter: None,
         }
     }
 
@@ -95,6 +98,14 @@ impl TableBuilder {
         self
     }
 
+    /// Sets the implementation used to report metrics about operations on
+    /// this table.
+    #[cfg(test)]
+    pub(crate) fn metrics_reporter(mut self, metrics_reporter: Arc<dyn MetricsReporter>) -> Self {
+        self.metrics_reporter = Some(metrics_reporter);
+        self
+    }
+
     /// build the Table
     pub fn build(self) -> Result<Table> {
         let Self {
@@ -105,6 +116,7 @@ impl TableBuilder {
             readonly,
             disable_cache,
             cache_size_bytes,
+            metrics_reporter,
         } = self;
 
         let Some(file_io) = file_io else {
@@ -146,6 +158,7 @@ impl TableBuilder {
             identifier,
             readonly,
             object_cache,
+            metrics_reporter,
         })
     }
 }
@@ -159,6 +172,7 @@ pub struct Table {
     identifier: TableIdent,
     readonly: bool,
     object_cache: Arc<ObjectCache>,
+    metrics_reporter: Option<Arc<dyn MetricsReporter>>,
 }
 
 impl Table {
@@ -204,7 +218,11 @@ impl Table {
     /// Creates a table scan.
     pub fn scan(&self) -> TableScanBuilder<'_> {
-        TableScanBuilder::new(self)
+        if let Some(metrics_reporter) = &self.metrics_reporter {
+            TableScanBuilder::new(self).with_metrics_reporter(Arc::clone(metrics_reporter))
+        } else {
+            TableScanBuilder::new(self)
+        }
     }
 
     /// Creates a metadata table which provides table-like APIs for inspecting metadata.
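End to end, the wiring added by this patch looks roughly like the following. This is a sketch only, and in-crate by necessity: `TableBuilder::metrics_reporter` is gated behind `#[cfg(test)]` for now, and the reporter types are `pub(crate)`.

```rust
use std::sync::Arc;

use crate::io::FileIO;
use crate::metrics::{LoggingMetricsReporter, MetricsReporter};
use crate::spec::TableMetadata;
use crate::table::Table;
use crate::{Result, TableIdent};

// Attach a reporter when building the table; every scan created via
// table.scan() then reports through it once planning completes.
async fn build_and_scan(
    table_metadata: TableMetadata,
    file_io: FileIO,
    metadata_location: &str,
) -> Result<()> {
    let reporter = Arc::new(LoggingMetricsReporter::new());

    let table = Table::builder()
        .metadata(table_metadata)
        .identifier(TableIdent::from_strs(["db", "table1"])?)
        .file_io(file_io)
        .metadata_location(metadata_location)
        .metrics_reporter(reporter as Arc<dyn MetricsReporter>)
        .build()?;

    // Consuming the plan drives the metrics pipeline and, once planning
    // completes, the asynchronous report.
    let _tasks = table.scan().select(["x"]).build()?.plan_files().await?;
    Ok(())
}
```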