-
Notifications
You must be signed in to change notification settings - Fork 281
Metrics reporting #1496
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Metrics reporting #1496
Changes from all commits
eb8109c
20a0e80
e1dc699
16af416
8832027
05dc825
ce52bf6
3bec473
4fcfbed
7242774
1b8e8c6
ddc9627
f1e598d
08f72bd
683ad4f
876c708
c74e855
90ce07c
630cc53
ed4987d
467c565
9e34b42
166cf5d
22c4612
ed6c139
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! This module contains the metrics reporting API for Iceberg. | ||
//! | ||
//! It is used to report table operations in a pluggable way. See the [docs] | ||
//! for more details. | ||
//! | ||
//! [docs] https://iceberg.apache.org/docs/latest/metrics-reporting | ||
|
||
use std::collections::HashMap; | ||
use std::fmt::Debug; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
|
||
use async_trait::async_trait; | ||
use tracing::info; | ||
|
||
use crate::TableIdent; | ||
use crate::expr::Predicate; | ||
use crate::spec::SchemaId; | ||
|
||
/// This trait defines the API for reporting metrics of table operations. | ||
/// | ||
/// Refer to the [Iceberg docs] for details. | ||
/// | ||
/// [Iceberg docs]: https://iceberg.apache.org/docs/latest/metrics-reporting/ | ||
#[async_trait] | ||
pub(crate) trait MetricsReporter: Debug + Send + Sync { | ||
/// Indicates that an operation is done by reporting a MetricsReport. | ||
/// | ||
/// Any errors are expected to be handled internally. | ||
async fn report(&self, report: MetricsReport); | ||
} | ||
|
||
/// An enum of all metrics reports. | ||
#[derive(Debug)] | ||
pub(crate) enum MetricsReport { | ||
/// A Table Scan report that contains all relevant information from a Table Scan. | ||
Scan { | ||
table: TableIdent, | ||
snapshot_id: i64, | ||
schema_id: SchemaId, | ||
|
||
/// If None, the scan is an unfiltered full table scan. | ||
filter: Option<Arc<Predicate>>, | ||
|
||
/// If None, the scan projects all fields. | ||
// TODO: We could default to listing all field names in those cases: check what Java is doing. | ||
projected_field_names: Option<Vec<String>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: The list of field names would be more helpful in reporting than an empty value |
||
projected_field_ids: Arc<Vec<i32>>, | ||
|
||
metrics: Arc<ScanMetrics>, | ||
metadata: HashMap<String, String>, | ||
}, | ||
} | ||
|
||
/// Carries all metrics for a particular scan. | ||
#[derive(Debug)] | ||
pub(crate) struct ScanMetrics { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that the Java implementation uses special types for the metrics (e.g. Happy for any feedback! |
||
pub(crate) total_planning_duration: Duration, | ||
|
||
// Manifest-level metrics, computed by walking the snapshot's manifest list | ||
// file entries and checking which manifests match the scan's predicates. | ||
pub(crate) total_data_manifests: u32, | ||
pub(crate) total_delete_manifests: u32, | ||
pub(crate) skipped_data_manifests: u32, | ||
pub(crate) skipped_delete_manifests: u32, | ||
pub(crate) scanned_data_manifests: u32, | ||
pub(crate) scanned_delete_manifests: u32, | ||
|
||
// Data file-level metrics. | ||
pub(crate) result_data_files: u32, | ||
pub(crate) skipped_data_files: u32, | ||
pub(crate) total_file_size_in_bytes: u64, | ||
|
||
// Delete file-level metrics. | ||
pub(crate) result_delete_files: u32, | ||
pub(crate) skipped_delete_files: u32, | ||
pub(crate) total_delete_file_size_in_bytes: u64, | ||
|
||
pub(crate) indexed_delete_files: u32, | ||
pub(crate) equality_delete_files: u32, | ||
pub(crate) positional_delete_files: u32, | ||
} | ||
|
||
/// A reporter that logs the metrics to the console. | ||
#[derive(Clone, Debug)] | ||
pub(crate) struct LoggingMetricsReporter {} | ||
|
||
impl LoggingMetricsReporter { | ||
pub(crate) fn new() -> Self { | ||
Self {} | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl MetricsReporter for LoggingMetricsReporter { | ||
async fn report(&self, report: MetricsReport) { | ||
match report { | ||
MetricsReport::Scan { | ||
table, | ||
snapshot_id, | ||
schema_id, | ||
filter, | ||
projected_field_names, | ||
projected_field_ids, | ||
metrics, | ||
metadata, | ||
} => { | ||
info!( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it's a good idea to use debug-formatted values here. I was struggling a lot to use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could also use some feedback about the degree to which we want to mimic the Java implementation's log records. |
||
table = %table, | ||
snapshot_id = snapshot_id, | ||
schema_id = schema_id, | ||
filter = ?filter, | ||
projected_field_names = ?projected_field_names, | ||
projected_field_ids = ?projected_field_ids, | ||
scan_metrics.total_planning_duration = ?metrics.total_planning_duration, | ||
scan_metrics.total_data_manifests = metrics.total_data_manifests, | ||
scan_metrics.total_delete_manifests = metrics.total_delete_manifests, | ||
scan_metrics.scanned_data_manifests = metrics.scanned_data_manifests, | ||
scan_metrics.scanned_delete_manifests = metrics.scanned_delete_manifests, | ||
scan_metrics.skipped_data_manifests = metrics.skipped_data_manifests, | ||
scan_metrics.skipped_delete_manifests = metrics.skipped_delete_manifests, | ||
scan_metrics.result_data_files = metrics.result_data_files, | ||
scan_metrics.result_delete_files = metrics.result_delete_files, | ||
scan_metrics.skipped_data_files = metrics.skipped_data_files, | ||
scan_metrics.skipped_delete_files = metrics.skipped_delete_files, | ||
scan_metrics.total_file_size_in_bytes = metrics.total_file_size_in_bytes, | ||
scan_metrics.total_delete_file_size_in_bytes = metrics.total_delete_file_size_in_bytes, | ||
scan_metrics.indexed_delete_files = metrics.indexed_delete_files, | ||
scan_metrics.equality_delete_files = metrics.equality_delete_files, | ||
scan_metrics.positional_delete_files = metrics.positional_delete_files, | ||
metadata = ?metadata, | ||
"Received metrics report" | ||
); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ use futures::{SinkExt, TryFutureExt}; | |
use crate::delete_file_index::DeleteFileIndex; | ||
use crate::expr::{Bind, BoundPredicate, Predicate}; | ||
use crate::io::object_cache::ObjectCache; | ||
use crate::scan::metrics::ManifestMetrics; | ||
use crate::scan::{ | ||
BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache, | ||
PartitionFilterCache, | ||
|
@@ -186,16 +187,25 @@ impl PlanContext { | |
tx_data: Sender<ManifestEntryContext>, | ||
delete_file_idx: DeleteFileIndex, | ||
delete_file_tx: Sender<ManifestEntryContext>, | ||
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> { | ||
) -> Result<(Vec<Result<ManifestFileContext>>, ManifestMetrics)> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we were returning a vector and this function was only called at a single place, I took the liberty of changing the return value. This somewhat simplified passing the result to a spawned thread because I've also extended the TODO comment below for future reference because I've added another obstacle to simply using an iterator here: the |
||
let manifest_files = manifest_list.entries().iter(); | ||
|
||
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator. | ||
// TODO: Ideally we could ditch this intermediate Vec as we can return | ||
// an iterator over the results. Updates to the manifest metrics somewhat | ||
// complicate this because they need to be serialized somewhere, and an | ||
// iterator can't easily take ownership of the metrics. | ||
// A vec allows us to apply the mutations within this function. | ||
// A vec also implicitly implements Send and Sync, meaning we can pass | ||
// it around more easily in the concurrent planning step. | ||
let mut filtered_mfcs = vec![]; | ||
|
||
let mut metrics = ManifestMetrics::default(); | ||
for manifest_file in manifest_files { | ||
let tx = if manifest_file.content == ManifestContentType::Deletes { | ||
metrics.total_delete_manifests += 1; | ||
delete_file_tx.clone() | ||
} else { | ||
metrics.total_data_manifests += 1; | ||
tx_data.clone() | ||
}; | ||
|
||
|
@@ -212,6 +222,10 @@ impl PlanContext { | |
) | ||
.eval(manifest_file)? | ||
{ | ||
match manifest_file.content { | ||
ManifestContentType::Data => metrics.skipped_data_manifests += 1, | ||
ManifestContentType::Deletes => metrics.skipped_delete_manifests += 1, | ||
} | ||
continue; | ||
} | ||
|
||
|
@@ -230,7 +244,14 @@ impl PlanContext { | |
filtered_mfcs.push(Ok(mfc)); | ||
} | ||
|
||
Ok(Box::new(filtered_mfcs.into_iter())) | ||
// They're not yet scanned, but will be scanned concurrently in the | ||
// next processing step. | ||
metrics.scanned_data_manifests = | ||
metrics.total_data_manifests - metrics.skipped_data_manifests; | ||
metrics.scanned_delete_manifests = | ||
metrics.total_delete_manifests - metrics.skipped_delete_manifests; | ||
|
||
Ok((filtered_mfcs, metrics)) | ||
} | ||
|
||
fn create_manifest_file_context( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far,
tracing
was only used in tests. As far as I could find, theLoggingMetricsReporter
is the first use of any logging in theiceberg
crate.I'm not entirely sure whether it's a good idea include it and commit on a specific logging crate.
tracing
seems reasonably standard and compatible with other crates though. I'd also like to include some default reporter. The Java implementation comes with it'sLoggingMetricsReporter.java
based on SL4J.I've also run into some issues using the
tracing
crate (as outlined in this comment) but they can probably be worked around and shouldn't be a deciding factor.