Skip to content

Metrics reporting #1496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft

Metrics reporting #1496

wants to merge 25 commits into from

Conversation

DerGut
Copy link
Contributor

@DerGut DerGut commented Jul 7, 2025

Which issue does this PR close?

What changes are included in this PR?

As mentioned in the issue description, this PR adds an implementation for the Iceberg Metrics Reporting API.

I'll follow up with a more thorough description of it's changes.

Are these changes tested?

Yes, with the attached unit test and an example main that's more adjacent to an integration test. The example is available on this branch.

DerGut added 23 commits July 7, 2025 23:21
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>

Rename MetricsReporter and MetricsReport

Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>

Remove unnecessary stuff

Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>

Be explicit about spawning
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, the DeleteFileIndex is only used by the scan module. I don't think it will be needed by other modules in the future, so maybe it's a good opportunity to move it there instead?


/// If None, the scan projects all fields.
// TODO: We could default to listing all field names in those cases: check what Java is doing.
projected_field_names: Option<Vec<String>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: The list of field names would be more helpful in reporting than an empty value

metrics,
metadata,
} => {
info!(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a good idea to use debug-formatted values here. I was struggling a lot to use the tracing API, and this is the best I could come up with so far.
I didn't really want to serialize the struct into a json, nor did I know how to implement fmt::Display for values such that they make sense across tracing subscribers.
Any suggestions welcome!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could also use some feedback about the degree to which we want to mimic the Java implementation's log records.

@@ -90,6 +90,7 @@ typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
tracing = { workspace = true }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far, tracing was only used in tests. As far as I could find, the LoggingMetricsReporter is the first use of any logging in the iceberg crate.
I'm not entirely sure whether it's a good idea include it and commit on a specific logging crate. tracing seems reasonably standard and compatible with other crates though. I'd also like to include some default reporter. The Java implementation comes with it's LoggingMetricsReporter.java based on SL4J.

I've also run into some issues using the tracing crate (as outlined in this comment) but they can probably be worked around and shouldn't be a deciding factor.

@@ -186,16 +187,25 @@ impl PlanContext {
tx_data: Sender<ManifestEntryContext>,
delete_file_idx: DeleteFileIndex,
delete_file_tx: Sender<ManifestEntryContext>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
) -> Result<(Vec<Result<ManifestFileContext>>, ManifestMetrics)> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we were returning a vector and this function was only called at a single place, I took the liberty of changing the return value. This somewhat simplified passing the result to a spawned thread because Vec implies Send + Sync when it's Items do.

I've also extended the TODO comment below for future reference because I've added another obstacle to simply using an iterator here: the ManifestMetrics are now continuously mutated in the loop. If we used an iterator instead, we couldn't as easily (I think) pass around the mutable reference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I had to thread the metrics building throughout the planning stage, I heavily refactored this file to make room for it.

delete_file_tx,
result_tx.clone(),
);
delete_manifests_handle.await;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is worth pointing out: During refactoring, I tried to be explicit about where threads are spawned, and which JoinHandles we await on (vs. ignore). This line corresponds to

in the previous implementation.

I left it here for consistency, but at this point I believe it is unintended. The other spawned threads are simply pushed into the background without ever checking their completion (which maybe isn't ideal either, e.g. in case they panic). This thread however is awaited on, and so IIUC we block until all delete manifest entries are processed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the planning stage is parallelized and multiple threads are responsible for fetching manifest files and evaluating their contents, building of the metrics reports needs to fit into this framework.

The approach that I came up with is that individual threads that process manifest entries are spawned with a neighboring thread responsible for metrics aggregation. The processing thread usually iterates over a stream (e.g. of manifest entries), and sends a metrics update to the neighboring thread. The neighboring thread then accumulates the stream of metrics updates and returns the finished result wrapped by the JoinHandle.

My previous attempt included channels for metrics updates (and no separate threads), but this resulted in coupling between the plan file processing and metrics reporting. I had to somewhere iterate over the metrics receivers to aggregate their updates, and had to do this for multiple processors sequentially. This meant that some processor could be blocked when we were still iterating over another processors metrics updates, and the metrics channel buffer was exhausted.

This approach decouples plan file processing from metrics submission.


/// Carries all metrics for a particular scan.
#[derive(Debug)]
pub(crate) struct ScanMetrics {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the Java implementation uses special types for the metrics (e.g. TimerResult.java and CounterResult.java). They include a value and a unit but I felt like both the ScanMetrics' field names and their types should convey everything we need. The RestMetricReporter will need to emit reports that follow this format but I omitted it from the general purpose ScanMetrics.

Happy for any feedback!

DerGut added 2 commits July 8, 2025 01:11
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Metrics Reporter API
1 participant