
Conversation

@aokolnychyi (Contributor) commented Oct 14, 2025

What changes were proposed in this pull request?

This PR adds TimeTravelSpec to DataSourceV2Relation when the relation is created by time traveling.

Why are the changes needed?

These changes are needed for subsequent PRs where I will modify Spark to reload certain tables to ensure consistent version scanning and DELETE, UPDATE, and MERGE isolation. Without this change, Spark loses track of whether a relation points to the current version of the table or time travels. As an engine, Spark must know whether a relation is the result of time traveling or points to the current table version/snapshot.
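
For illustration, a minimal sketch (hypothetical catalog and table name) of the two kinds of reads this distinction separates:

// Both queries resolve to the same table; only the second one time travels.
// With TimeTravelSpec recorded on DataSourceV2Relation, the two resulting
// plans are now distinguishable inside Spark.
val current = spark.sql("SELECT * FROM cat.ns.tbl")
val pinned = spark.sql("SELECT * FROM cat.ns.tbl VERSION AS OF 1")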

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This PR comes with tests.

Was this patch authored or co-authored using generative AI tooling?

No.

github-actions bot added the SQL label Oct 14, 2025
try f finally { set(originContext) }
}

private[sql] def withAnalysisContext[A](context: AnalysisContext)(f: => A): A = {
Contributor Author:

Needed for testing below.


case table =>
  if (isStreaming) {
    assert(timeTravelSpec.isEmpty, "time travel is not allowed in streaming")
Contributor Author:

It should be impossible to reach this line with a valid time travel spec. Just a sanity check.

- case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
- case class AsOfVersion(version: String) extends TimeTravelSpec
+ case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec {
+   override def toString: String = s"TIMESTAMP AS OF $timestamp"
Contributor Author:

Needed for a proper simpleString implementation in DataSourceV2Relation. See the tests below.


  override def simpleString(maxFields: Int): String = {
-   s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
+   val outputString = truncatedString(output, "[", ", ", "]", maxFields)
Contributor Author:

Covered with tests.
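
For reference, a hedged sketch of how the completed method might read once the spec is included (the exact string layout is an assumption, not the merged code):

override def simpleString(maxFields: Int): String = {
  val outputString = truncatedString(output, "[", ", ", "]", maxFields)
  // Append the spec's toString (e.g. "TIMESTAMP AS OF 123") when present.
  val timeTravelString = timeTravelSpec.map(spec => s" $spec").getOrElse("")
  s"RelationV2$outputString $name$timeTravelString"
}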

}
}

def pinTable(ident: Identifier, version: String): Unit = {
Contributor Author:

Used in time travel tests.
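
A hedged sketch of how a test might use it (catalog, namespace, and table names are hypothetical):

// Pin the test table at version "2", then read that version via time travel.
val ident = Identifier.of(Array("ns"), "tbl")
catalog.pinTable(ident, "2")
sql("SELECT * FROM testcat.ns.tbl VERSION AS OF '2'")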

    cascade: Boolean,
    blocking: Boolean): Unit = {
-   uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
+   EliminateSubqueryAliases(plan) match {
Contributor Author:

Added this branch to avoid changes in behavior. Some connectors (like Iceberg) use these methods for their custom commands, and I wanted to be on the safer side and keep the old behavior for those calls. That is, if any of these methods are called with a DataSourceV2Relation without a time travel spec, we will invalidate all cache entries (including time travel ones), like before.
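
A minimal sketch of the branch being described; matchesTable is a hypothetical predicate standing in for however the cache matches entries by table identity alone:

EliminateSubqueryAliases(plan) match {
  // No time travel spec: keep the old behavior and invalidate every cache
  // entry for this table, including entries created by time travel reads.
  case r: DataSourceV2Relation if r.timeTravelSpec.isEmpty =>
    uncacheByCondition(spark, cached => matchesTable(cached, r), cascade, blocking)
  // Otherwise, fall back to exact plan matching as before.
  case _ =>
    uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
}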

Member:

So when r.timeTravelSpec.isEmpty, plan1.sameResult(plan2) won't work with DSv2? Could you add a comment in the code?

/**
* Re-caches all cache entries that reference the given table name.
*/
def recacheByTableName(
Contributor Author:

I will need this in subsequent PRs.

@aokolnychyi (Contributor Author):

cc @dongjoon-hyun @cloud-fan @gengliangwang @huaxingao @szehon-ho @viirya

@dongjoon-hyun (Member):

Thank you for pinging me, @aokolnychyi.

    identifier: Option[Identifier],
-   options: CaseInsensitiveStringMap)
+   options: CaseInsensitiveStringMap,
+   timeTravelSpec: Option[TimeTravelSpec] = None)
Contributor:

I think it's nice to have this field so that Spark is explicitly aware of the table's version. But I don't quite understand why it's necessary, as an implementation can remember the time travel spec in the v2 Table returned by loadTable with a time travel spec. The v2 Table#currentVersion can then be used to get the table version explicitly.

@aokolnychyi (Contributor Author) commented Oct 14, 2025:

One of the use cases that both Iceberg and Delta struggle with today is checking that a query uses consistent versions of a table throughout the plan. Having currentVersion is one step, but we also need to distinguish time travel, because it is OK to have different versions in that case. I want Spark to handle these checks and also reload tables to consistent versions whenever that's needed (will be done in subsequent PRs). Today both Iceberg and Delta try to implement this check/reload on their side, but it is really tricky in connectors, and there are still unhandled edge cases.

Another use case that is even bigger is tracking read sets in DELETE, UPDATE, and MERGE. I have a proposal/PR about a transactional catalog that allows one to capture all operations that happened during an operation for snapshot and serializable isolation. It is also important to track and distinguish time travel there.

Does this make sense?
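
A hedged sketch of the plan-wide check described here; currentVersion stands in for however the connector exposes a table's version, and the key point is that relations carrying a TimeTravelSpec are exempt:

// Collect the version of every non-time-travel V2 relation in the plan and
// require that each table appears with at most one version.
val versions = plan.collect {
  case r: DataSourceV2Relation if r.timeTravelSpec.isEmpty =>
    r.name -> r.table.currentVersion // assumed accessor, for illustration only
}
versions.groupBy(_._1).foreach { case (table, entries) =>
  assert(entries.map(_._2).distinct.size <= 1,
    s"Query uses inconsistent versions of table $table")
}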

Contributor Author:

The third use case would be views that capture logical plans. They currently rely on connector-side tricks with refresh. I want to simplify/fix that by moving the refresh into Spark so that DSv2 connectors can pin versions correctly.

case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec {
  override def toString: String = s"TIMESTAMP AS OF $timestamp"
}
case class AsOfVersion(version: String) extends TimeTravelSpec {
Member:

nit: add a blank line between the two classes

cachedData.isEmpty
}

private[sql] def numCachedEntries: Int = {
Member:

Shall we add a "// Test-only" comment?
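
Applied, the suggestion would look roughly like this (assuming cachedData is a plain collection):

// Test-only: exposes the number of cache entries for assertions in suites.
private[sql] def numCachedEntries: Int = {
  cachedData.size
}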

def isEmpty: Boolean = {
  cachedData.isEmpty
}

Contributor:

The changes in this file are quite confusing. Before this PR, DataSourceV2Relation did not contain the time travel spec, but the Table instance should contain it. This means that if a table scan is cached with version 1, and we then uncache the same table scan but with version 2, we won't uncache the version 1 scan.

Now we put the time travel spec in DataSourceV2Relation, which makes this behavior more reliable in case the Table instance does not contain the version.

I don't quite understand what we are trying to do here. If we want to keep the old behavior, we can simply clear out the time travel spec in DataSourceV2Relation#canonicalized.
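
A minimal sketch of that alternative; Spark's real canonicalization also normalizes output attributes, so this only illustrates dropping the spec:

override def doCanonicalize(): LogicalPlan = {
  // Clearing the spec makes two scans of the same table compare equal for
  // cache matching, regardless of the version they were resolved against.
  copy(timeTravelSpec = None)
}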
