From eb8109cab5bfb08dd7ee89955e82fd3d0c7bf651 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Fri, 20 Jun 2025 17:20:18 +0200 Subject: [PATCH 01/25] Foundational work Signed-off-by: Jannik Steinmann Rename MetricsReporter and MetricsReport Signed-off-by: Jannik Steinmann Remove unnecessary stuff Signed-off-by: Jannik Steinmann --- Cargo.lock | 1 + crates/iceberg/src/lib.rs | 6 +- crates/iceberg/src/metrics.rs | 132 ++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 3 deletions(-) create mode 100644 crates/iceberg/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 786b1b726d..3ab1495bd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3534,6 +3534,7 @@ dependencies = [ "tera", "thrift", "tokio", + "tracing", "typed-builder 0.20.1", "url", "uuid", diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 63af25e9a5..b2deccf60b 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -70,11 +70,11 @@ pub mod table; mod avro; pub mod cache; -pub mod io; -pub mod spec; - pub mod inspect; +pub mod io; +pub mod metrics; pub mod scan; +pub mod spec; pub mod expr; pub mod transaction; diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs new file mode 100644 index 0000000000..abffd02bef --- /dev/null +++ b/crates/iceberg/src/metrics.rs @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the metrics reporting API for Iceberg. +//! +//! It is used to report table operations in a pluggable way. See [1] for more +//! details. +//! +//! [1] https://iceberg.apache.org/docs/latest/metrics-reporting + +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; + +use crate::TableIdent; +use crate::expr::Predicate; +use crate::spec::SchemaId; + +/// This trait defines the basic API for reporting metrics for operations to a Table. +#[async_trait] +pub(crate) trait MetricsReporter: Debug + Send + Sync { + /// Indicates that an operation is done by reporting a MetricsReport. + /// + /// Any errors are expected to be handled internally. + async fn report(&self, report: MetricsReport); +} + +/// An enum of all metrics reports. +#[derive(Debug)] +pub(crate) enum MetricsReport { + /// A Table Scan report that contains all relevant information from a Table Scan. + Scan { + table: TableIdent, + snapshot_id: i64, + filter: Option>, + schema_id: SchemaId, + projected_field_ids: Arc>, + // TODO: We could default to listing all field names, if all are selected + // check what Java is doing. + projected_field_names: Option>, + metrics: Box, + metadata: HashMap, + }, +} + +/// Carries all metrics for a particular scan. +#[derive(Debug)] +pub(crate) struct ScanMetrics { + pub(crate) total_planning_duration: Duration, + + // Manfiest-level metrics, computed by walking the snapshot's manifest list + // file entries and checking which manifests match the scan's predicates. + pub(crate) total_data_manifests: u32, // TODO: Are these really just skipped+scanned? + pub(crate) total_delete_manifests: u32, + pub(crate) skipped_data_manifests: u32, + pub(crate) skipped_delete_manifests: u32, + pub(crate) scanned_data_manifests: u32, + pub(crate) scanned_delete_manifests: u32, + + // Data file-level metrics. + pub(crate) result_data_files: u32, + pub(crate) skipped_data_files: u32, + pub(crate) total_file_size_in_bytes: u64, // TODO: should then all be u64s? + + // Delete file-level metrics. + pub(crate) result_delete_files: u32, + pub(crate) skipped_delete_files: u32, + pub(crate) total_delete_file_size_in_bytes: u64, + + pub(crate) indexed_delete_files: u32, + pub(crate) equality_delete_files: u32, + pub(crate) positional_delete_files: u32, +} + +// TODO: This impl will provide public accessors for the fields, because +// crate-external implementators will need to access them, while (so far) only +// code within the crate will need to mutate them. +impl ScanMetrics { + pub fn result_data_files(&self) -> Counter { + Counter { + value: self.result_data_files, + unit: "file".to_string(), + } + } + + pub fn result_delete_files(&self) -> Counter { + Counter { + value: self.result_delete_files, + unit: "file".to_string(), + } + } + + pub fn total_data_manifests(&self) -> Counter { + Counter { + value: self.total_data_manifests, + unit: "manifest".to_string(), + } + } +} + +struct Counter { + value: u32, + unit: String, +} + +/// A reporter that logs the metrics to the console. +#[derive(Clone, Debug)] +pub(crate) struct LoggingMetricsReporter {} + +#[async_trait] +impl MetricsReporter for LoggingMetricsReporter { + async fn report(&self, report: MetricsReport) { + println!("Reporting metrics: {:?}", report); + } +} From 20a0e80a2e482c43495b1fed0d8409a89364cf39 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Thu, 26 Jun 2025 20:56:38 +0200 Subject: [PATCH 02/25] refactor: TableScan::plan_files into parallel steps Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/context.rs | 4 +- crates/iceberg/src/scan/mod.rs | 91 ++++++++++++++++++------------ 2 files changed, 58 insertions(+), 37 deletions(-) diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 1e4ef41b2b..a02e3222a7 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -186,7 +186,7 @@ impl PlanContext { tx_data: Sender, delete_file_idx: DeleteFileIndex, delete_file_tx: Sender, - ) -> Result> + 'static>> { + ) -> Result>> { let manifest_files = manifest_list.entries().iter(); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. @@ -230,7 +230,7 @@ impl PlanContext { filtered_mfcs.push(Ok(mfc)); } - Ok(Box::new(filtered_mfcs.into_iter())) + Ok(filtered_mfcs) } fn create_manifest_file_context( diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e987de859f..7c5bfce802 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -26,7 +26,7 @@ mod task; use std::sync::Arc; use arrow_array::RecordBatch; -use futures::channel::mpsc::{Sender, channel}; +use futures::channel::mpsc::{Receiver, Sender, channel}; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; @@ -36,7 +36,7 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::runtime::spawn; +use crate::runtime::{JoinHandle, spawn}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; @@ -336,56 +336,76 @@ impl TableScan { return Ok(Box::pin(futures::stream::empty())); }; - let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; - let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; - // used to stream ManifestEntryContexts between stages of the file plan operation let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = - channel(concurrency_limit_manifest_files); + channel(self.concurrency_limit_manifest_files); let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = - channel(concurrency_limit_manifest_files); - - // used to stream the results back to the caller - let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + channel(self.concurrency_limit_manifest_files); let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); let manifest_list = plan_context.get_manifest_list().await?; // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any - // whose partitions cannot match this - // scan's filter + // whose partitions cannot match this scan's filter let manifest_file_contexts = plan_context.build_manifest_file_contexts( manifest_list, manifest_entry_data_ctx_tx, - delete_file_idx.clone(), + delete_file_idx, manifest_entry_delete_ctx_tx, )?; - let mut channel_for_manifest_error = file_scan_task_tx.clone(); + // used to stream the results back to the caller + let (result_tx, file_scan_task_rx) = channel(self.concurrency_limit_manifest_entries); // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s - spawn(async move { - let result = futures::stream::iter(manifest_file_contexts) - .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { + self.load_manifests(manifest_file_contexts, result_tx.clone()); + + // Process the delete file [`ManifestEntry`] stream in parallel + self.process_delete_manifest_entries( + manifest_entry_delete_ctx_rx, + delete_file_tx, + result_tx.clone(), + ) + .await; + + // Process the data file [`ManifestEntry`] stream in parallel + self.process_manifest_entries(manifest_entry_data_ctx_rx, result_tx); + + Ok(file_scan_task_rx.boxed()) + } + + fn load_manifests( + &self, + manifest_files: Vec>, + mut error_tx: Sender>, + ) { + let concurrency_limit = self.concurrency_limit_manifest_files; + let _handle = spawn(async move { + let result = futures::stream::iter(manifest_files) + .try_for_each_concurrent(concurrency_limit, |ctx| async move { ctx.fetch_manifest_and_stream_manifest_entries().await }) .await; if let Err(error) = result { - let _ = channel_for_manifest_error.send(Err(error)).await; + let _ = error_tx.send(Err(error)).await; } }); + } - let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - - // Process the delete file [`ManifestEntry`] stream in parallel + fn process_delete_manifest_entries( + &self, + manifest_entry_rx: Receiver, + delete_file_tx: Sender, + mut error_tx: Sender>, + ) -> JoinHandle<()> { + let concurrency_limit = self.concurrency_limit_manifest_entries; spawn(async move { - let result = manifest_entry_delete_ctx_rx + let result = manifest_entry_rx .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) .try_for_each_concurrent( - concurrency_limit_manifest_entries, + concurrency_limit, |(manifest_entry_context, tx)| async move { spawn(async move { Self::process_delete_manifest_entry(manifest_entry_context, tx).await @@ -396,19 +416,22 @@ impl TableScan { .await; if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) - .await; + let _ = error_tx.send(Err(error)).await; } }) - .await; + } - // Process the data file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_data_ctx_rx + fn process_manifest_entries( + &self, + manifest_entry_rx: Receiver, + mut file_scan_task_tx: Sender>, + ) { + let concurrency_limit = self.concurrency_limit_manifest_entries; + let _handle = spawn(async move { + let result = manifest_entry_rx .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) .try_for_each_concurrent( - concurrency_limit_manifest_entries, + concurrency_limit, |(manifest_entry_context, tx)| async move { spawn(async move { Self::process_data_manifest_entry(manifest_entry_context, tx).await @@ -419,11 +442,9 @@ impl TableScan { .await; if let Err(error) = result { - let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; + let _ = file_scan_task_tx.send(Err(error)).await; } }); - - Ok(file_scan_task_rx.boxed()) } /// Returns an [`ArrowRecordBatchStream`]. From e1dc69908dad88f13da3b4e1be4b90f35b53fc53 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Fri, 27 Jun 2025 11:33:53 +0200 Subject: [PATCH 03/25] Use serialization-based logger Signed-off-by: Jannik Steinmann --- crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/metrics.rs | 14 +++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 52f5b4d8b5..e3425efb5a 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -90,6 +90,7 @@ typed-builder = { workspace = true } url = { workspace = true } uuid = { workspace = true } zstd = { workspace = true } +tracing = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs index abffd02bef..e3e47e554d 100644 --- a/crates/iceberg/src/metrics.rs +++ b/crates/iceberg/src/metrics.rs @@ -28,6 +28,9 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use serde::Serialize; +use serde_with::{DurationNanoSeconds, serde_as}; +use tracing::{info, warn}; use crate::TableIdent; use crate::expr::Predicate; @@ -43,7 +46,7 @@ pub(crate) trait MetricsReporter: Debug + Send + Sync { } /// An enum of all metrics reports. -#[derive(Debug)] +#[derive(Debug, Serialize)] pub(crate) enum MetricsReport { /// A Table Scan report that contains all relevant information from a Table Scan. Scan { @@ -61,8 +64,10 @@ pub(crate) enum MetricsReport { } /// Carries all metrics for a particular scan. -#[derive(Debug)] +#[serde_as] +#[derive(Debug, Serialize)] pub(crate) struct ScanMetrics { + #[serde_as(as = "DurationNanoSeconds")] pub(crate) total_planning_duration: Duration, // Manfiest-level metrics, computed by walking the snapshot's manifest list @@ -127,6 +132,9 @@ pub(crate) struct LoggingMetricsReporter {} #[async_trait] impl MetricsReporter for LoggingMetricsReporter { async fn report(&self, report: MetricsReport) { - println!("Reporting metrics: {:?}", report); + match serde_json::to_string(&report) { + Ok(json) => info!(report = json, "Reporting metrics"), + Err(e) => warn!("Failed to serialize metrics report: {}", e), + } } } From 16af4165a61eb1c292c2c3e10fdb2d91d9331b9e Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Fri, 27 Jun 2025 14:21:10 +0200 Subject: [PATCH 04/25] Set metrics reporter on TableScan Signed-off-by: Jannik Steinmann --- crates/iceberg/src/metrics.rs | 12 +++++++++++- crates/iceberg/src/scan/mod.rs | 31 +++++++++++++++++++++++++++++++ crates/iceberg/src/table.rs | 22 +++++++++++++++++++++- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs index e3e47e554d..a5e9c383c4 100644 --- a/crates/iceberg/src/metrics.rs +++ b/crates/iceberg/src/metrics.rs @@ -36,7 +36,11 @@ use crate::TableIdent; use crate::expr::Predicate; use crate::spec::SchemaId; -/// This trait defines the basic API for reporting metrics for operations to a Table. +/// This trait defines the API for reporting metrics of table operations. +/// +/// Refer to the [Iceberg docs] for details. +/// +/// [Iceberg docs]: https://iceberg.apache.org/docs/latest/metrics-reporting/ #[async_trait] pub(crate) trait MetricsReporter: Debug + Send + Sync { /// Indicates that an operation is done by reporting a MetricsReport. @@ -129,6 +133,12 @@ struct Counter { #[derive(Clone, Debug)] pub(crate) struct LoggingMetricsReporter {} +impl LoggingMetricsReporter { + pub(crate) fn new() -> Self { + Self {} + } +} + #[async_trait] impl MetricsReporter for LoggingMetricsReporter { async fn report(&self, report: MetricsReport) { diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 7c5bfce802..835ea7daab 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -36,6 +36,7 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; +use crate::metrics::{LoggingMetricsReporter, MetricsReporter}; use crate::runtime::{JoinHandle, spawn}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; @@ -59,6 +60,9 @@ pub struct TableScanBuilder<'a> { concurrency_limit_manifest_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, + + /// If None, we default to a LoggingReporter. + metrics_reporter: Option>>, } impl<'a> TableScanBuilder<'a> { @@ -77,6 +81,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, + metrics_reporter: None, } } @@ -101,6 +106,17 @@ impl<'a> TableScanBuilder<'a> { self } + /// Sets the metrics reporter to use for this scan. + /// + /// If unset, we default to a LoggingReporter. + pub(crate) fn with_metrics_reporter( + mut self, + metrics_reporter: Arc>, + ) -> Self { + self.metrics_reporter = Some(metrics_reporter); + self + } + /// Select all columns. pub fn select_all(mut self) -> Self { self.column_names = None; @@ -185,6 +201,17 @@ impl<'a> TableScanBuilder<'a> { /// Build the table scan. pub fn build(self) -> Result { + let metrics_reporter = match self.metrics_reporter { + Some(metrics_reporter) => metrics_reporter, + None => { + // When a table scan is constructed directly (not by a catalog), + // and the user didn't provide a metrics reporter, then we + // construct a new one. + let reporter: Box = Box::new(LoggingMetricsReporter::new()); + Arc::new(reporter) + } + }; + let snapshot = match self.snapshot_id { Some(snapshot_id) => self .table @@ -209,6 +236,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + metrics_reporter, }); }; current_snapshot_id.clone() @@ -299,6 +327,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + metrics_reporter, }) } } @@ -327,6 +356,8 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + + metrics_reporter: Arc>, } impl TableScan { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index f601dacbc7..411d80c4e1 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -23,6 +23,7 @@ use crate::arrow::ArrowReaderBuilder; use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; +use crate::metrics::MetricsReporter; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -36,6 +37,7 @@ pub struct TableBuilder { readonly: bool, disable_cache: bool, cache_size_bytes: Option, + metrics_reporter: Option>>, } impl TableBuilder { @@ -48,6 +50,7 @@ impl TableBuilder { readonly: false, disable_cache: false, cache_size_bytes: None, + metrics_reporter: None, } } @@ -95,6 +98,16 @@ impl TableBuilder { self } + /// sets the implementation used to report metrics about operations on this + /// table. + pub(crate) fn with_metrics_reporter( + mut self, + metrics_reporter: Arc>, + ) -> Self { + self.metrics_reporter = Some(metrics_reporter); + self + } + /// build the Table pub fn build(self) -> Result { let Self { @@ -105,6 +118,7 @@ impl TableBuilder { readonly, disable_cache, cache_size_bytes, + metrics_reporter, } = self; let Some(file_io) = file_io else { @@ -146,6 +160,7 @@ impl TableBuilder { identifier, readonly, object_cache, + metrics_reporter, }) } } @@ -159,6 +174,7 @@ pub struct Table { identifier: TableIdent, readonly: bool, object_cache: Arc, + metrics_reporter: Option>>, } impl Table { @@ -204,7 +220,11 @@ impl Table { /// Creates a table scan. pub fn scan(&self) -> TableScanBuilder<'_> { - TableScanBuilder::new(self) + if let Some(metrics_reporter) = &self.metrics_reporter { + TableScanBuilder::new(self).with_metrics_reporter(Arc::clone(metrics_reporter)) + } else { + TableScanBuilder::new(self) + } } /// Creates a metadata table which provides table-like APIs for inspecting metadata. From 883202780f3743b7ef6d3cf4baab112b66da5d1e Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Fri, 27 Jun 2025 14:48:37 +0200 Subject: [PATCH 05/25] Collect metrics for indexed deletes Signed-off-by: Jannik Steinmann --- crates/iceberg/src/delete_file_index.rs | 44 ++++++++++++++++++++++--- crates/iceberg/src/scan/mod.rs | 2 +- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index d8f7a872e1..6ec7e37354 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -21,6 +21,8 @@ use std::sync::{Arc, RwLock}; use futures::StreamExt; use futures::channel::mpsc::{Sender, channel}; +use futures::channel::oneshot; +use itertools::Itertools; use tokio::sync::Notify; use crate::runtime::spawn; @@ -51,16 +53,29 @@ struct PopulatedDeleteFileIndex { // TODO: Deletion Vector support } +#[derive(Debug)] +pub(crate) struct DeleteIndexMetrics { + pub(crate) indexed_delete_files: u32, + pub(crate) equality_delete_files: u32, + pub(crate) positional_delete_files: u32, +} + impl DeleteFileIndex { /// create a new `DeleteFileIndex` along with the sender that populates it with delete files - pub(crate) fn new() -> (DeleteFileIndex, Sender) { + pub(crate) fn new() -> ( + DeleteFileIndex, + Sender, + oneshot::Receiver, + ) { // TODO: what should the channel limit be? - let (tx, rx) = channel(10); + let (delete_file_tx, delete_file_rx) = channel(10); let notify = Arc::new(Notify::new()); let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating( notify.clone(), ))); - let delete_file_stream = rx.boxed(); + let delete_file_stream = delete_file_rx.boxed(); + + let (metrics_tx, metrics_rx) = oneshot::channel(); spawn({ let state = state.clone(); @@ -69,15 +84,19 @@ impl DeleteFileIndex { let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files); + let metrics = populated_delete_file_index.metrics(); + { let mut guard = state.write().unwrap(); *guard = DeleteFileIndexState::Populated(populated_delete_file_index); } notify.notify_waiters(); + + metrics_tx.send(metrics).unwrap(); } }); - (DeleteFileIndex { state }, tx) + (DeleteFileIndex { state }, delete_file_tx, metrics_rx) } /// Gets all the delete files that apply to the specified data file. @@ -207,4 +226,21 @@ impl PopulatedDeleteFileIndex { results } + + fn metrics(&self) -> DeleteIndexMetrics { + // We count both partitioned and globally applied equality deletes. + let equality_delete_files = + flattened_len(&self.eq_deletes_by_partition) + self.global_deletes.len() as u32; + let positional_delete_files = flattened_len(&self.pos_deletes_by_partition); + + DeleteIndexMetrics { + indexed_delete_files: equality_delete_files + positional_delete_files, + equality_delete_files, + positional_delete_files, + } + } +} + +fn flattened_len(map: &HashMap>>) -> u32 { + map.values().flatten().try_len().unwrap_or(0) as u32 } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 835ea7daab..8a811d8a75 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -373,7 +373,7 @@ impl TableScan { let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = channel(self.concurrency_limit_manifest_files); - let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + let (delete_file_idx, delete_file_tx, index_metrics_rx) = DeleteFileIndex::new(); let manifest_list = plan_context.get_manifest_list().await?; From 05dc825a4c487db316cd959519f2d9591b1c7892 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 29 Jun 2025 12:36:01 +0200 Subject: [PATCH 06/25] Collect manifest file metrics Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/context.rs | 20 +++++++++++++++++--- crates/iceberg/src/scan/metrics.rs | 27 +++++++++++++++++++++++++++ crates/iceberg/src/scan/mod.rs | 13 +++++++------ 3 files changed, 51 insertions(+), 9 deletions(-) create mode 100644 crates/iceberg/src/scan/metrics.rs diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index a02e3222a7..569814c441 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -23,6 +23,7 @@ use futures::{SinkExt, TryFutureExt}; use crate::delete_file_index::DeleteFileIndex; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::object_cache::ObjectCache; +use crate::scan::metrics::ManifestMetrics; use crate::scan::{ BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache, PartitionFilterCache, @@ -186,16 +187,18 @@ impl PlanContext { tx_data: Sender, delete_file_idx: DeleteFileIndex, delete_file_tx: Sender, - ) -> Result>> { + ) -> Result<(Vec>, ManifestMetrics)> { let manifest_files = manifest_list.entries().iter(); - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; + let mut metrics = ManifestMetrics::default(); for manifest_file in manifest_files { let tx = if manifest_file.content == ManifestContentType::Deletes { + metrics.total_delete_manifests += 1; delete_file_tx.clone() } else { + metrics.total_data_manifests += 1; tx_data.clone() }; @@ -212,6 +215,10 @@ impl PlanContext { ) .eval(manifest_file)? { + match manifest_file.content { + ManifestContentType::Data => metrics.skipped_data_manifests += 1, + ManifestContentType::Deletes => metrics.skipped_delete_manifests += 1, + } continue; } @@ -230,7 +237,14 @@ impl PlanContext { filtered_mfcs.push(Ok(mfc)); } - Ok(filtered_mfcs) + // They're not yet scanned, but will be scanned concurrently in the + // next processing step. + metrics.scanned_data_manifests = + metrics.total_data_manifests - metrics.skipped_data_manifests; + metrics.scanned_delete_manifests = + metrics.total_delete_manifests - metrics.skipped_delete_manifests; + + Ok((filtered_mfcs, metrics)) } fn create_manifest_file_context( diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs new file mode 100644 index 0000000000..d2bc83bb04 --- /dev/null +++ b/crates/iceberg/src/scan/metrics.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Subset of [ScanMetrics] produced by manifest-handling functions. +#[derive(Default)] +pub(crate) struct ManifestMetrics { + pub(crate) total_data_manifests: u32, + pub(crate) total_delete_manifests: u32, + pub(crate) skipped_data_manifests: u32, + pub(crate) skipped_delete_manifests: u32, + pub(crate) scanned_data_manifests: u32, + pub(crate) scanned_delete_manifests: u32, +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 8a811d8a75..5753cc65bc 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -379,12 +379,13 @@ impl TableScan { // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any // whose partitions cannot match this scan's filter - let manifest_file_contexts = plan_context.build_manifest_file_contexts( - manifest_list, - manifest_entry_data_ctx_tx, - delete_file_idx, - manifest_entry_delete_ctx_tx, - )?; + let (manifest_file_contexts, manifest_metrics) = plan_context + .build_manifest_file_contexts( + manifest_list, + manifest_entry_data_ctx_tx, + delete_file_idx, + manifest_entry_delete_ctx_tx, + )?; // used to stream the results back to the caller let (result_tx, file_scan_task_rx) = channel(self.concurrency_limit_manifest_entries); From ce52bf6bc9466a88fc2e8564bd1ab29540281811 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 29 Jun 2025 18:48:51 +0200 Subject: [PATCH 07/25] Drop unnecessary Box<> Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/mod.rs | 9 ++++----- crates/iceberg/src/table.rs | 9 +++------ 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 5753cc65bc..b84bf1e75d 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -62,7 +62,7 @@ pub struct TableScanBuilder<'a> { row_selection_enabled: bool, /// If None, we default to a LoggingReporter. - metrics_reporter: Option>>, + metrics_reporter: Option>, } impl<'a> TableScanBuilder<'a> { @@ -111,7 +111,7 @@ impl<'a> TableScanBuilder<'a> { /// If unset, we default to a LoggingReporter. pub(crate) fn with_metrics_reporter( mut self, - metrics_reporter: Arc>, + metrics_reporter: Arc, ) -> Self { self.metrics_reporter = Some(metrics_reporter); self @@ -207,8 +207,7 @@ impl<'a> TableScanBuilder<'a> { // When a table scan is constructed directly (not by a catalog), // and the user didn't provide a metrics reporter, then we // construct a new one. - let reporter: Box = Box::new(LoggingMetricsReporter::new()); - Arc::new(reporter) + Arc::new(LoggingMetricsReporter::new()) } }; @@ -357,7 +356,7 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, - metrics_reporter: Arc>, + metrics_reporter: Arc, } impl TableScan { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 411d80c4e1..f950d5be1f 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -37,7 +37,7 @@ pub struct TableBuilder { readonly: bool, disable_cache: bool, cache_size_bytes: Option, - metrics_reporter: Option>>, + metrics_reporter: Option>, } impl TableBuilder { @@ -100,10 +100,7 @@ impl TableBuilder { /// sets the implementation used to report metrics about operations on this /// table. - pub(crate) fn with_metrics_reporter( - mut self, - metrics_reporter: Arc>, - ) -> Self { + pub(crate) fn metrics_reporter(mut self, metrics_reporter: Arc) -> Self { self.metrics_reporter = Some(metrics_reporter); self } @@ -174,7 +171,7 @@ pub struct Table { identifier: TableIdent, readonly: bool, object_cache: Arc, - metrics_reporter: Option>>, + metrics_reporter: Option>, } impl Table { From 3bec473df4824f3b511d47b71c1e1f1682911ada Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 29 Jun 2025 23:30:50 +0200 Subject: [PATCH 08/25] Collect metrics for data and delete files Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/metrics.rs | 6 ++++ crates/iceberg/src/scan/mod.rs | 50 ++++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs index d2bc83bb04..a90af5355a 100644 --- a/crates/iceberg/src/scan/metrics.rs +++ b/crates/iceberg/src/scan/metrics.rs @@ -25,3 +25,9 @@ pub(crate) struct ManifestMetrics { pub(crate) scanned_data_manifests: u32, pub(crate) scanned_delete_manifests: u32, } + +/// Represents an update to a single data or delete file. +pub(crate) enum FileMetricsUpdate { + Skipped, + Scanned { size_in_bytes: u64 }, +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index b84bf1e75d..c50cec26dd 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -38,6 +38,7 @@ use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::metrics::{LoggingMetricsReporter, MetricsReporter}; use crate::runtime::{JoinHandle, spawn}; +use crate::scan::metrics::{FileMetricsUpdate, ManifestMetrics}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; @@ -393,15 +394,18 @@ impl TableScan { self.load_manifests(manifest_file_contexts, result_tx.clone()); // Process the delete file [`ManifestEntry`] stream in parallel + let (delete_file_metrics_tx, delete_file_metrics_rx) = channel(1); self.process_delete_manifest_entries( manifest_entry_delete_ctx_rx, delete_file_tx, result_tx.clone(), + delete_file_metrics_tx, ) .await; // Process the data file [`ManifestEntry`] stream in parallel - self.process_manifest_entries(manifest_entry_data_ctx_rx, result_tx); + let (data_file_metrics_tx, data_file_metrics_rx) = channel(1); + self.process_manifest_entries(manifest_entry_data_ctx_rx, result_tx, data_file_metrics_tx); Ok(file_scan_task_rx.boxed()) } @@ -430,16 +434,22 @@ impl TableScan { manifest_entry_rx: Receiver, delete_file_tx: Sender, mut error_tx: Sender>, + metrics_update_tx: Sender, ) -> JoinHandle<()> { let concurrency_limit = self.concurrency_limit_manifest_entries; spawn(async move { let result = manifest_entry_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( concurrency_limit, - |(manifest_entry_context, tx)| async move { + |(manifest_entry_context, file_tx, metrics_tx)| async move { spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx).await + Self::process_delete_manifest_entry( + manifest_entry_context, + file_tx, + metrics_tx, + ) + .await }) .await }, @@ -456,16 +466,22 @@ impl TableScan { &self, manifest_entry_rx: Receiver, mut file_scan_task_tx: Sender>, + metrics_update_tx: Sender, ) { let concurrency_limit = self.concurrency_limit_manifest_entries; let _handle = spawn(async move { let result = manifest_entry_rx - .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) + .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( concurrency_limit, - |(manifest_entry_context, tx)| async move { + |(manifest_entry_context, task_tx, metrics_tx)| async move { spawn(async move { - Self::process_data_manifest_entry(manifest_entry_context, tx).await + Self::process_data_manifest_entry( + manifest_entry_context, + task_tx, + metrics_tx, + ) + .await }) .await }, @@ -508,9 +524,11 @@ impl TableScan { async fn process_data_manifest_entry( manifest_entry_context: ManifestEntryContext, mut file_scan_task_tx: Sender>, + mut metrics_tx: Sender, ) -> Result<()> { // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { + metrics_tx.send(FileMetricsUpdate::Skipped).await?; return Ok(()); } @@ -539,6 +557,7 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { + metrics_tx.send(FileMetricsUpdate::Skipped).await?; return Ok(()); } @@ -548,6 +567,7 @@ impl TableScan { manifest_entry_context.manifest_entry.data_file(), false, )? { + metrics_tx.send(FileMetricsUpdate::Skipped).await?; return Ok(()); } } @@ -555,8 +575,12 @@ impl TableScan { // congratulations! the manifest entry has made its way through the // entire plan without getting filtered out. Create a corresponding // FileScanTask and push it to the result stream - file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task().await?)) + let size_in_bytes = manifest_entry_context.manifest_entry.file_size_in_bytes(); + let file_scan_task = manifest_entry_context.into_file_scan_task().await?; + + file_scan_task_tx.send(Ok(file_scan_task)).await?; + metrics_tx + .send(FileMetricsUpdate::Scanned { size_in_bytes }) .await?; Ok(()) @@ -565,9 +589,11 @@ impl TableScan { async fn process_delete_manifest_entry( manifest_entry_context: ManifestEntryContext, mut delete_file_ctx_tx: Sender, + mut metrics_tx: Sender, ) -> Result<()> { // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { + metrics_tx.send(FileMetricsUpdate::Skipped).await?; return Ok(()); } @@ -591,6 +617,7 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { + metrics_tx.send(FileMetricsUpdate::Skipped).await?; return Ok(()); } } @@ -602,6 +629,11 @@ impl TableScan { }) .await?; + let size_in_bytes = manifest_entry_context.manifest_entry.file_size_in_bytes(); + metrics_tx + .send(FileMetricsUpdate::Scanned { size_in_bytes }) + .await?; + Ok(()) } } From 4fcfbed748d7d7edb38fcb0dac95f3c4cd3c2ed3 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 29 Jun 2025 23:31:53 +0200 Subject: [PATCH 09/25] Inlcude metrics mod Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index c50cec26dd..6e1dc9faca 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,6 +21,7 @@ mod cache; use cache::*; mod context; use context::*; +mod metrics; mod task; use std::sync::Arc; @@ -32,7 +33,7 @@ use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; use crate::arrow::ArrowReaderBuilder; -use crate::delete_file_index::DeleteFileIndex; +use crate::delete_file_index::DeleteFileIndex, DeleteIndexMetrics}; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; From 7242774f609f81da0e26c786d28842ef73f98543 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 29 Jun 2025 23:34:41 +0200 Subject: [PATCH 10/25] Include TableIdent in TableScan Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 6e1dc9faca..85177c363f 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -43,7 +43,7 @@ use crate::scan::metrics::{FileMetricsUpdate, ManifestMetrics}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; -use crate::{Error, ErrorKind, Result}; +use crate::{Error, ErrorKind, Result, TableIdent}; /// A stream of arrow [`RecordBatch`]es. pub type ArrowRecordBatchStream = BoxStream<'static, Result>; @@ -228,6 +228,7 @@ impl<'a> TableScanBuilder<'a> { None => { let Some(current_snapshot_id) = self.table.metadata().current_snapshot() else { return Ok(TableScan { + table: self.table.identifier().clone(), batch_size: self.batch_size, column_names: self.column_names, file_io: self.table.file_io().clone(), @@ -319,6 +320,7 @@ impl<'a> TableScanBuilder<'a> { }; Ok(TableScan { + table: self.table.identifier().clone(), batch_size: self.batch_size, column_names: self.column_names, file_io: self.table.file_io().clone(), @@ -336,6 +338,8 @@ impl<'a> TableScanBuilder<'a> { /// Table scan. #[derive(Debug)] pub struct TableScan { + table: TableIdent, + /// A [PlanContext], if this table has at least one snapshot, otherwise None. /// /// If this is None, then the scan contains no rows. From 1b8e8c6e453cc3bcbf4b43487824e276c5f9351a Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 29 Jun 2025 23:58:22 +0200 Subject: [PATCH 11/25] Send metrics report Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/metrics.rs | 80 ++++++++++++++++++++++++++++++ crates/iceberg/src/scan/mod.rs | 64 +++++++++++++++++++++++- 2 files changed, 142 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs index a90af5355a..cc92f4eb50 100644 --- a/crates/iceberg/src/scan/metrics.rs +++ b/crates/iceberg/src/scan/metrics.rs @@ -15,6 +15,15 @@ // specific language governing permissions and limitations // under the License. +use std::time::Instant; + +use futures::StreamExt; +use futures::channel::mpsc::Receiver; +use futures::channel::oneshot; + +use crate::delete_file_index::DeleteIndexMetrics; +use crate::metrics::ScanMetrics; + /// Subset of [ScanMetrics] produced by manifest-handling functions. #[derive(Default)] pub(crate) struct ManifestMetrics { @@ -31,3 +40,74 @@ pub(crate) enum FileMetricsUpdate { Skipped, Scanned { size_in_bytes: u64 }, } + +/// Receives metrics updates from different sources and combines them into the +/// [ScanMetrics] struct used for reporting. +pub(crate) async fn aggregate_metrics( + planning_start: Instant, + manifest_metrics: ManifestMetrics, + mut data_file_metrics_rx: Receiver, + mut delete_file_metrics_rx: Receiver, + index_metrics_rx: oneshot::Receiver, +) -> ScanMetrics { + // TODO: Double-check the order of blocking operations. We should start with + // result streams that we need to unblock first. This is because we attach + // some concurrency limit to each metrics channel and we currently + // sequentially read from each one to prevent a lock on the metrics struct. + + let mut result_data_files = 0; + let mut total_file_size_in_bytes = 0; + let mut skipped_data_files = 0; + while let Some(data_file) = data_file_metrics_rx.next().await { + match data_file { + FileMetricsUpdate::Scanned { size_in_bytes } => { + result_data_files += 1; + total_file_size_in_bytes += size_in_bytes; + } + FileMetricsUpdate::Skipped => skipped_data_files += 1, + } + } + + let mut result_delete_files = 0; + let mut total_delete_file_size_in_bytes = 0; + let mut skipped_delete_files = 0; + while let Some(delete_file) = delete_file_metrics_rx.next().await { + match delete_file { + FileMetricsUpdate::Scanned { size_in_bytes } => { + result_delete_files += 1; + total_delete_file_size_in_bytes += size_in_bytes; + } + FileMetricsUpdate::Skipped => skipped_delete_files += 1, + } + } + + let index_metrics = index_metrics_rx.await.unwrap(); + + // Only now (after consuming all metrics updates) do we know that + // all concurrent work is finished and we can stop timing the + // planning phase. + let total_planning_duration = planning_start.elapsed(); + + ScanMetrics { + total_planning_duration, + + total_data_manifests: manifest_metrics.total_data_manifests, + total_delete_manifests: manifest_metrics.total_delete_manifests, + skipped_data_manifests: manifest_metrics.skipped_data_manifests, + skipped_delete_manifests: manifest_metrics.skipped_delete_manifests, + scanned_data_manifests: manifest_metrics.scanned_data_manifests, + scanned_delete_manifests: manifest_metrics.scanned_delete_manifests, + + result_data_files, + skipped_data_files, + total_file_size_in_bytes, + + result_delete_files, + skipped_delete_files, + total_delete_file_size_in_bytes, + + indexed_delete_files: index_metrics.indexed_delete_files, + equality_delete_files: index_metrics.equality_delete_files, + positional_delete_files: index_metrics.positional_delete_files, + } +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 85177c363f..77a1ef7504 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,10 +21,12 @@ mod cache; use cache::*; mod context; use context::*; +use futures::channel::oneshot; mod metrics; mod task; use std::sync::Arc; +use std::time::Instant; use arrow_array::RecordBatch; use futures::channel::mpsc::{Receiver, Sender, channel}; @@ -37,9 +39,9 @@ use crate::delete_file_index::DeleteFileIndex, DeleteIndexMetrics}; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; -use crate::metrics::{LoggingMetricsReporter, MetricsReporter}; +use crate::metrics::{LoggingMetricsReporter, MetricsReport, MetricsReporter}; use crate::runtime::{JoinHandle, spawn}; -use crate::scan::metrics::{FileMetricsUpdate, ManifestMetrics}; +use crate::scan::metrics::{FileMetricsUpdate, ManifestMetrics, aggregate_metrics}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; @@ -372,6 +374,9 @@ impl TableScan { return Ok(Box::pin(futures::stream::empty())); }; + // Initialize scan metrics for reporting. + let plan_start_time = Instant::now(); + // used to stream ManifestEntryContexts between stages of the file plan operation let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = channel(self.concurrency_limit_manifest_files); @@ -412,6 +417,16 @@ impl TableScan { let (data_file_metrics_tx, data_file_metrics_rx) = channel(1); self.process_manifest_entries(manifest_entry_data_ctx_rx, result_tx, data_file_metrics_tx); + self.report_metrics( + plan_start_time, + plan_context, + data_file_metrics_rx, + delete_file_metrics_rx, + index_metrics_rx, + manifest_metrics, + ) + .await; + Ok(file_scan_task_rx.boxed()) } @@ -499,6 +514,51 @@ impl TableScan { }); } + async fn report_metrics( + &self, + plan_start_time: Instant, + plan_context: &PlanContext, + data_file_metrics_rx: Receiver, + delete_file_metrics_rx: Receiver, + index_metrics_rx: oneshot::Receiver, + manifest_metrics: ManifestMetrics, + ) { + let table = self.table.clone(); + let snapshot_id = plan_context.snapshot.snapshot_id(); + let filter = plan_context.predicate.clone(); + let schema_id = plan_context.snapshot_schema.schema_id(); + let projected_field_ids = plan_context.field_ids.clone(); + let projected_field_names = self.column_names.clone(); + + let metrics_reporter = Arc::clone(&self.metrics_reporter); + spawn({ + let metrics = aggregate_metrics( + plan_start_time, + manifest_metrics, + data_file_metrics_rx, + delete_file_metrics_rx, + index_metrics_rx, + ) + .await; + + async move { + let report = MetricsReport::Scan { + table, + snapshot_id, + filter, + schema_id, + projected_field_ids, + projected_field_names, + metadata: HashMap::new(), + metrics: Arc::new(metrics), + }; + + metrics_reporter.report(report).await; + } + }) + .await; + } + /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) From ddc9627ea06b14e27b695d3642e570aca48fc514 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 30 Jun 2025 00:14:38 +0200 Subject: [PATCH 12/25] Add missing brackets around import Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 77a1ef7504..5469db417f 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -25,6 +25,7 @@ use futures::channel::oneshot; mod metrics; mod task; +use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -35,7 +36,7 @@ use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; use crate::arrow::ArrowReaderBuilder; -use crate::delete_file_index::DeleteFileIndex, DeleteIndexMetrics}; +use crate::delete_file_index::{DeleteFileIndex, DeleteIndexMetrics}; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; From f1e598dd1442060fec479a9e6111120f545111ac Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 30 Jun 2025 00:16:49 +0200 Subject: [PATCH 13/25] Test metrics reporting Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/mod.rs | 112 ++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 5469db417f..7604048ccd 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -717,11 +717,12 @@ pub mod tests { use std::collections::HashMap; use std::fs; use std::fs::File; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; use arrow_array::{ ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, }; + use async_trait::async_trait; use futures::{TryStreamExt, stream}; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; @@ -734,6 +735,7 @@ pub mod tests { use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; + use crate::metrics::{MetricsReport, MetricsReporter, ScanMetrics}; use crate::scan::FileScanTask; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, @@ -745,6 +747,7 @@ pub mod tests { pub struct TableTestFixture { pub table_location: String, pub table: Table, + metrics_reporter: Arc, } impl TableTestFixture { @@ -777,17 +780,21 @@ pub mod tests { serde_json::from_str::(&metadata_json).unwrap() }; + let metrics_reporter = Arc::new(TestMetricsReporter::new()); + let table = Table::builder() .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) .file_io(file_io.clone()) .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .metrics_reporter(Arc::clone(&metrics_reporter) as Arc) .build() .unwrap(); Self { table_location: table_location.to_str().unwrap().to_string(), table, + metrics_reporter, } } @@ -816,17 +823,21 @@ pub mod tests { serde_json::from_str::(&metadata_json).unwrap() }; + let metrics_reporter = Arc::new(TestMetricsReporter::new()); + let table = Table::builder() .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) .file_io(file_io.clone()) .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .metrics_reporter(Arc::clone(&metrics_reporter) as Arc) .build() .unwrap(); Self { table_location: table_location.to_str().unwrap().to_string(), table, + metrics_reporter, } } @@ -865,17 +876,21 @@ pub mod tests { .partition_specs .insert(0, table_metadata.default_spec.clone()); + let metrics_reporter = Arc::new(TestMetricsReporter::new()); + let table = Table::builder() .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) .file_io(file_io.clone()) .metadata_location(table_metadata1_location.to_str().unwrap()) + .metrics_reporter(Arc::clone(&metrics_reporter) as Arc) .build() .unwrap(); Self { table_location: table_location.to_str().unwrap().to_string(), table, + metrics_reporter, } } @@ -1314,6 +1329,27 @@ pub mod tests { } } + #[derive(Debug)] + struct TestMetricsReporter { + last_report: Mutex>, + } + + #[async_trait] + impl MetricsReporter for TestMetricsReporter { + async fn report(&self, report: MetricsReport) { + let mut guard = self.last_report.lock().unwrap(); + *guard = Some(report); + } + } + + impl TestMetricsReporter { + fn new() -> Self { + Self { + last_report: Mutex::new(None), + } + } + } + #[test] fn test_table_scan_columns() { let table = TableTestFixture::new().table; @@ -1952,4 +1988,78 @@ pub mod tests { }; test_fn(task); } + + #[tokio::test] + async fn test_metrics_reporter() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + let table_scan = fixture.table.scan().select(["x", "y"]).build().unwrap(); + + // Consume the table scan's results to finish the planning process, and + // send a report. + let _batches: Vec<_> = table_scan + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + let report_guard = fixture.metrics_reporter.last_report.lock().unwrap(); + assert!(report_guard.is_some()); + let report = report_guard.as_ref().unwrap(); + + match report { + MetricsReport::Scan { + table, + snapshot_id, + filter, + schema_id, + projected_field_ids, + projected_field_names, + metrics, + metadata, + } => { + assert_eq!(table, fixture.table.identifier()); + assert_eq!( + snapshot_id, + &fixture.table.metadata().current_snapshot_id().unwrap() + ); + assert!(filter.is_none()); + assert_eq!(schema_id, &fixture.table.metadata().current_schema_id); + assert_eq!(projected_field_ids, &Arc::new(vec![1, 2])); + assert_eq!( + projected_field_names, + &Some(vec!["x".to_string(), "y".to_string()]) + ); + + assert_metrics(metrics); + + assert!(metadata.is_empty()) + } + } + } + + fn assert_metrics(metrics: &Arc) { + assert!(!metrics.total_planning_duration.is_zero()); + assert_eq!(metrics.total_data_manifests, 1); + assert_eq!(metrics.total_delete_manifests, 0); + assert_eq!(metrics.skipped_data_manifests, 0); + assert_eq!(metrics.skipped_delete_manifests, 0); + assert_eq!(metrics.scanned_data_manifests, 1); + assert_eq!(metrics.scanned_delete_manifests, 0); + + assert_eq!(metrics.result_data_files, 2); + assert_eq!(metrics.skipped_data_files, 1); + assert_eq!(metrics.total_file_size_in_bytes, 200); + + assert_eq!(metrics.result_delete_files, 0); + assert_eq!(metrics.skipped_delete_files, 0); + assert_eq!(metrics.total_delete_file_size_in_bytes, 0); + + assert_eq!(metrics.indexed_delete_files, 0); + assert_eq!(metrics.equality_delete_files, 0); + assert_eq!(metrics.positional_delete_files, 0); + } } From 08f72bd539d12abce5bac2575d705fe5f840a50e Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 30 Jun 2025 00:19:04 +0200 Subject: [PATCH 14/25] Move stream writing outside of processing functions Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/mod.rs | 97 ++++++++++++++++------------------ 1 file changed, 47 insertions(+), 50 deletions(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 7604048ccd..71a6364477 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -463,14 +463,23 @@ impl TableScan { .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( concurrency_limit, - |(manifest_entry_context, file_tx, metrics_tx)| async move { + |(manifest_entry, mut file_tx, mut metrics_tx)| async move { spawn(async move { - Self::process_delete_manifest_entry( - manifest_entry_context, - file_tx, - metrics_tx, - ) - .await + let delete_file = + Self::filter_delete_manifest_entry(manifest_entry).await?; + + let metrics_update = if let Some(delete_file) = delete_file { + let size_in_bytes = delete_file.manifest_entry.file_size_in_bytes(); + file_tx.send(delete_file).await?; + + FileMetricsUpdate::Scanned { size_in_bytes } + } else { + FileMetricsUpdate::Skipped + }; + + metrics_tx.send(metrics_update).await?; + + Ok(()) }) .await }, @@ -495,14 +504,22 @@ impl TableScan { .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( concurrency_limit, - |(manifest_entry_context, task_tx, metrics_tx)| async move { + |(manifest_entry, mut task_tx, mut metrics_tx)| async move { spawn(async move { - Self::process_data_manifest_entry( - manifest_entry_context, - task_tx, - metrics_tx, - ) - .await + let file_task = + Self::filter_data_manifest_entry(manifest_entry).await?; + + let metrics_update = if let Some(file_task) = file_task { + let size_in_bytes = file_task.length; + task_tx.send(Ok(file_task)).await?; + + FileMetricsUpdate::Scanned { size_in_bytes } + } else { + FileMetricsUpdate::Skipped + }; + + metrics_tx.send(metrics_update).await?; + Ok(()) }) .await }, @@ -587,15 +604,12 @@ impl TableScan { self.plan_context.as_ref().map(|x| &x.snapshot) } - async fn process_data_manifest_entry( + async fn filter_data_manifest_entry( manifest_entry_context: ManifestEntryContext, - mut file_scan_task_tx: Sender>, - mut metrics_tx: Sender, - ) -> Result<()> { + ) -> Result> { // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { - metrics_tx.send(FileMetricsUpdate::Skipped).await?; - return Ok(()); + return Ok(None); } // abort the plan if we encounter a manifest entry for a delete file @@ -623,8 +637,7 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { - metrics_tx.send(FileMetricsUpdate::Skipped).await?; - return Ok(()); + return Ok(None); } // skip any data file whose metrics don't match this scan's filter @@ -633,34 +646,24 @@ impl TableScan { manifest_entry_context.manifest_entry.data_file(), false, )? { - metrics_tx.send(FileMetricsUpdate::Skipped).await?; - return Ok(()); + return Ok(None); } } // congratulations! the manifest entry has made its way through the // entire plan without getting filtered out. Create a corresponding // FileScanTask and push it to the result stream - let size_in_bytes = manifest_entry_context.manifest_entry.file_size_in_bytes(); let file_scan_task = manifest_entry_context.into_file_scan_task().await?; - file_scan_task_tx.send(Ok(file_scan_task)).await?; - metrics_tx - .send(FileMetricsUpdate::Scanned { size_in_bytes }) - .await?; - - Ok(()) + Ok(Some(file_scan_task)) } - async fn process_delete_manifest_entry( + async fn filter_delete_manifest_entry( manifest_entry_context: ManifestEntryContext, - mut delete_file_ctx_tx: Sender, - mut metrics_tx: Sender, - ) -> Result<()> { + ) -> Result> { // skip processing this manifest entry if it has been marked as deleted if !manifest_entry_context.manifest_entry.is_alive() { - metrics_tx.send(FileMetricsUpdate::Skipped).await?; - return Ok(()); + return Ok(None); } // abort the plan if we encounter a manifest entry that is not for a delete file @@ -683,24 +686,18 @@ impl TableScan { // skip any data file whose partition data indicates that it can't contain // any data that matches this scan's filter if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? { - metrics_tx.send(FileMetricsUpdate::Skipped).await?; - return Ok(()); + return Ok(None); } } - delete_file_ctx_tx - .send(DeleteFileContext { - manifest_entry: manifest_entry_context.manifest_entry.clone(), - partition_spec_id: manifest_entry_context.partition_spec_id, - }) - .await?; + let delete_file_ctx = DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), + partition_spec_id: manifest_entry_context.partition_spec_id, + }; - let size_in_bytes = manifest_entry_context.manifest_entry.file_size_in_bytes(); - metrics_tx - .send(FileMetricsUpdate::Scanned { size_in_bytes }) - .await?; + // let size_in_bytes = manifest_entry_context.manifest_entry.file_size_in_bytes(); - Ok(()) + Ok(Some(delete_file_ctx)) } } From 683ad4f1c0e09f54b14329ad1e80050995cd2317 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Wed, 2 Jul 2025 00:27:26 +0200 Subject: [PATCH 15/25] Replace Box with Arc Signed-off-by: Jannik Steinmann --- crates/iceberg/src/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs index a5e9c383c4..505f46036f 100644 --- a/crates/iceberg/src/metrics.rs +++ b/crates/iceberg/src/metrics.rs @@ -62,7 +62,7 @@ pub(crate) enum MetricsReport { // TODO: We could default to listing all field names, if all are selected // check what Java is doing. projected_field_names: Option>, - metrics: Box, + metrics: Arc, metadata: HashMap, }, } From 876c7088ffd6fed624183850733a6ef74b56c78c Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 6 Jul 2025 11:39:55 +0200 Subject: [PATCH 16/25] Rever vec comment Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/context.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 569814c441..8ccbcd2c4c 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -190,6 +190,13 @@ impl PlanContext { ) -> Result<(Vec>, ManifestMetrics)> { let manifest_files = manifest_list.entries().iter(); + // TODO: Ideally we could ditch this intermediate Vec as we can return + // an iterator over the results. Updates to the manifest metrics somewhat + // complicate this because they need to be serialized somewhere, and an + // iterator can't easily take ownership of the metrics. + // A vec allows us to apply the mutations within this function. + // A vec also implicitly implements Send and Sync, meaning we can pass + // it around more easily in the concurrent planning step. let mut filtered_mfcs = vec![]; let mut metrics = ManifestMetrics::default(); From c74e855bf6b225a0a564a147e9d44c430b5f15c0 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 6 Jul 2025 12:07:27 +0200 Subject: [PATCH 17/25] Move JoinHandle for delete index metrics Signed-off-by: Jannik Steinmann --- crates/iceberg/src/delete_file_index.rs | 19 ++++++++++--------- crates/iceberg/src/scan/metrics.rs | 5 +++-- crates/iceberg/src/scan/mod.rs | 10 +++++----- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 6ec7e37354..535b6dd22a 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -21,11 +21,10 @@ use std::sync::{Arc, RwLock}; use futures::StreamExt; use futures::channel::mpsc::{Sender, channel}; -use futures::channel::oneshot; use itertools::Itertools; use tokio::sync::Notify; -use crate::runtime::spawn; +use crate::runtime::{JoinHandle, spawn}; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; @@ -61,11 +60,15 @@ pub(crate) struct DeleteIndexMetrics { } impl DeleteFileIndex { - /// create a new `DeleteFileIndex` along with the sender that populates it with delete files + /// Create a new `DeleteFileIndex` along with the sender that populates it + /// with delete files + /// + /// It will asynchronously wait for all delete files to come in before it + /// starts indexing. pub(crate) fn new() -> ( DeleteFileIndex, Sender, - oneshot::Receiver, + JoinHandle, ) { // TODO: what should the channel limit be? let (delete_file_tx, delete_file_rx) = channel(10); @@ -75,9 +78,7 @@ impl DeleteFileIndex { ))); let delete_file_stream = delete_file_rx.boxed(); - let (metrics_tx, metrics_rx) = oneshot::channel(); - - spawn({ + let metrics_handle = spawn({ let state = state.clone(); async move { let delete_files = delete_file_stream.collect::>().await; @@ -92,11 +93,11 @@ impl DeleteFileIndex { } notify.notify_waiters(); - metrics_tx.send(metrics).unwrap(); + metrics } }); - (DeleteFileIndex { state }, delete_file_tx, metrics_rx) + (DeleteFileIndex { state }, delete_file_tx, metrics_handle) } /// Gets all the delete files that apply to the specified data file. diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs index cc92f4eb50..fd94d012d7 100644 --- a/crates/iceberg/src/scan/metrics.rs +++ b/crates/iceberg/src/scan/metrics.rs @@ -23,6 +23,7 @@ use futures::channel::oneshot; use crate::delete_file_index::DeleteIndexMetrics; use crate::metrics::ScanMetrics; +use crate::runtime::JoinHandle; /// Subset of [ScanMetrics] produced by manifest-handling functions. #[derive(Default)] @@ -48,7 +49,7 @@ pub(crate) async fn aggregate_metrics( manifest_metrics: ManifestMetrics, mut data_file_metrics_rx: Receiver, mut delete_file_metrics_rx: Receiver, - index_metrics_rx: oneshot::Receiver, + index_metrics_handle: JoinHandle, ) -> ScanMetrics { // TODO: Double-check the order of blocking operations. We should start with // result streams that we need to unblock first. This is because we attach @@ -81,7 +82,7 @@ pub(crate) async fn aggregate_metrics( } } - let index_metrics = index_metrics_rx.await.unwrap(); + let index_metrics = index_metrics_handle.await; // Only now (after consuming all metrics updates) do we know that // all concurrent work is finished and we can stop timing the diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 71a6364477..9da8612bfe 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,7 +21,6 @@ mod cache; use cache::*; mod context; use context::*; -use futures::channel::oneshot; mod metrics; mod task; @@ -31,6 +30,7 @@ use std::time::Instant; use arrow_array::RecordBatch; use futures::channel::mpsc::{Receiver, Sender, channel}; +use futures::channel::oneshot; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; @@ -384,7 +384,7 @@ impl TableScan { let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = channel(self.concurrency_limit_manifest_files); - let (delete_file_idx, delete_file_tx, index_metrics_rx) = DeleteFileIndex::new(); + let (delete_file_idx, delete_file_tx, index_metrics_handle) = DeleteFileIndex::new(); let manifest_list = plan_context.get_manifest_list().await?; @@ -423,7 +423,7 @@ impl TableScan { plan_context, data_file_metrics_rx, delete_file_metrics_rx, - index_metrics_rx, + index_metrics_handle, manifest_metrics, ) .await; @@ -538,7 +538,7 @@ impl TableScan { plan_context: &PlanContext, data_file_metrics_rx: Receiver, delete_file_metrics_rx: Receiver, - index_metrics_rx: oneshot::Receiver, + index_metrics_handle: JoinHandle, manifest_metrics: ManifestMetrics, ) { let table = self.table.clone(); @@ -555,7 +555,7 @@ impl TableScan { manifest_metrics, data_file_metrics_rx, delete_file_metrics_rx, - index_metrics_rx, + index_metrics_handle, ) .await; From 90ce07cc9d6e8df984570b132b26cf8a30277342 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 6 Jul 2025 12:15:20 +0200 Subject: [PATCH 18/25] Use JoinHandle for delete file metrics Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/metrics.rs | 30 +++++++---------- crates/iceberg/src/scan/mod.rs | 52 +++++++++++++++++++----------- 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs index fd94d012d7..2f4a649f73 100644 --- a/crates/iceberg/src/scan/metrics.rs +++ b/crates/iceberg/src/scan/metrics.rs @@ -19,7 +19,6 @@ use std::time::Instant; use futures::StreamExt; use futures::channel::mpsc::Receiver; -use futures::channel::oneshot; use crate::delete_file_index::DeleteIndexMetrics; use crate::metrics::ScanMetrics; @@ -36,6 +35,13 @@ pub(crate) struct ManifestMetrics { pub(crate) scanned_delete_manifests: u32, } +#[derive(Default)] +pub(crate) struct DeleteFileMetrics { + pub(crate) result_delete_files: u32, + pub(crate) skipped_delete_files: u32, + pub(crate) total_delete_file_size_in_bytes: u64, +} + /// Represents an update to a single data or delete file. pub(crate) enum FileMetricsUpdate { Skipped, @@ -48,7 +54,7 @@ pub(crate) async fn aggregate_metrics( planning_start: Instant, manifest_metrics: ManifestMetrics, mut data_file_metrics_rx: Receiver, - mut delete_file_metrics_rx: Receiver, + delete_file_metrics_handle: JoinHandle, index_metrics_handle: JoinHandle, ) -> ScanMetrics { // TODO: Double-check the order of blocking operations. We should start with @@ -69,19 +75,7 @@ pub(crate) async fn aggregate_metrics( } } - let mut result_delete_files = 0; - let mut total_delete_file_size_in_bytes = 0; - let mut skipped_delete_files = 0; - while let Some(delete_file) = delete_file_metrics_rx.next().await { - match delete_file { - FileMetricsUpdate::Scanned { size_in_bytes } => { - result_delete_files += 1; - total_delete_file_size_in_bytes += size_in_bytes; - } - FileMetricsUpdate::Skipped => skipped_delete_files += 1, - } - } - + let delete_file_metrics = delete_file_metrics_handle.await; let index_metrics = index_metrics_handle.await; // Only now (after consuming all metrics updates) do we know that @@ -103,9 +97,9 @@ pub(crate) async fn aggregate_metrics( skipped_data_files, total_file_size_in_bytes, - result_delete_files, - skipped_delete_files, - total_delete_file_size_in_bytes, + result_delete_files: delete_file_metrics.result_delete_files, + skipped_delete_files: delete_file_metrics.skipped_delete_files, + total_delete_file_size_in_bytes: delete_file_metrics.total_delete_file_size_in_bytes, indexed_delete_files: index_metrics.indexed_delete_files, equality_delete_files: index_metrics.equality_delete_files, diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 9da8612bfe..0e5a709416 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -30,7 +30,6 @@ use std::time::Instant; use arrow_array::RecordBatch; use futures::channel::mpsc::{Receiver, Sender, channel}; -use futures::channel::oneshot; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; @@ -42,7 +41,9 @@ use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::metrics::{LoggingMetricsReporter, MetricsReport, MetricsReporter}; use crate::runtime::{JoinHandle, spawn}; -use crate::scan::metrics::{FileMetricsUpdate, ManifestMetrics, aggregate_metrics}; +use crate::scan::metrics::{ + DeleteFileMetrics, FileMetricsUpdate, ManifestMetrics, aggregate_metrics, +}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; @@ -404,15 +405,12 @@ impl TableScan { // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s self.load_manifests(manifest_file_contexts, result_tx.clone()); - // Process the delete file [`ManifestEntry`] stream in parallel - let (delete_file_metrics_tx, delete_file_metrics_rx) = channel(1); - self.process_delete_manifest_entries( + let (_delete_file_handle, delete_file_metrics_handle) = self + .process_delete_manifest_entries( manifest_entry_delete_ctx_rx, delete_file_tx, result_tx.clone(), - delete_file_metrics_tx, - ) - .await; + ); // Process the data file [`ManifestEntry`] stream in parallel let (data_file_metrics_tx, data_file_metrics_rx) = channel(1); @@ -422,7 +420,7 @@ impl TableScan { plan_start_time, plan_context, data_file_metrics_rx, - delete_file_metrics_rx, + delete_file_metrics_handle, index_metrics_handle, manifest_metrics, ) @@ -455,18 +453,34 @@ impl TableScan { manifest_entry_rx: Receiver, delete_file_tx: Sender, mut error_tx: Sender>, - metrics_update_tx: Sender, - ) -> JoinHandle<()> { + ) -> (JoinHandle<()>, JoinHandle) { let concurrency_limit = self.concurrency_limit_manifest_entries; - spawn(async move { + + let (metrics_update_tx, mut metrics_update_rx) = channel(1); + let metrics_handle = spawn(async move { + let mut accumulator = DeleteFileMetrics::default(); + + while let Some(update) = metrics_update_rx.next().await { + match update { + FileMetricsUpdate::Skipped => accumulator.skipped_delete_files += 1, + FileMetricsUpdate::Scanned { size_in_bytes } => { + accumulator.total_delete_file_size_in_bytes += size_in_bytes; + accumulator.result_delete_files += 1; + } + } + } + + accumulator + }); + + let delete_file_handle = spawn(async move { let result = manifest_entry_rx .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( concurrency_limit, |(manifest_entry, mut file_tx, mut metrics_tx)| async move { spawn(async move { - let delete_file = - Self::filter_delete_manifest_entry(manifest_entry).await?; + let delete_file = Self::filter_delete_manifest_entry(manifest_entry)?; let metrics_update = if let Some(delete_file) = delete_file { let size_in_bytes = delete_file.manifest_entry.file_size_in_bytes(); @@ -489,7 +503,9 @@ impl TableScan { if let Err(error) = result { let _ = error_tx.send(Err(error)).await; } - }) + }); + + (delete_file_handle, metrics_handle) } fn process_manifest_entries( @@ -537,7 +553,7 @@ impl TableScan { plan_start_time: Instant, plan_context: &PlanContext, data_file_metrics_rx: Receiver, - delete_file_metrics_rx: Receiver, + delete_file_metrics_handle: JoinHandle, index_metrics_handle: JoinHandle, manifest_metrics: ManifestMetrics, ) { @@ -554,7 +570,7 @@ impl TableScan { plan_start_time, manifest_metrics, data_file_metrics_rx, - delete_file_metrics_rx, + delete_file_metrics_handle, index_metrics_handle, ) .await; @@ -658,7 +674,7 @@ impl TableScan { Ok(Some(file_scan_task)) } - async fn filter_delete_manifest_entry( + fn filter_delete_manifest_entry( manifest_entry_context: ManifestEntryContext, ) -> Result> { // skip processing this manifest entry if it has been marked as deleted From 630cc53c4ed448c6f451aa7db9e00ff07f4eef3a Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Sun, 6 Jul 2025 12:54:18 +0200 Subject: [PATCH 19/25] Use JoinHandle for data file metrics and refactor Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/metrics.rs | 103 +++++++++++++++-------------- crates/iceberg/src/scan/mod.rs | 94 ++++++++++++-------------- 2 files changed, 95 insertions(+), 102 deletions(-) diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs index 2f4a649f73..307f25b3db 100644 --- a/crates/iceberg/src/scan/metrics.rs +++ b/crates/iceberg/src/scan/metrics.rs @@ -24,57 +24,17 @@ use crate::delete_file_index::DeleteIndexMetrics; use crate::metrics::ScanMetrics; use crate::runtime::JoinHandle; -/// Subset of [ScanMetrics] produced by manifest-handling functions. -#[derive(Default)] -pub(crate) struct ManifestMetrics { - pub(crate) total_data_manifests: u32, - pub(crate) total_delete_manifests: u32, - pub(crate) skipped_data_manifests: u32, - pub(crate) skipped_delete_manifests: u32, - pub(crate) scanned_data_manifests: u32, - pub(crate) scanned_delete_manifests: u32, -} - -#[derive(Default)] -pub(crate) struct DeleteFileMetrics { - pub(crate) result_delete_files: u32, - pub(crate) skipped_delete_files: u32, - pub(crate) total_delete_file_size_in_bytes: u64, -} - -/// Represents an update to a single data or delete file. -pub(crate) enum FileMetricsUpdate { - Skipped, - Scanned { size_in_bytes: u64 }, -} - -/// Receives metrics updates from different sources and combines them into the +/// Awaits metrics updates from different sources and combines them into the /// [ScanMetrics] struct used for reporting. pub(crate) async fn aggregate_metrics( planning_start: Instant, manifest_metrics: ManifestMetrics, - mut data_file_metrics_rx: Receiver, - delete_file_metrics_handle: JoinHandle, + data_file_metrics_handle: JoinHandle, + delete_file_metrics_handle: JoinHandle, index_metrics_handle: JoinHandle, ) -> ScanMetrics { - // TODO: Double-check the order of blocking operations. We should start with - // result streams that we need to unblock first. This is because we attach - // some concurrency limit to each metrics channel and we currently - // sequentially read from each one to prevent a lock on the metrics struct. - - let mut result_data_files = 0; - let mut total_file_size_in_bytes = 0; - let mut skipped_data_files = 0; - while let Some(data_file) = data_file_metrics_rx.next().await { - match data_file { - FileMetricsUpdate::Scanned { size_in_bytes } => { - result_data_files += 1; - total_file_size_in_bytes += size_in_bytes; - } - FileMetricsUpdate::Skipped => skipped_data_files += 1, - } - } - + // TODO: Consider joining them instead. + let data_file_metrics = data_file_metrics_handle.await; let delete_file_metrics = delete_file_metrics_handle.await; let index_metrics = index_metrics_handle.await; @@ -93,16 +53,57 @@ pub(crate) async fn aggregate_metrics( scanned_data_manifests: manifest_metrics.scanned_data_manifests, scanned_delete_manifests: manifest_metrics.scanned_delete_manifests, - result_data_files, - skipped_data_files, - total_file_size_in_bytes, + result_data_files: data_file_metrics.result_files, + skipped_data_files: data_file_metrics.skipped_files, + total_file_size_in_bytes: data_file_metrics.total_file_size_in_bytes, - result_delete_files: delete_file_metrics.result_delete_files, - skipped_delete_files: delete_file_metrics.skipped_delete_files, - total_delete_file_size_in_bytes: delete_file_metrics.total_delete_file_size_in_bytes, + result_delete_files: delete_file_metrics.result_files, + skipped_delete_files: delete_file_metrics.skipped_files, + total_delete_file_size_in_bytes: delete_file_metrics.total_file_size_in_bytes, indexed_delete_files: index_metrics.indexed_delete_files, equality_delete_files: index_metrics.equality_delete_files, positional_delete_files: index_metrics.positional_delete_files, } } + +/// Subset of [ScanMetrics] produced by manifest-handling functions. +#[derive(Default)] +pub(crate) struct ManifestMetrics { + pub(crate) total_data_manifests: u32, + pub(crate) total_delete_manifests: u32, + pub(crate) skipped_data_manifests: u32, + pub(crate) skipped_delete_manifests: u32, + pub(crate) scanned_data_manifests: u32, + pub(crate) scanned_delete_manifests: u32, +} + +/// Subset of [ScanMetrics] produced by file-handling functions. +#[derive(Default)] +pub(crate) struct FileMetrics { + result_files: u32, + skipped_files: u32, + total_file_size_in_bytes: u64, +} + +impl FileMetrics { + pub(crate) async fn accumulate(mut updates: Receiver) -> Self { + let mut accumulator = Self::default(); + while let Some(update) = updates.next().await { + match update { + FileMetricsUpdate::Skipped => accumulator.skipped_files += 1, + FileMetricsUpdate::Scanned { size_in_bytes } => { + accumulator.total_file_size_in_bytes += size_in_bytes; + accumulator.result_files += 1; + } + } + } + accumulator + } +} + +/// Represents an update to a single data or delete file. +pub(crate) enum FileMetricsUpdate { + Skipped, + Scanned { size_in_bytes: u64 }, +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 0e5a709416..d95fcb089a 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -41,9 +41,7 @@ use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::metrics::{LoggingMetricsReporter, MetricsReport, MetricsReporter}; use crate::runtime::{JoinHandle, spawn}; -use crate::scan::metrics::{ - DeleteFileMetrics, FileMetricsUpdate, ManifestMetrics, aggregate_metrics, -}; +use crate::scan::metrics::{FileMetrics, FileMetricsUpdate, ManifestMetrics, aggregate_metrics}; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::utils::available_parallelism; @@ -405,21 +403,25 @@ impl TableScan { // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s self.load_manifests(manifest_file_contexts, result_tx.clone()); - let (_delete_file_handle, delete_file_metrics_handle) = self - .process_delete_manifest_entries( - manifest_entry_delete_ctx_rx, - delete_file_tx, - result_tx.clone(), + let (delete_manifests_handle, delete_file_metrics_handle) = self + .spawn_process_delete_manifest_entries( + manifest_entry_delete_ctx_rx, + delete_file_tx, + result_tx.clone(), ); // Process the data file [`ManifestEntry`] stream in parallel let (data_file_metrics_tx, data_file_metrics_rx) = channel(1); - self.process_manifest_entries(manifest_entry_data_ctx_rx, result_tx, data_file_metrics_tx); + let (_handle, data_file_metrics_handle) = self.spawn_process_manifest_entries( + manifest_entry_data_ctx_rx, + result_tx, + data_file_metrics_tx, + ); self.report_metrics( plan_start_time, plan_context, - data_file_metrics_rx, + data_file_metrics_handle, delete_file_metrics_handle, index_metrics_handle, manifest_metrics, @@ -448,30 +450,16 @@ impl TableScan { }); } - fn process_delete_manifest_entries( + fn spawn_process_delete_manifest_entries( &self, manifest_entry_rx: Receiver, delete_file_tx: Sender, mut error_tx: Sender>, - ) -> (JoinHandle<()>, JoinHandle) { + ) -> (JoinHandle<()>, JoinHandle) { let concurrency_limit = self.concurrency_limit_manifest_entries; - let (metrics_update_tx, mut metrics_update_rx) = channel(1); - let metrics_handle = spawn(async move { - let mut accumulator = DeleteFileMetrics::default(); - - while let Some(update) = metrics_update_rx.next().await { - match update { - FileMetricsUpdate::Skipped => accumulator.skipped_delete_files += 1, - FileMetricsUpdate::Scanned { size_in_bytes } => { - accumulator.total_delete_file_size_in_bytes += size_in_bytes; - accumulator.result_delete_files += 1; - } - } - } - - accumulator - }); + let (metrics_update_tx, metrics_update_rx) = channel(1); + let metrics_handle = spawn(FileMetrics::accumulate(metrics_update_rx)); let delete_file_handle = spawn(async move { let result = manifest_entry_rx @@ -512,10 +500,13 @@ impl TableScan { &self, manifest_entry_rx: Receiver, mut file_scan_task_tx: Sender>, - metrics_update_tx: Sender, - ) { + ) -> (JoinHandle<()>, JoinHandle) { let concurrency_limit = self.concurrency_limit_manifest_entries; - let _handle = spawn(async move { + + let (metrics_update_tx, metrics_update_rx) = channel(1); + let metrics_handle = spawn(FileMetrics::accumulate(metrics_update_rx)); + + let handle = spawn(async move { let result = manifest_entry_rx .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( @@ -546,17 +537,19 @@ impl TableScan { let _ = file_scan_task_tx.send(Err(error)).await; } }); + + (handle, metrics_handle) } async fn report_metrics( &self, plan_start_time: Instant, plan_context: &PlanContext, - data_file_metrics_rx: Receiver, - delete_file_metrics_handle: JoinHandle, + data_file_metrics_handle: JoinHandle, + delete_file_metrics_handle: JoinHandle, index_metrics_handle: JoinHandle, manifest_metrics: ManifestMetrics, - ) { + ) -> JoinHandle<()> { let table = self.table.clone(); let snapshot_id = plan_context.snapshot.snapshot_id(); let filter = plan_context.predicate.clone(); @@ -565,32 +558,31 @@ impl TableScan { let projected_field_names = self.column_names.clone(); let metrics_reporter = Arc::clone(&self.metrics_reporter); - spawn({ + let handle = spawn(async move { let metrics = aggregate_metrics( plan_start_time, manifest_metrics, - data_file_metrics_rx, + data_file_metrics_handle, delete_file_metrics_handle, index_metrics_handle, ) .await; - async move { - let report = MetricsReport::Scan { - table, - snapshot_id, - filter, - schema_id, - projected_field_ids, - projected_field_names, - metadata: HashMap::new(), - metrics: Arc::new(metrics), - }; + let report = MetricsReport::Scan { + table, + snapshot_id, + filter, + schema_id, + projected_field_ids, + projected_field_names, + metadata: HashMap::new(), + metrics: Arc::new(metrics), + }; - metrics_reporter.report(report).await; - } - }) - .await; + metrics_reporter.report(report).await; + }); + + handle } /// Returns an [`ArrowRecordBatchStream`]. From ed4987d84089afe13d0e4481656dc001bfa7e1c3 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 7 Jul 2025 19:45:16 +0200 Subject: [PATCH 20/25] Be explicit about JoinHandles and awaits Signed-off-by: Jannik Steinmann Be explicit about spawning --- crates/iceberg/src/scan/mod.rs | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index d95fcb089a..5268d64d74 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -400,8 +400,7 @@ impl TableScan { // used to stream the results back to the caller let (result_tx, file_scan_task_rx) = channel(self.concurrency_limit_manifest_entries); - // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s - self.load_manifests(manifest_file_contexts, result_tx.clone()); + let _handle = self.spawn_fetch_manifests(manifest_file_contexts, result_tx.clone()); let (delete_manifests_handle, delete_file_metrics_handle) = self .spawn_process_delete_manifest_entries( @@ -409,35 +408,30 @@ impl TableScan { delete_file_tx, result_tx.clone(), ); + delete_manifests_handle.await; - // Process the data file [`ManifestEntry`] stream in parallel - let (data_file_metrics_tx, data_file_metrics_rx) = channel(1); - let (_handle, data_file_metrics_handle) = self.spawn_process_manifest_entries( - manifest_entry_data_ctx_rx, - result_tx, - data_file_metrics_tx, - ); + let (_handle, data_file_metrics_handle) = + self.spawn_process_manifest_entries(manifest_entry_data_ctx_rx, result_tx); - self.report_metrics( + let _handle = self.report_metrics( plan_start_time, plan_context, data_file_metrics_handle, delete_file_metrics_handle, index_metrics_handle, manifest_metrics, - ) - .await; + ); Ok(file_scan_task_rx.boxed()) } - fn load_manifests( + fn spawn_fetch_manifests( &self, manifest_files: Vec>, mut error_tx: Sender>, - ) { + ) -> JoinHandle<()> { let concurrency_limit = self.concurrency_limit_manifest_files; - let _handle = spawn(async move { + let handle = spawn(async move { let result = futures::stream::iter(manifest_files) .try_for_each_concurrent(concurrency_limit, |ctx| async move { ctx.fetch_manifest_and_stream_manifest_entries().await @@ -448,6 +442,8 @@ impl TableScan { let _ = error_tx.send(Err(error)).await; } }); + + handle } fn spawn_process_delete_manifest_entries( @@ -461,7 +457,7 @@ impl TableScan { let (metrics_update_tx, metrics_update_rx) = channel(1); let metrics_handle = spawn(FileMetrics::accumulate(metrics_update_rx)); - let delete_file_handle = spawn(async move { + let handle = spawn(async move { let result = manifest_entry_rx .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone(), metrics_update_tx.clone()))) .try_for_each_concurrent( @@ -493,10 +489,10 @@ impl TableScan { } }); - (delete_file_handle, metrics_handle) + (handle, metrics_handle) } - fn process_manifest_entries( + fn spawn_process_manifest_entries( &self, manifest_entry_rx: Receiver, mut file_scan_task_tx: Sender>, @@ -541,7 +537,7 @@ impl TableScan { (handle, metrics_handle) } - async fn report_metrics( + fn report_metrics( &self, plan_start_time: Instant, plan_context: &PlanContext, From 467c56545d69b07a23b81f51be06f871af211de5 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 7 Jul 2025 22:55:29 +0200 Subject: [PATCH 21/25] Simplify LoggingMetricsReporter Signed-off-by: Jannik Steinmann --- crates/iceberg/src/metrics.rs | 104 ++++++++++++++++++---------------- 1 file changed, 54 insertions(+), 50 deletions(-) diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs index 505f46036f..17e7e9be46 100644 --- a/crates/iceberg/src/metrics.rs +++ b/crates/iceberg/src/metrics.rs @@ -17,10 +17,10 @@ //! This module contains the metrics reporting API for Iceberg. //! -//! It is used to report table operations in a pluggable way. See [1] for more -//! details. +//! It is used to report table operations in a pluggable way. See the [docs] +//! for more details. //! -//! [1] https://iceberg.apache.org/docs/latest/metrics-reporting +//! [docs] https://iceberg.apache.org/docs/latest/metrics-reporting use std::collections::HashMap; use std::fmt::Debug; @@ -28,9 +28,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use serde::Serialize; -use serde_with::{DurationNanoSeconds, serde_as}; -use tracing::{info, warn}; +use tracing::info; use crate::TableIdent; use crate::expr::Predicate; @@ -50,33 +48,35 @@ pub(crate) trait MetricsReporter: Debug + Send + Sync { } /// An enum of all metrics reports. -#[derive(Debug, Serialize)] +#[derive(Debug)] pub(crate) enum MetricsReport { /// A Table Scan report that contains all relevant information from a Table Scan. Scan { table: TableIdent, snapshot_id: i64, - filter: Option>, schema_id: SchemaId, - projected_field_ids: Arc>, - // TODO: We could default to listing all field names, if all are selected - // check what Java is doing. + + /// If None, the scan is an unfiltered full table scan. + filter: Option>, + + /// If None, the scan projects all fields. + // TODO: We could default to listing all field names in those cases: check what Java is doing. projected_field_names: Option>, + projected_field_ids: Arc>, + metrics: Arc, metadata: HashMap, }, } /// Carries all metrics for a particular scan. -#[serde_as] -#[derive(Debug, Serialize)] +#[derive(Debug)] pub(crate) struct ScanMetrics { - #[serde_as(as = "DurationNanoSeconds")] pub(crate) total_planning_duration: Duration, // Manfiest-level metrics, computed by walking the snapshot's manifest list // file entries and checking which manifests match the scan's predicates. - pub(crate) total_data_manifests: u32, // TODO: Are these really just skipped+scanned? + pub(crate) total_data_manifests: u32, pub(crate) total_delete_manifests: u32, pub(crate) skipped_data_manifests: u32, pub(crate) skipped_delete_manifests: u32, @@ -86,7 +86,7 @@ pub(crate) struct ScanMetrics { // Data file-level metrics. pub(crate) result_data_files: u32, pub(crate) skipped_data_files: u32, - pub(crate) total_file_size_in_bytes: u64, // TODO: should then all be u64s? + pub(crate) total_file_size_in_bytes: u64, // Delete file-level metrics. pub(crate) result_delete_files: u32, @@ -98,37 +98,6 @@ pub(crate) struct ScanMetrics { pub(crate) positional_delete_files: u32, } -// TODO: This impl will provide public accessors for the fields, because -// crate-external implementators will need to access them, while (so far) only -// code within the crate will need to mutate them. -impl ScanMetrics { - pub fn result_data_files(&self) -> Counter { - Counter { - value: self.result_data_files, - unit: "file".to_string(), - } - } - - pub fn result_delete_files(&self) -> Counter { - Counter { - value: self.result_delete_files, - unit: "file".to_string(), - } - } - - pub fn total_data_manifests(&self) -> Counter { - Counter { - value: self.total_data_manifests, - unit: "manifest".to_string(), - } - } -} - -struct Counter { - value: u32, - unit: String, -} - /// A reporter that logs the metrics to the console. #[derive(Clone, Debug)] pub(crate) struct LoggingMetricsReporter {} @@ -142,9 +111,44 @@ impl LoggingMetricsReporter { #[async_trait] impl MetricsReporter for LoggingMetricsReporter { async fn report(&self, report: MetricsReport) { - match serde_json::to_string(&report) { - Ok(json) => info!(report = json, "Reporting metrics"), - Err(e) => warn!("Failed to serialize metrics report: {}", e), + match report { + MetricsReport::Scan { + table, + snapshot_id, + schema_id, + filter, + projected_field_names, + projected_field_ids, + metrics, + metadata, + } => { + info!( + table = %table, + snapshot_id = snapshot_id, + schema_id = schema_id, + filter = ?filter, + projected_field_names = ?projected_field_names, + projected_field_ids = ?projected_field_ids, + scan_metrics.total_planning_duration = ?metrics.total_planning_duration, + scan_metrics.total_data_manifests = metrics.total_data_manifests, + scan_metrics.total_delete_manifests = metrics.total_delete_manifests, + scan_metrics.scanned_data_manifests = metrics.scanned_data_manifests, + scan_metrics.scanned_delete_manifests = metrics.scanned_delete_manifests, + scan_metrics.skipped_data_manifests = metrics.skipped_data_manifests, + scan_metrics.skipped_delete_manifests = metrics.skipped_delete_manifests, + scan_metrics.result_data_files = metrics.result_data_files, + scan_metrics.result_delete_files = metrics.result_delete_files, + scan_metrics.skipped_data_files = metrics.skipped_data_files, + scan_metrics.skipped_delete_files = metrics.skipped_delete_files, + scan_metrics.total_file_size_in_bytes = metrics.total_file_size_in_bytes, + scan_metrics.total_delete_file_size_in_bytes = metrics.total_delete_file_size_in_bytes, + scan_metrics.indexed_delete_files = metrics.indexed_delete_files, + scan_metrics.equality_delete_files = metrics.equality_delete_files, + scan_metrics.positional_delete_files = metrics.positional_delete_files, + metadata = ?metadata, + "Received metrics report" + ); + } } } } From 9e34b42d778a3352ca584fab69bf654c48f51282 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 7 Jul 2025 23:31:58 +0200 Subject: [PATCH 22/25] Feature-flag TableBuilder::metrics_reporter for tests only Signed-off-by: Jannik Steinmann --- crates/iceberg/src/table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index f950d5be1f..4c93252ee0 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -100,6 +100,7 @@ impl TableBuilder { /// sets the implementation used to report metrics about operations on this /// table. + #[cfg(test)] pub(crate) fn metrics_reporter(mut self, metrics_reporter: Arc) -> Self { self.metrics_reporter = Some(metrics_reporter); self From 166cf5d6d44ab753a4a4a12f5720df93353a934d Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Mon, 7 Jul 2025 23:41:05 +0200 Subject: [PATCH 23/25] Join all metrics handles Signed-off-by: Jannik Steinmann --- crates/iceberg/src/scan/metrics.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/scan/metrics.rs b/crates/iceberg/src/scan/metrics.rs index 307f25b3db..ac4b19ffac 100644 --- a/crates/iceberg/src/scan/metrics.rs +++ b/crates/iceberg/src/scan/metrics.rs @@ -17,8 +17,8 @@ use std::time::Instant; -use futures::StreamExt; use futures::channel::mpsc::Receiver; +use futures::{StreamExt, join}; use crate::delete_file_index::DeleteIndexMetrics; use crate::metrics::ScanMetrics; @@ -33,10 +33,11 @@ pub(crate) async fn aggregate_metrics( delete_file_metrics_handle: JoinHandle, index_metrics_handle: JoinHandle, ) -> ScanMetrics { - // TODO: Consider joining them instead. - let data_file_metrics = data_file_metrics_handle.await; - let delete_file_metrics = delete_file_metrics_handle.await; - let index_metrics = index_metrics_handle.await; + let (data_file_metrics, delete_file_metrics, index_metrics) = join!( + data_file_metrics_handle, + delete_file_metrics_handle, + index_metrics_handle + ); // Only now (after consuming all metrics updates) do we know that // all concurrent work is finished and we can stop timing the From 22c46126ac7ce68f0f770f3072123765758f108b Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Tue, 8 Jul 2025 01:11:02 +0200 Subject: [PATCH 24/25] Fix clippy warnings Signed-off-by: Jannik Steinmann --- crates/iceberg/src/metrics.rs | 2 +- crates/iceberg/src/scan/mod.rs | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/metrics.rs b/crates/iceberg/src/metrics.rs index 17e7e9be46..6f37038a58 100644 --- a/crates/iceberg/src/metrics.rs +++ b/crates/iceberg/src/metrics.rs @@ -74,7 +74,7 @@ pub(crate) enum MetricsReport { pub(crate) struct ScanMetrics { pub(crate) total_planning_duration: Duration, - // Manfiest-level metrics, computed by walking the snapshot's manifest list + // Manifest-level metrics, computed by walking the snapshot's manifest list // file entries and checking which manifests match the scan's predicates. pub(crate) total_data_manifests: u32, pub(crate) total_delete_manifests: u32, diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 5268d64d74..9734bdf850 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -374,7 +374,7 @@ impl TableScan { return Ok(Box::pin(futures::stream::empty())); }; - // Initialize scan metrics for reporting. + // Start the planning phase timer. let plan_start_time = Instant::now(); // used to stream ManifestEntryContexts between stages of the file plan operation @@ -431,7 +431,7 @@ impl TableScan { mut error_tx: Sender>, ) -> JoinHandle<()> { let concurrency_limit = self.concurrency_limit_manifest_files; - let handle = spawn(async move { + spawn(async move { let result = futures::stream::iter(manifest_files) .try_for_each_concurrent(concurrency_limit, |ctx| async move { ctx.fetch_manifest_and_stream_manifest_entries().await @@ -441,9 +441,7 @@ impl TableScan { if let Err(error) = result { let _ = error_tx.send(Err(error)).await; } - }); - - handle + }) } fn spawn_process_delete_manifest_entries( @@ -554,7 +552,7 @@ impl TableScan { let projected_field_names = self.column_names.clone(); let metrics_reporter = Arc::clone(&self.metrics_reporter); - let handle = spawn(async move { + spawn(async move { let metrics = aggregate_metrics( plan_start_time, manifest_metrics, @@ -576,9 +574,7 @@ impl TableScan { }; metrics_reporter.report(report).await; - }); - - handle + }) } /// Returns an [`ArrowRecordBatchStream`]. From ed6c13935b8596a0828aaceb3499b456be4b6566 Mon Sep 17 00:00:00 2001 From: Jannik Steinmann Date: Tue, 8 Jul 2025 01:20:24 +0200 Subject: [PATCH 25/25] Remove unclear comment --- crates/iceberg/src/scan/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 9734bdf850..a43936e4e6 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -207,12 +207,7 @@ impl<'a> TableScanBuilder<'a> { pub fn build(self) -> Result { let metrics_reporter = match self.metrics_reporter { Some(metrics_reporter) => metrics_reporter, - None => { - // When a table scan is constructed directly (not by a catalog), - // and the user didn't provide a metrics reporter, then we - // construct a new one. - Arc::new(LoggingMetricsReporter::new()) - } + None => Arc::new(LoggingMetricsReporter::new()), }; let snapshot = match self.snapshot_id {