[SPARK-53732][SQL] Remember TimeTravelSpec in DataSourceV2Relation #52599
Conversation
  try f finally { set(originContext) }
}

private[sql] def withAnalysisContext[A](context: AnalysisContext)(f: => A): A = {
Needed for testing below.
case table =>
  if (isStreaming) {
    assert(timeTravelSpec.isEmpty, "time travel is not allowed in streaming")
It should be impossible to reach this line with a valid time travel spec. Just a sanity check.
-case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
-case class AsOfVersion(version: String) extends TimeTravelSpec
+case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec {
+  override def toString: String = s"TIMESTAMP AS OF $timestamp"
Needed for a proper simpleString implementation in DataSourceV2Relation. See tests below.
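For illustration, a rough sketch of what the new overrides print (AsOfVersion's body is truncated in this hunk, so its output string is my assumption):

  // Assuming the toString overrides from this diff.
  val byTimestamp = AsOfTimestamp(1696500000000000L)
  println(byTimestamp) // TIMESTAMP AS OF 1696500000000000
  val byVersion = AsOfVersion("42")
  println(byVersion)   // presumably: VERSION AS OF 42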
 override def simpleString(maxFields: Int): String = {
-  s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
+  val outputString = truncatedString(output, "[", ", ", "]", maxFields)
Covered with tests.
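For context, a hedged sketch of what the full method presumably looks like after this change (the exact suffix formatting is an assumption; the tests show the real output):

  override def simpleString(maxFields: Int): String = {
    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
    // Appends e.g. " VERSION AS OF 42" only when the relation time travels.
    val timeTravelString = timeTravelSpec.map(spec => s" $spec").getOrElse("")
    s"RelationV2$outputString $name$timeTravelString"
  }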
  }
}

def pinTable(ident: Identifier, version: String): Unit = {
Used in time travel tests.
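A hypothetical usage, just to show the intent (catalog, table, and rows are made up):

  // Freeze the table's current state under version "v1" so that
  // VERSION AS OF can be exercised deterministically.
  catalog.pinTable(Identifier.of(Array("ns"), "tbl"), "v1")
  sql("INSERT INTO testcat.ns.tbl VALUES (2, 'b')")
  // The pinned version still sees only the original row.
  checkAnswer(sql("SELECT * FROM testcat.ns.tbl VERSION AS OF 'v1'"), Row(1, "a"))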
     cascade: Boolean,
     blocking: Boolean): Unit = {
-  uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
+  EliminateSubqueryAliases(plan) match {
Added this branch to avoid changing the behavior. Some connectors (like Iceberg) use these methods for their custom commands, so I wanted to be on the safer side and keep the old behavior for those calls. That is, if any of these methods is called with a DataSourceV2Relation without a time travel spec, we will invalidate all cache entries (including time travel ones), like before.
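Roughly, the branching I mean (the helper name is illustrative, not the exact code):

  EliminateSubqueryAliases(plan) match {
    case r: DataSourceV2Relation if r.timeTravelSpec.isEmpty =>
      // No time travel spec: keep the pre-PR behavior and invalidate every
      // cache entry for this table, time travel entries included.
      uncacheByCondition(spark, c => referencesSameTable(c, r), cascade, blocking)
    case _ =>
      // Otherwise require an exact plan match, so uncaching version 1 does
      // not drop a cached scan of version 2.
      uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
  }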
So when r.timeTravelSpec.isEmpty, plan1.sameResult(plan2) won't work with DSv2? Could you add a comment in the code?
/**
 * Re-caches all cache entries that reference the given table name.
 */
def recacheByTableName(
I will need this in subsequent PRs.
Thank you for pinging me, @aokolnychyi.
     identifier: Option[Identifier],
-    options: CaseInsensitiveStringMap)
+    options: CaseInsensitiveStringMap,
+    timeTravelSpec: Option[TimeTravelSpec] = None)
I think it's nice to have this field so that Spark is explicitly aware of the version of the table. But I don't quite understand why it's necessary, as the implementation can remember the time travel spec in the v2 Table returned by loadTable with a time travel spec. The v2 Table#currentVersion can then be used to get the table version explicitly.
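For reference, a minimal sketch of that alternative (treat the details as a sketch of the DSv2 time-travel loadTable overload):

  // The connector returns a Table pinned to the requested version...
  val table = catalog.loadTable(ident, version)
  // ...and reports which version it actually resolved to.
  val resolvedVersion = table.currentVersion()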
One of the use cases that both Iceberg and Delta struggle with today is checking that a query uses consistent versions of the table throughout the plan. Having currentVersion is one step, but we also need to distinguish time travel, as it is OK to have different versions in that case. I want Spark to handle these checks and also reload tables to consistent versions whenever that's needed (will be done in subsequent PRs). Today, both Iceberg and Delta try to implement this check/reload on their side, but it is really tricky in connectors. There are still unhandled edge cases.
Another use case that is even bigger is tracking read sets in DELETE, UPDATE, and MERGE. I have a proposal/PR about a transactional catalog that allows one to capture all operations that happen during an operation for snapshot and serializable isolation. It is also important to track and distinguish time travel there.
Does this make sense?
The third use case would be views that capture logical plans. They currently rely on refresh tricks in connectors. I want to simplify/fix that by moving the refresh to Spark so that DSv2 connectors can pin versions correctly.
24133d5 to 28d1e4a
case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec {
  override def toString: String = s"TIMESTAMP AS OF $timestamp"
}
case class AsOfVersion(version: String) extends TimeTravelSpec {
nit: add a blank line between two classes
  cachedData.isEmpty
}

private[sql] def numCachedEntries: Int = {
shall we add a comment "// Test-only"?
def isEmpty: Boolean = {
  cachedData.isEmpty
}
The changes in this file are quite confusing. Before this PR, DataSourceV2Relation did not contain the time travel spec, but the Table instance should contain it. This means that if a table scan is cached with version 1, and we then uncache the same table scan but with version 2, we won't uncache the version 1 scan. Now we put the time travel spec in DataSourceV2Relation, which makes this behavior more reliable in case the Table instance does not contain the version. I don't quite understand what we are trying to do here. If we want to keep the old behavior, we can simply clear out the time travel spec in DataSourceV2Relation#canonicalized.
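Something like this sketch (assuming DataSourceV2Relation can override QueryPlan#doCanonicalize; not the PR's actual code):

  override def doCanonicalize(): LogicalPlan = {
    // Clearing the spec makes a time-travel scan canonically equal to a
    // current-version scan of the same table, restoring the old cache matching.
    super.doCanonicalize() match {
      case r: DataSourceV2Relation => r.copy(timeTravelSpec = None)
      case other => other
    }
  }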
What changes were proposed in this pull request?
This PR adds TimeTravelSpec to DataSourceV2Relation when the relation is created by time traveling.

Why are the changes needed?
These changes are needed for subsequent PRs, where I will modify Spark to reload certain tables to ensure consistent version scanning and DELETE, UPDATE, and MERGE isolation. Without this change, Spark loses track of whether a relation points to the current version of the table or is the result of time travel. As an engine, Spark must be able to tell the two apart.
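A hypothetical illustration of the consistency point (table name and version are made up):

  // Scan `b` is explicit time travel, scan `a` is not. With timeTravelSpec on
  // DataSourceV2Relation, Spark can tell them apart and, in follow-up PRs,
  // align only the non-time-travel scans to a single, consistent version.
  val df = spark.sql(
    """SELECT *
      |FROM testcat.ns.tbl a
      |JOIN testcat.ns.tbl VERSION AS OF '42' b
      |  ON a.id = b.id""".stripMargin)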
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.