-
Notifications
You must be signed in to change notification settings - Fork 281
Metrics reporting #1496
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Metrics reporting #1496
Conversation
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com> Rename MetricsReporter and MetricsReport Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com> Remove unnecessary stuff Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com> Be explicit about spawning
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point, the DeleteFileIndex
is only used by the scan
module. I don't think it will be needed by other modules in the future, so maybe it's a good opportunity to move it there instead?
|
||
/// If None, the scan projects all fields. | ||
// TODO: We could default to listing all field names in those cases: check what Java is doing. | ||
projected_field_names: Option<Vec<String>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: The list of field names would be more helpful in reporting than an empty value
metrics, | ||
metadata, | ||
} => { | ||
info!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's a good idea to use debug-formatted values here. I was struggling a lot to use the tracing
API, and this is the best I could come up with so far.
I didn't really want to serialize the struct into a json, nor did I know how to implement fmt::Display
for values such that they make sense across tracing subscribers.
Any suggestions welcome!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could also use some feedback about the degree to which we want to mimic the Java implementation's log records.
@@ -90,6 +90,7 @@ typed-builder = { workspace = true } | |||
url = { workspace = true } | |||
uuid = { workspace = true } | |||
zstd = { workspace = true } | |||
tracing = { workspace = true } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far, tracing
was only used in tests. As far as I could find, the LoggingMetricsReporter
is the first use of any logging in the iceberg
crate.
I'm not entirely sure whether it's a good idea include it and commit on a specific logging crate. tracing
seems reasonably standard and compatible with other crates though. I'd also like to include some default reporter. The Java implementation comes with it's LoggingMetricsReporter.java
based on SL4J.
I've also run into some issues using the tracing
crate (as outlined in this comment) but they can probably be worked around and shouldn't be a deciding factor.
@@ -186,16 +187,25 @@ impl PlanContext { | |||
tx_data: Sender<ManifestEntryContext>, | |||
delete_file_idx: DeleteFileIndex, | |||
delete_file_tx: Sender<ManifestEntryContext>, | |||
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> { | |||
) -> Result<(Vec<Result<ManifestFileContext>>, ManifestMetrics)> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we were returning a vector and this function was only called at a single place, I took the liberty of changing the return value. This somewhat simplified passing the result to a spawned thread because Vec
implies Send + Sync
when it's Items do.
I've also extended the TODO comment below for future reference because I've added another obstacle to simply using an iterator here: the ManifestMetrics
are now continuously mutated in the loop. If we used an iterator instead, we couldn't as easily (I think) pass around the mutable reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I had to thread the metrics building throughout the planning stage, I heavily refactored this file to make room for it.
delete_file_tx, | ||
result_tx.clone(), | ||
); | ||
delete_manifests_handle.await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is worth pointing out: During refactoring, I tried to be explicit about where threads are spawned, and which JoinHandles we await on (vs. ignore). This line corresponds to
iceberg-rust/crates/iceberg/src/scan/mod.rs
Line 404 in 96ec4d5
.await; |
I left it here for consistency, but at this point I believe it is unintended. The other spawned threads are simply pushed into the background without ever checking their completion (which maybe isn't ideal either, e.g. in case they panic). This thread however is awaited on, and so IIUC we block until all delete manifest entries are processed.
crates/iceberg/src/scan/metrics.rs
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the planning stage is parallelized and multiple threads are responsible for fetching manifest files and evaluating their contents, building of the metrics reports needs to fit into this framework.
The approach that I came up with is that individual threads that process manifest entries are spawned with a neighboring thread responsible for metrics aggregation. The processing thread usually iterates over a stream (e.g. of manifest entries), and sends a metrics update to the neighboring thread. The neighboring thread then accumulates the stream of metrics updates and returns the finished result wrapped by the JoinHandle.
My previous attempt included channels for metrics updates (and no separate threads), but this resulted in coupling between the plan file processing and metrics reporting. I had to somewhere iterate over the metrics receivers to aggregate their updates, and had to do this for multiple processors sequentially. This meant that some processor could be blocked when we were still iterating over another processors metrics updates, and the metrics channel buffer was exhausted.
This approach decouples plan file processing from metrics submission.
|
||
/// Carries all metrics for a particular scan. | ||
#[derive(Debug)] | ||
pub(crate) struct ScanMetrics { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the Java implementation uses special types for the metrics (e.g. TimerResult.java
and CounterResult.java
). They include a value and a unit but I felt like both the ScanMetrics' field names and their types should convey everything we need. The RestMetricReporter will need to emit reports that follow this format but I omitted it from the general purpose ScanMetrics.
Happy for any feedback!
Signed-off-by: Jannik Steinmann <jannik.steinmann@datadoghq.com>
Which issue does this PR close?
What changes are included in this PR?
As mentioned in the issue description, this PR adds an implementation for the Iceberg Metrics Reporting API.
I'll follow up with a more thorough description of it's changes.
Are these changes tested?
Yes, with the attached unit test and an example main that's more adjacent to an integration test. The example is available on this branch.