
Conversation

szehon-ho (Member):

What changes were proposed in this pull request?

#51377 added a DataSourceV2 API that sends operation metrics along with the commit, via a Map<String, Long>. This PR changes that to a proper model.

Suggestion from @aokolnychyi

Why are the changes needed?

It is cleaner to model the metrics as a proper object: it makes explicit which metrics Spark sends, and it can handle future cases where metric values may not be longs.
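
Roughly, the shape of the change is the following (a sketch only; the interface names here are approximate and the exact signatures are per the discussion below):

// Sketch of the batch-write commit hook before and after this PR.
interface BatchWriteBefore {
  // Before (#51377): metrics passed as an untyped map alongside the commit.
  void commit(WriterCommitMessage[] messages, java.util.Map<String, Long> metrics);
}

interface BatchWriteAfter {
  // After: a typed model object makes the metrics Spark sends explicit and
  // leaves room for non-long metric values later.
  void commit(WriterCommitMessage[] messages, OperationMetrics metrics);
}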

Does this PR introduce any user-facing change?

No; this is an unreleased DSV2 API.

How was this patch tested?

Existing tests

Was this patch authored or co-authored using generative AI tooling?

No

 * @since 4.1.0
 */
@Evolving
public interface OperationMetrics {
Contributor:

This naming seems reasonable to me, but if folks have better ideas, it would be great to hear them.

cc @cloud-fan @viirya @gengliangwang @dongjoon-hyun @huaxingao

Member:

+1 for the AS-IS name, OperationMetrics.

Member:

If it is specific to write, maybe WriteMetrics? OperationMetrics also looks okay.

Contributor:

I think the word operation is reasonable. An alternative would be to call it OperationSummary to distinguish it from regular metrics, as we may pass some String values in the future too, not just counts. That said, I am not sure calling it XXXSummary would make it any better.

What do you think, @szehon-ho @viirya @dongjoon-hyun?

Member:

Or OperationMetadata?

gengliangwang (Member), Oct 15, 2025:

I had the same thought while reviewing this PR: why not just use WriteMetrics/WriterMetrics? It seems more straightforward and specific.

Member Author:

ok

szehon-ho (Member Author), Oct 16, 2025:

Renamed it to WriterMetrics. There is already a writeMetric in the class V2ExistingTableWriteExec, so WriteMetrics would be confusingly close.

/**
 * Returns the number of target rows copied unmodified because they did not match any action.
 */
long numTargetRowsCopied();
Contributor:

Seems like we will use -1 if unknown. Shall we document this?
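
For instance, the javadoc could spell out the sentinel, something like this (a sketch of suggested wording, not final doc):

/**
 * Returns the number of target rows copied unmodified because they did not
 * match any action, or -1 if this metric is unknown.
 */
long numTargetRowsCopied();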

szehon-ho (Member Author), Oct 14, 2025:

I thought these always have at least 0 whenever MergeRowsExec is present; see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala#L49 . The metrics are initialized to 0 there.

It is true that I set -1 in V2TableWriteExec::getOperationMetrics, but I thought that path would not normally be hit.

Contributor:

Makes sense. Do we have tests ensuring 1/1 match?

Member Author:

We have tests that assert all metrics are set to 0 if not used, if that is what you are asking.

collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
  val metrics = n.metrics
  MergeOperationMetricsImpl(
    metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
Contributor:

Can we add constants for these keys in a separate PR? Using raw strings seems fragile.
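
Something along these lines, perhaps (hypothetical class name and placement, just to illustrate the suggestion):

// Hypothetical holder for the metric-key constants proposed above; the
// actual names and location would be settled in the follow-up PR.
public final class MergeMetricKeys {
  public static final String NUM_TARGET_ROWS_COPIED = "numTargetRowsCopied";
  public static final String NUM_TARGET_ROWS_DELETED = "numTargetRowsDeleted";

  private MergeMetricKeys() {}
}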

Member Author:

ok sure

 * @since 4.1.0
 */
@Evolving
public interface OperationMetrics {
Contributor:

I am thinking about how to simplify consumption of these objects in connectors. The question is whether this interface should have some sort of operation method that would tell connectors the type of metrics. That said, it is probably not the end of the world if connectors do a class check.

Contributor:

Thoughts anyone?

Member Author:

Wouldn't it be a bit duplicative if we have both the class and an enum?
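
To illustrate the trade-off, connector-side code might look like this (a sketch only; onCommit is a hypothetical helper):

// Hypothetical connector-side handler contrasting the two options.
static void onCommit(OperationMetrics metrics) {
  // Option 1: plain class check; no extra API surface on OperationMetrics.
  if (metrics instanceof MergeOperationMetrics) {
    MergeOperationMetrics mergeMetrics = (MergeOperationMetrics) metrics;
    long copied = mergeMetrics.numTargetRowsCopied();
  }
  // Option 2: an operation() discriminator on the interface, e.g.
  //   switch (metrics.operation()) { case MERGE: ... }
  // which would largely duplicate what the class hierarchy already encodes.
}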

aokolnychyi (Contributor):

I feel like using a proper object is the right call here compared to the map. Left some questions.

Would love to hear what others think too.

 * @since 4.1.0
 */
@Evolving
public interface MergeOperationMetrics extends OperationMetrics {
Member:

Maybe MergeMetrics? Not a strong opinion.

Member Author:

Sounds ok to me too

Contributor:

Yeah, I generally prefer shorter names if they are descriptive. I would probably lean towards MergeMetrics here.

Member:

+1 for MergeMetrics

Member Author:

changed

/**
 * Implementation of {@link MergeOperationMetrics} that provides merge operation metrics.
 */
private[sql] case class MergeOperationMetricsImpl(
Member:

I wonder why we don't simply make MergeOperationMetrics an implementation class, rather than a separate interface + impl here. Are we supposed to have different MergeOperationMetrics implementations?

szehon-ho (Member Author), Oct 14, 2025:

I chatted with @aokolnychyi offline yesterday; he suggested this approach to hide the constructor from the API, like LogicalWriteInfoImpl. Otherwise we would have to expose a public constructor in the public MergeOperationMetrics Java API so Spark can construct it, and we might then need a builder to support adding new metrics, etc.
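
In other words, the pattern looks roughly like this (a sketch; the actual implementation in this PR is the private[sql] Scala case class MergeOperationMetricsImpl shown above):

// Public DSV2 API: interface only; no constructor or builder is exposed.
@Evolving
public interface MergeOperationMetrics extends OperationMetrics {
  long numTargetRowsCopied();
}

// The implementation lives outside the public API (here, a Scala case class),
// so Spark can construct instances internally without committing to a public
// constructor or builder in the Java interface.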

Member:

Okay, makes sense.

aokolnychyi (Contributor):

@cloud-fan @gengliangwang, I would want you folks to take a look as well, if possible.

metrics.asScala.map {
  case (key, value) => commitProperties += key -> String.valueOf(value)
override def commit(messages: Array[WriterCommitMessage], metrics: OperationMetrics): Unit = {
  metrics match {
Member:

Shall we add a test case to verify the new metrics?

Member Author:

Oh, these are returned and checked via existing test cases: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala#L1816

Did you have something else in mind?

/**
 * Returns the number of target rows deleted.
 */
long numTargetRowsDeleted();
Member:

If all the metric names start with numTargetRows, shall we just use numRows?

