@@ -229,6 +229,12 @@ object AnalysisContext {
set(context)
try f finally { set(originContext) }
}

private[sql] def withAnalysisContext[A](context: AnalysisContext)(f: => A): A = {
Comment (Contributor Author): Needed for testing below.

val originContext = value.get()
set(context)
try f finally { set(originContext) }
}
}
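A minimal sketch (not part of this diff) of how the new helper might be exercised from a test in the org.apache.spark.sql package (the method is private[sql]), assuming AnalysisContext is the analyzer's case class so copy() is available:

    // Hypothetical test usage; the exact fields of AnalysisContext are assumptions.
    val customContext = AnalysisContext.get.copy()
    AnalysisContext.withAnalysisContext(customContext) {
      // Anything analyzed here sees customContext (e.g. its relationCache);
      // the previous context is restored when the block exits, even on failure.
      assert(AnalysisContext.get eq customContext)
    }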

object Analyzer {
@@ -139,7 +139,8 @@ class RelationResolution(override val catalogManager: CatalogManager)
ident,
table,
u.clearWritePrivileges.options,
u.isStreaming
u.isStreaming,
finalTimeTravelSpec
)
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
u.getTagValue(LogicalPlan.PLAN_ID_TAG)
@@ -162,7 +163,8 @@ class RelationResolution(override val catalogManager: CatalogManager)
ident: Identifier,
table: Option[Table],
options: CaseInsensitiveStringMap,
isStreaming: Boolean): Option[LogicalPlan] = {
isStreaming: Boolean,
timeTravelSpec: Option[TimeTravelSpec]): Option[LogicalPlan] = {
table.map {
// To utilize this code path to execute V1 commands, e.g. INSERT,
// either it must be session catalog, or tracksPartitionsInCatalog
@@ -189,6 +191,7 @@

case table =>
if (isStreaming) {
assert(timeTravelSpec.isEmpty, "time travel is not allowed in streaming")
Comment (Contributor Author): It should be impossible to reach this line with a valid time travel spec. Just a sanity check.

val v1Fallback = table match {
case withFallback: V2TableWithV1Fallback =>
Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true))
@@ -210,7 +213,7 @@
} else {
SubqueryAlias(
catalog.name +: ident.asMultipartIdentifier,
DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)
DataSourceV2Relation.create(table, Some(catalog), Some(ident), options, timeTravelSpec)
)
}
}
@@ -27,8 +27,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap

sealed trait TimeTravelSpec

case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
case class AsOfVersion(version: String) extends TimeTravelSpec
case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec {
override def toString: String = s"TIMESTAMP AS OF $timestamp"
Comment (Contributor Author): Needed for proper simpleString implementation in DataSourceV2Relation. See tests below.

}
case class AsOfVersion(version: String) extends TimeTravelSpec {
Comment (Member): nit: add a blank line between the two classes

override def toString: String = s"VERSION AS OF '$version'"
}
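For reference, the new toString implementations render as follows (the values are illustrative):

    AsOfTimestamp(1697241600000000L).toString  // "TIMESTAMP AS OF 1697241600000000"
    AsOfVersion("v2").toString                 // "VERSION AS OF 'v2'"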

object TimeTravelSpec {
def create(
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation, TimeTravelSpec}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -45,7 +45,8 @@ abstract class DataSourceV2RelationBase(
output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap)
options: CaseInsensitiveStringMap,
timeTravelSpec: Option[TimeTravelSpec] = None)
Comment (Contributor): I think it's nice to have this field so that Spark is aware of the version of the table explicitly. But I don't quite understand why it's necessary, as an implementation can remember the time travel spec in the v2 Table returned by loadTable with a time travel spec. The v2 Table#currentVersion can be used to get the table version explicitly.

Reply (Contributor Author, @aokolnychyi, Oct 14, 2025): One of the use cases that both Iceberg and Delta struggle with today is checking that a query uses consistent versions of the table throughout the plan. Having currentVersion is one step, but we need to distinguish time travel, as it is OK to have different versions in that case. I want Spark to handle these checks and also reload tables to consistent versions whenever that's needed (will be done in subsequent PRs). Today both Iceberg and Delta try to implement this check/reload on their side, but it is really tricky in connectors. There are still unhandled edge cases.

Another, even bigger, use case is tracking read sets in DELETE, UPDATE, and MERGE. I have a proposal/PR about a transactional catalog that allows one to capture all operations that happened during an operation for snapshot and serializable isolation. It is also important to track and distinguish time travel there.

Does this make sense?

Reply (Contributor Author): The third use case would be views that capture logical plans. They currently rely on tricks in connectors with refresh. I want to simplify/fix that by moving the refresh to Spark so that DSv2 connectors can pin versions correctly.

extends LeafNode with MultiInstanceRelation with NamedRelation {

import DataSourceV2Implicits._
@@ -65,7 +66,12 @@ abstract class DataSourceV2RelationBase(
override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)

override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
val outputString = truncatedString(output, "[", ", ", "]", maxFields)
Comment (Contributor Author): Covered with tests.

val nameWithTimeTravelSpec = timeTravelSpec match {
case Some(spec) => s"$name $spec"
case _ => name
}
s"RelationV2$outputString $nameWithTimeTravelSpec"
}
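As a rough illustration (table name, columns, and expression IDs below are made up), the rendered plan line now carries the time travel spec when one is present:

    // timeTravelSpec = None:
    //   RelationV2[id#1, data#2] testcat.ns.tbl
    // timeTravelSpec = Some(AsOfVersion("v2")):
    //   RelationV2[id#1, data#2] testcat.ns.tbl VERSION AS OF 'v2'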

override def computeStats(): Statistics = {
@@ -96,8 +102,9 @@ case class DataSourceV2Relation(
override val output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap)
extends DataSourceV2RelationBase(table, output, catalog, identifier, options)
options: CaseInsensitiveStringMap,
timeTravelSpec: Option[TimeTravelSpec] = None)
extends DataSourceV2RelationBase(table, output, catalog, identifier, options, timeTravelSpec)
with ExposesMetadataColumns {

import DataSourceV2Implicits._
@@ -117,7 +124,7 @@ case class DataSourceV2Relation(
def withMetadataColumns(): DataSourceV2Relation = {
val newMetadata = metadataOutput.filterNot(outputSet.contains)
if (newMetadata.nonEmpty) {
DataSourceV2Relation(table, output ++ newMetadata, catalog, identifier, options)
copy(output = output ++ newMetadata)
} else {
this
}
@@ -151,7 +158,12 @@ case class DataSourceV2ScanRelation(
override def name: String = relation.name

override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
val outputString = truncatedString(output, "[", ", ", "]", maxFields)
val nameWithTimeTravelSpec = relation.timeTravelSpec match {
case Some(spec) => s"$name $spec"
case _ => name
}
s"RelationV2$outputString $nameWithTimeTravelSpec"
}

override def computeStats(): Statistics = {
@@ -235,17 +247,29 @@ object ExtractV2Table {
def unapply(relation: DataSourceV2Relation): Option[Table] = Some(relation.table)
}

object ExtractV2CatalogAndIdentifier {
def unapply(relation: DataSourceV2Relation): Option[(CatalogPlugin, Identifier)] = {
relation match {
case DataSourceV2Relation(_, _, Some(catalog), Some(identifier), _, _) =>
Some((catalog, identifier))
case _ =>
None
}
}
}
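A small sketch (hypothetical caller, not from this PR) of how the new extractor might be used; it only matches relations that carry both a catalog and an identifier:

    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper

    def qualifiedName(relation: DataSourceV2Relation): Option[Seq[String]] = relation match {
      case ExtractV2CatalogAndIdentifier(catalog, ident) =>
        Some(ident.toQualifiedNameParts(catalog))
      case _ => None
    }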

object DataSourceV2Relation {
def create(
table: Table,
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap): DataSourceV2Relation = {
options: CaseInsensitiveStringMap,
timeTravelSpec: Option[TimeTravelSpec] = None): DataSourceV2Relation = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
// The v2 source may return schema containing char/varchar type. We replace char/varchar
// with "annotated" string type here as the query engine doesn't support char/varchar yet.
val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options)
DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options, timeTravelSpec)
}
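A minimal sketch of building a pinned relation through the extended factory method; table, catalog, and ident are placeholders obtained from an earlier loadTable call:

    // Hypothetical call site.
    val pinned = DataSourceV2Relation.create(
      table,
      Some(catalog),
      Some(ident),
      CaseInsensitiveStringMap.empty(),
      Some(AsOfVersion("v1")))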

def create(
@@ -730,6 +730,10 @@ abstract class InMemoryBaseTable(
}
}
}

def copy(): Table = {
throw new UnsupportedOperationException(s"copy is not supported for ${getClass.getName}")
}
}

object InMemoryBaseTable {
@@ -125,6 +125,41 @@ class InMemoryTable(
new InMemoryWriterBuilderWithOverWrite(info)
}

override def copy(): Table = {
val copiedTable = new InMemoryTable(
name,
columns(),
partitioning,
properties,
constraints,
distribution,
ordering,
numPartitions,
advisoryPartitionSize,
isDistributionStrictlyRequired,
numRowsPerSplit)

copiedTable.dataMap.synchronized {
dataMap.foreach { case (key, splits) =>
val copiedSplits = splits.map { bufferedRows =>
val copiedBufferedRows = new BufferedRows(bufferedRows.key, bufferedRows.schema)
copiedBufferedRows.rows ++= bufferedRows.rows.map(_.copy())
copiedBufferedRows
}
copiedTable.dataMap.put(key, copiedSplits)
}
}

copiedTable.commits ++= commits.map(_.copy())

copiedTable.setCurrentVersion(currentVersion())
if (validatedVersion() != null) {
copiedTable.setValidatedVersion(validatedVersion())
}

copiedTable
}
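The deep copy matters because the catalog below pins versions by snapshotting the table; a rough sanity check (test-only, names are illustrative) might look like:

    val snapshot = table.copy().asInstanceOf[InMemoryTable]
    // Writes applied to `table` after this point must not show up in `snapshot`,
    // otherwise a pinned version would drift along with the live table.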

class InMemoryWriterBuilderWithOverWrite(override val info: LogicalWriteInfo)
extends InMemoryWriterBuilder(info) with SupportsOverwrite {

@@ -66,6 +66,19 @@ class BasicInMemoryTableCatalog extends TableCatalog {
}
}

def pinTable(ident: Identifier, version: String): Unit = {
Comment (Contributor Author): Used in time travel tests.

Option(tables.get(ident)) match {
case Some(table: InMemoryBaseTable) =>
val versionIdent = Identifier.of(ident.namespace, ident.name + version)
val versionTable = table.copy()
tables.put(versionIdent, versionTable)
case Some(table) =>
throw new UnsupportedOperationException(s"Can't pin ${table.getClass.getName}")
case _ =>
throw new NoSuchTableException(ident.asMultipartIdentifier)
}
}
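A rough sketch of how a time travel test might use this hook (the SQL, helper names, and identifiers are illustrative, not from this PR): pin the current contents under a version string, mutate the table, then query the pinned version.

    sql("INSERT INTO testcat.ns.tbl VALUES (1, 'a')")
    inMemoryCatalog.pinTable(Identifier.of(Array("ns"), "tbl"), "v1")
    sql("INSERT INTO testcat.ns.tbl VALUES (2, 'b')")
    // loadTable(ident, "v1") resolves the copy stored under "tblv1",
    // so only the first row is visible here.
    sql("SELECT * FROM testcat.ns.tbl VERSION AS OF 'v1'")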

override def loadTable(ident: Identifier, version: String): Table = {
val versionIdent = Identifier.of(ident.namespace, ident.name + version)
Option(tables.get(versionIdent)) match {
@@ -21,18 +21,21 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.{Logging, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, View}
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.classic.{Dataset, SparkSession}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
ExtractV2CatalogAndIdentifier, ExtractV2Table, FileTable}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -82,6 +85,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
cachedData.isEmpty
}

Comment (Contributor): The changes in this file are quite confusing. Before this PR, DataSourceV2Relation did not contain the time travel spec, but the Table instance should contain it. This means that if a table scan is cached with version 1, and then we uncache the same table scan but with version 2, we won't uncache the version 1 scan.

Now we put the time travel spec in DataSourceV2Relation, which makes this behavior more reliable in case the Table instance does not contain the version.

I don't quite understand what we are trying to do here. If we want to bring back the old behavior, we can simply clear out the time travel spec in DataSourceV2Relation#canonicalized.

private[sql] def numCachedEntries: Int = {
Comment (Member): Shall we add a "// Test-only" comment?

cachedData.size
}

// Test-only
def cacheQuery(query: Dataset[_]): Unit = {
cacheQuery(query, tableName = None, storageLevel = MEMORY_AND_DISK)
@@ -211,15 +218,29 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
plan: LogicalPlan,
cascade: Boolean,
blocking: Boolean): Unit = {
uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
EliminateSubqueryAliases(plan) match {
Comment (Contributor Author): Added this branch to avoid changes in behavior. Some connectors (like Iceberg) use these methods for their custom commands. I wanted to be on the safer side and keep the old behavior for these calls. That is, if any of these methods are called with a DataSourceV2Relation without a time travel spec, we will invalidate all cache entries (including time travel) like before.

Comment (Member): So when r.timeTravelSpec.isEmpty, plan1.sameResult(plan2) won't work with DSv2? Could you add a comment in the code?

case r @ ExtractV2CatalogAndIdentifier(catalog, ident) if r.timeTravelSpec.isEmpty =>
val nameParts = ident.toQualifiedNameParts(catalog)
uncacheTableOrView(spark, nameParts, cascade, blocking)
case _ =>
uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
}
}
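A condensed sketch of the resulting semantics (the relations below are illustrative):

    // Built without a time travel spec: uncaching this relation falls back to
    // name-based matching, so every cache entry on the table is invalidated,
    // including entries cached with VERSION AS OF / TIMESTAMP AS OF.
    val current = DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)

    // Built with a time travel spec: uncaching goes through sameResult, so only
    // plans that reference this exact pinned relation are dropped.
    val pinned = DataSourceV2Relation.create(
      table, Some(catalog), Some(ident), options, Some(AsOfVersion("v1")))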

def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = {
def uncacheTableOrView(
spark: SparkSession,
name: Seq[String],
cascade: Boolean,
blocking: Boolean = false): Unit = {
uncacheByCondition(
spark, isMatchedTableOrView(_, name, spark.sessionState.conf), cascade, blocking = false)
spark, isMatchedTableOrView(_, name, spark.sessionState.conf), cascade, blocking)
}

private def isMatchedTableOrView(plan: LogicalPlan, name: Seq[String], conf: SQLConf): Boolean = {
private def isMatchedTableOrView(
plan: LogicalPlan,
name: Seq[String],
conf: SQLConf,
includeTimeTravel: Boolean = true): Boolean = {
def isSameName(nameInCache: Seq[String]): Boolean = {
nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
}
@@ -228,9 +249,9 @@
case LogicalRelationWithTable(_, Some(catalogTable)) =>
isSameName(catalogTable.identifier.nameParts)

case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _) =>
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
isSameName(v2Ident.toQualifiedNameParts(catalog))
case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _, timeTravelSpec) =>
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)

case v: View =>
isSameName(v.desc.identifier.nameParts)
@@ -299,8 +320,27 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
* normalized before being used further.
*/
def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
val normalized = QueryExecution.normalize(spark, plan)
recacheByCondition(spark, _.plan.exists(_.sameResult(normalized)))
EliminateSubqueryAliases(plan) match {
case r @ ExtractV2CatalogAndIdentifier(catalog, ident) if r.timeTravelSpec.isEmpty =>
val nameParts = ident.toQualifiedNameParts(catalog)
recacheByTableName(spark, nameParts)
case _ =>
val normalized = QueryExecution.normalize(spark, plan)
recacheByCondition(spark, _.plan.exists(_.sameResult(normalized)))
}
}

/**
* Re-caches all cache entries that reference the given table name.
*/
def recacheByTableName(
Comment (Contributor Author): I will need this in subsequent PRs.

spark: SparkSession,
name: Seq[String],
includeTimeTravel: Boolean = true): Unit = {
def shouldInvalidate(entry: CachedData): Boolean = {
entry.plan.exists(isMatchedTableOrView(_, name, spark.sessionState.conf, includeTimeTravel))
}
recacheByCondition(spark, shouldInvalidate)
}
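For illustration (the name parts are hypothetical), a caller could refresh everything cached for a table while leaving explicitly time-travelled entries untouched:

    // Re-cache all entries that reference testcat.ns.tbl, skipping time travel entries.
    cacheManager.recacheByTableName(spark, Seq("testcat", "ns", "tbl"), includeTimeTravel = false)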

/**