[SPARK-53891][SQL] Model DSV2 Commit Operation Metrics API #52595
Conversation
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java (outdated thread, resolved)
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeOperationMetrics.java (outdated thread, resolved)
 * @since 4.1.0
 */
@Evolving
public interface OperationMetrics {
This naming seems reasonable to me but if folks have better ideas, it would be great to hear them.
cc @cloud-fan @viirya @gengliangwang @dongjoon-hyun @huaxingao
+1 for the AS-IS name, OperationMetrics.
If it is specific to write, maybe WriteMetrics? OperationMetrics also looks okay.
I think the word operation is reasonable. An alternative would be to call it OperationSummary to distinguish it from regular metrics, since we may pass some String values in the future too, not just counts. That said, I am not sure calling it XXXSummary would make it any better.
What do you think, @szehon-ho @viirya @dongjoon-hyun?
Or OperationMetadata?
I had the same thought when reviewing this PR: why not just use WriteMetrics/WriterMetrics? It seems more straightforward and specific.
ok
Renamed it to WriterMetrics. There is already a writeMetric in the class V2ExistingTableWriteExec.
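For readers following the naming thread, here is a minimal sketch of the resulting shape, written as Scala traits purely for illustration (the PR defines these as Java interfaces, and the member names come from the hunks quoted below):

// Illustrative only: a base metrics type plus an operation-specific sub-type,
// mirroring the WriterMetrics / MergeMetrics names settled on in this review.
trait WriterMetrics

trait MergeMetrics extends WriterMetrics {
  // counters quoted elsewhere in this PR; -1 may be reported when a value is unknown
  def numTargetRowsCopied(): Long
  def numTargetRowsDeleted(): Long
}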
/**
 * Returns the number of target rows copied unmodified because they did not match any action.
 */
long numTargetRowsCopied();
Seems like we will use -1 if unknown. Shall we document this?
I thought these always have at least 0 if we find MergeRowsExec, i.e. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala#L49, which initializes them to 0.
It is true that I set -1 in V2TableWriteExec::getOperationMetrics, but I thought that path would not normally be hit.
Makes sense. Do we have tests ensuring 1/1 match?
We have tests that assert all metrics are set to 0 if not used, if that is the question?
collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
  val metrics = n.metrics
  MergeOperationMetricsImpl(
    metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
Can we add constants for these in a separate PR? It seems fragile.
ok sure
 * @since 4.1.0
 */
@Evolving
public interface OperationMetrics {
I am thinking about how to simplify consumption of these objects in connectors. The question is whether this interface should have some sort of operation() method that would tell the type of metrics. That said, it is probably not the end of the world if connectors do a class check.
Thoughts anyone?
Would it be a bit duplicated if we had both a class and an enum?
I feel like using a proper object is the right call here compared to the map. Left some questions. Would love to hear what others think too.
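To make the trade-off concrete, here is a hedged sketch of the alternative floated above: an explicit discriminator on the base interface instead of a class check. This is not what the PR does; the names and the enum are hypothetical, and it also illustrates the duplication concern raised just above (both a sub-type and an enum value per operation):

// Hypothetical alternative, not in the PR: a discriminator so connectors can
// branch on the operation type without pattern-matching on concrete classes.
object WriteOperation extends Enumeration {
  val Merge, Update, Delete, Append = Value
}

trait DiscriminatedMetrics {
  def operation: WriteOperation.Value  // tells the connector which sub-type to expect
}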
 * @since 4.1.0
 */
@Evolving
public interface MergeOperationMetrics extends OperationMetrics {
Maybe MergeMetrics? Not a strong opinion.
Sounds ok to me too
Yeah, I generally prefer shorter names if they are descriptive. I would probably lean towards MergeMetrics here.
+1 for MergeMetrics
changed
/**
 * Implementation of {@link MergeOperationMetrics} that provides merge operation metrics.
 */
private[sql] case class MergeOperationMetricsImpl(
Wondering why we don't simply have MergeOperationMetrics as an implementation class, rather than a separate interface + impl here? Are we supposed to have different MergeOperationMetrics impls?
I chatted with @aokolnychyi offline yesterday; he suggested this approach to hide the constructor from the API, like LogicalWriteInfoImpl. Otherwise we would have to expose a public constructor in the public MergeOperationMetrics Java API so Spark can construct it, and we might then need a builder to support adding new metrics, etc.
Okay, makes sense.
@cloud-fan @gengliangwang, I would like you folks to take a look as well, if possible.
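For context, a minimal sketch of the interface-plus-hidden-impl pattern described above, in the spirit of LogicalWriteInfoImpl. The names and package here are hypothetical; the real PR pairs a public Java interface with a private[sql] Scala case class:

package org.apache.spark.sql.example  // hypothetical location; private[sql] needs an enclosing sql package

// Illustration only: the public surface is the trait, while the constructor lives on a
// case class private to Spark's sql package, so connectors can read the metrics but
// cannot construct the impl or depend on it.
trait ExampleMergeMetrics {
  def numTargetRowsCopied: Long
}

private[sql] case class ExampleMergeMetricsImpl(
    numTargetRowsCopied: Long) extends ExampleMergeMetrics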
metrics.asScala.map {
  case (key, value) => commitProperties += key -> String.valueOf(value)
override def commit(messages: Array[WriterCommitMessage], metrics: OperationMetrics): Unit = {
  metrics match {
Shall we have a test case to verify the new metrics?
Oh, these are returned and checked via existing test cases: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala#L1816
Were you thinking of something else?
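As a companion to the hunk quoted above, here is a hedged sketch of how an in-memory test connector might fold the typed metrics back into commit properties, much as the old map-based code did. The commitProperties field, the delegation to a single-argument commit, and the fact that the class does not extend the real BatchWrite interface are simplifications for illustration; the interface names follow the hunks quoted in this thread (they are renamed to WriterMetrics/MergeMetrics later in the review):

import scala.collection.mutable

import org.apache.spark.sql.connector.write.{MergeOperationMetrics, OperationMetrics, WriterCommitMessage}

// Sketch only: record operation-level metrics as string commit properties.
class ExampleBatchWrite {
  private val commitProperties = mutable.Map.empty[String, String]

  def commit(messages: Array[WriterCommitMessage]): Unit = {
    // ... actual commit logic would live here ...
  }

  def commit(messages: Array[WriterCommitMessage], metrics: OperationMetrics): Unit = {
    metrics match {
      case m: MergeOperationMetrics =>
        commitProperties += "numTargetRowsCopied" -> String.valueOf(m.numTargetRowsCopied())
        commitProperties += "numTargetRowsDeleted" -> String.valueOf(m.numTargetRowsDeleted())
      case _ => // writes other than MERGE carry no merge-specific counters
    }
    commit(messages)
  }
}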
/**
 * Returns the number of target rows deleted.
 */
long numTargetRowsDeleted();
If all the metrics start with numTargetRows, shall we just call them numRows?
Oh, we plan to add 'numSourceRows' in a follow-up PR. BTW, these match the current Delta metric names: https://github.com/delta-io/delta/blob/c0943e863aacac1365bd6beaa9f23d6bc9a4f316/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeStats.scala#L208
What changes were proposed in this pull request?
#51377 added a DataSourceV2 API that sends operation metrics along with the commit, via a map of String to Long. This change models it as a proper object instead.
Suggestion from @aokolnychyi
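As a rough illustration of the shape change (signatures approximate; OperationMetrics is the interface added in this PR, renamed WriterMetrics later in review):

import org.apache.spark.sql.connector.write.{OperationMetrics, WriterCommitMessage}

object CommitShapes {
  // Before (#51377): operation metrics arrived as an untyped map of counter name to value.
  def commitWithMap(
      messages: Array[WriterCommitMessage],
      metrics: java.util.Map[String, java.lang.Long]): Unit = ???

  // After (this PR): metrics arrive as a typed object that connectors can inspect.
  def commitWithModel(
      messages: Array[WriterCommitMessage],
      metrics: OperationMetrics): Unit = ???
}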
Why are the changes needed?
It is cleaner to model this as a proper object: it makes it clearer what metrics Spark sends, and it can handle future cases where metrics may not be long values.
Does this PR introduce any user-facing change?
No; this is an unreleased DSV2 API.
How was this patch tested?
Existing tests
Was this patch authored or co-authored using generative AI tooling?
No