-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-52689][SQL] Send DML Metrics to V2Write #51377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
021dfb4
to
78235a6
Compare
3fc94aa
to
de9d47d
Compare
...core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
Outdated
Show resolved
Hide resolved
public interface MergeMetrics { | ||
|
||
class Builder { | ||
private long numTargetRowsCopied = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally don't mind -1 but I think we have a few places in DSv2 that use OptionalLong
.
Will it make sense to be consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok changed to OptionalLong in the API
sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
Outdated
Show resolved
Hide resolved
/** | ||
* Whether this batch write requests merge execution metrics. | ||
*/ | ||
default boolean requestMergeMetrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a performance hit for requesting metrics? If not, I'd drop this method and always call commitMerge
. The fewer public methods we have the better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The perf hit is a execution graph walk. Anyway, i removed the check, and walk in all the cases.
@@ -275,7 +277,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat | |||
} | |||
|
|||
case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) => | |||
AppendDataExec(planLater(query), refreshCache(r), write) :: Nil | |||
AppendDataExec(planLater(query), refreshCache(r), write, getCommand(r)) :: Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this for cases when MERGE is rewritten as INSERT? I thought we would skip populating metrics for appends, but let me think about it. What does Delta do when MERGE becomes INSERT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I dont handle that yet.
...e/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
...e/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Show resolved
Hide resolved
de9d47d
to
781f32d
Compare
What changes were proposed in this pull request?
Send some DML execution metrics (ie, MergeRowsExec) to the write of these data source, so they can persist them for debugging purpose.
Why are the changes needed?
DML row-level-operations, ie MERGE, UPDATE, DELETE are a critical functionality of V2 data sources (like Iceberg). It will be nice, if we can send some DML metrics to the commit of these data source, so they can persist them for debugging purpose on commit metadata.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test
Was this patch authored or co-authored using generative AI tooling?
No