[SPARK-53732][SQL] Remember TimeTravelSpec in DataSourceV2Relation #52599
@@ -139,7 +139,8 @@ class RelationResolution(override val catalogManager: CatalogManager)
          ident,
          table,
          u.clearWritePrivileges.options,
-         u.isStreaming
+         u.isStreaming,
+         finalTimeTravelSpec
        )
      loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
      u.getTagValue(LogicalPlan.PLAN_ID_TAG)
@@ -162,7 +163,8 @@ class RelationResolution(override val catalogManager: CatalogManager)
       ident: Identifier,
       table: Option[Table],
       options: CaseInsensitiveStringMap,
-      isStreaming: Boolean): Option[LogicalPlan] = {
+      isStreaming: Boolean,
+      timeTravelSpec: Option[TimeTravelSpec]): Option[LogicalPlan] = {
     table.map {
       // To utilize this code path to execute V1 commands, e.g. INSERT,
       // either it must be session catalog, or tracksPartitionsInCatalog
@@ -189,6 +191,7 @@ class RelationResolution(override val catalogManager: CatalogManager)
       case table =>
         if (isStreaming) {
+          assert(timeTravelSpec.isEmpty, "time travel is not allowed in streaming")

Review comment: It should be impossible to reach this line with a valid time travel spec. Just a sanity check.

          val v1Fallback = table match {
            case withFallback: V2TableWithV1Fallback =>
              Some(UnresolvedCatalogRelation(withFallback.v1Table, isStreaming = true))
@@ -210,7 +213,7 @@ class RelationResolution(override val catalogManager: CatalogManager)
        } else {
          SubqueryAlias(
            catalog.name +: ident.asMultipartIdentifier,
-           DataSourceV2Relation.create(table, Some(catalog), Some(ident), options)
+           DataSourceV2Relation.create(table, Some(catalog), Some(ident), options, timeTravelSpec)
          )
        }
      }
@@ -27,8 +27,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 sealed trait TimeTravelSpec

-case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec
-case class AsOfVersion(version: String) extends TimeTravelSpec
+case class AsOfTimestamp(timestamp: Long) extends TimeTravelSpec {
+  override def toString: String = s"TIMESTAMP AS OF $timestamp"
+}

Review comment: Needed for proper …

+case class AsOfVersion(version: String) extends TimeTravelSpec {
+  override def toString: String = s"VERSION AS OF '$version'"
+}

Review comment: nit: add a blank line between the two classes.

 object TimeTravelSpec {
   def create(
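For illustration, the overridden toString values mirror the SQL time travel syntax; the values below are arbitrary examples, not taken from the PR's tests:

    import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, TimeTravelSpec}

    // Arbitrary example values; the rendered strings follow the overrides added above.
    val ts: TimeTravelSpec = AsOfTimestamp(1696118400000000L)
    val ver: TimeTravelSpec = AsOfVersion("5")
    assert(ts.toString == "TIMESTAMP AS OF 1696118400000000")
    assert(ver.toString == "VERSION AS OF '5'")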
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.v2

 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -45,7 +45,8 @@ abstract class DataSourceV2RelationBase(
     output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
-    options: CaseInsensitiveStringMap)
+    options: CaseInsensitiveStringMap,
+    timeTravelSpec: Option[TimeTravelSpec] = None)

Review comment: I think it's nice to have this field so that Spark is aware of the version of the table explicitly. But I don't quite understand why it's necessary, as the implementation can remember the time travel spec in the v2 …

Reply: One of the use cases that both Iceberg and Delta struggle with today is checking that a query uses consistent versions of the table throughout the plan. Having … Another use case that is even bigger is tracking read sets in DELETE, UPDATE, and MERGE. I have a proposal/PR about a transactional catalog that allows one to capture all operations that happened during an operation, for snapshot and serializable isolation. It is also important to track and distinguish time travel there. Does this make sense?

Reply: The third use case would be views that capture logical plans. They currently rely on tricks in connectors with refresh. I want to simplify/fix that by moving the refresh to Spark so that DSv2 connectors can pin versions correctly.

   extends LeafNode with MultiInstanceRelation with NamedRelation {

   import DataSourceV2Implicits._
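The first use case in the thread above (consistent versions of one table across a plan) is easy to express once the spec lives on the relation. A minimal sketch, not code from this PR; the helper name and error message are made up for illustration:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

    // Hypothetical check: collect every v2 read and fail if the same table is read
    // with different time travel specs within a single plan.
    def checkConsistentVersions(plan: LogicalPlan): Unit = {
      val reads = plan.collect {
        case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined =>
          ((r.catalog.get.name, r.identifier.get), r.timeTravelSpec)
      }
      reads.groupBy(_._1).foreach { case ((_, ident), specs) =>
        if (specs.map(_._2).distinct.size > 1) {
          throw new IllegalStateException(s"Inconsistent versions for $ident: " +
            specs.map(_._2.map(_.toString).getOrElse("current")).distinct.mkString(", "))
        }
      }
    }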
@@ -65,7 +66,12 @@ abstract class DataSourceV2RelationBase(
   override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)

   override def simpleString(maxFields: Int): String = {
-    s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
+    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
+    val nameWithTimeTravelSpec = timeTravelSpec match {
+      case Some(spec) => s"$name $spec"
+      case _ => name
+    }
+    s"RelationV2$outputString $nameWithTimeTravelSpec"
   }

Review comment: Covered with tests.

   override def computeStats(): Statistics = {
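For a concrete sense of the new display, the relation name is now followed by the pinned spec when one is present. The table name and attribute IDs below are illustrative, not taken from the PR's tests:

    Without time travel:   RelationV2[id#1, data#2] testcat.ns.tbl
    VERSION AS OF '5':     RelationV2[id#1, data#2] testcat.ns.tbl VERSION AS OF '5'
    TIMESTAMP AS OF ...:   RelationV2[id#1, data#2] testcat.ns.tbl TIMESTAMP AS OF 1696118400000000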
@@ -96,8 +102,9 @@ case class DataSourceV2Relation(
     override val output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
-    options: CaseInsensitiveStringMap)
-  extends DataSourceV2RelationBase(table, output, catalog, identifier, options)
+    options: CaseInsensitiveStringMap,
+    timeTravelSpec: Option[TimeTravelSpec] = None)
+  extends DataSourceV2RelationBase(table, output, catalog, identifier, options, timeTravelSpec)
   with ExposesMetadataColumns {

   import DataSourceV2Implicits._
@@ -117,7 +124,7 @@
   def withMetadataColumns(): DataSourceV2Relation = {
     val newMetadata = metadataOutput.filterNot(outputSet.contains)
     if (newMetadata.nonEmpty) {
-      DataSourceV2Relation(table, output ++ newMetadata, catalog, identifier, options)
+      copy(output = output ++ newMetadata)
     } else {
       this
     }
@@ -151,7 +158,12 @@ case class DataSourceV2ScanRelation(
   override def name: String = relation.name

   override def simpleString(maxFields: Int): String = {
-    s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
+    val outputString = truncatedString(output, "[", ", ", "]", maxFields)
+    val nameWithTimeTravelSpec = relation.timeTravelSpec match {
+      case Some(spec) => s"$name $spec"
+      case _ => name
+    }
+    s"RelationV2$outputString $nameWithTimeTravelSpec"
   }

   override def computeStats(): Statistics = {
@@ -235,17 +247,29 @@ object ExtractV2Table {
   def unapply(relation: DataSourceV2Relation): Option[Table] = Some(relation.table)
 }

+object ExtractV2CatalogAndIdentifier {
+  def unapply(relation: DataSourceV2Relation): Option[(CatalogPlugin, Identifier)] = {
+    relation match {
+      case DataSourceV2Relation(_, _, Some(catalog), Some(identifier), _, _) =>
+        Some((catalog, identifier))
+      case _ =>
+        None
+    }
+  }
+}
+
 object DataSourceV2Relation {
   def create(
       table: Table,
       catalog: Option[CatalogPlugin],
       identifier: Option[Identifier],
-      options: CaseInsensitiveStringMap): DataSourceV2Relation = {
+      options: CaseInsensitiveStringMap,
+      timeTravelSpec: Option[TimeTravelSpec] = None): DataSourceV2Relation = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     // The v2 source may return schema containing char/varchar type. We replace char/varchar
     // with "annotated" string type here as the query engine doesn't support char/varchar yet.
     val schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(table.columns.asSchema)
-    DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options)
+    DataSourceV2Relation(table, toAttributes(schema), catalog, identifier, options, timeTravelSpec)
   }

   def create(
@@ -66,6 +66,19 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     }
   }

+  def pinTable(ident: Identifier, version: String): Unit = {
+    Option(tables.get(ident)) match {
+      case Some(table: InMemoryBaseTable) =>
+        val versionIdent = Identifier.of(ident.namespace, ident.name + version)
+        val versionTable = table.copy()
+        tables.put(versionIdent, versionTable)
+      case Some(table) =>
+        throw new UnsupportedOperationException(s"Can't pin ${table.getClass.getName}")
+      case _ =>
+        throw new NoSuchTableException(ident.asMultipartIdentifier)
+    }
+  }

Review comment: Used in time travel tests.

   override def loadTable(ident: Identifier, version: String): Table = {
     val versionIdent = Identifier.of(ident.namespace, ident.name + version)
     Option(tables.get(versionIdent)) match {
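A hedged sketch of how a suite could exercise pinTable together with VERSION AS OF; the catalog name testcat, namespace ns, table t, its initial contents, and the QueryTest-style helpers are assumptions for illustration, not code from this PR:

    // Assumes "testcat" is registered as a BasicInMemoryTableCatalog and that
    // testcat.ns.t currently contains the single row (1).
    val testCatalog = spark.sessionState.catalogManager
      .catalog("testcat")
      .asInstanceOf[BasicInMemoryTableCatalog]
    testCatalog.pinTable(Identifier.of(Array("ns"), "t"), "v1")  // snapshot the current state as "v1"
    sql("INSERT INTO testcat.ns.t VALUES (2)")                   // keep mutating the live table
    // VERSION AS OF 'v1' resolves through loadTable(ident, version) to the pinned copy.
    checkAnswer(sql("SELECT * FROM testcat.ns.t VERSION AS OF 'v1'"), Seq(Row(1)))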
@@ -21,18 +21,21 @@ import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, View}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.classic.{Dataset, SparkSession}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
+  ExtractV2CatalogAndIdentifier, ExtractV2Table, FileTable}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -82,6 +85,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     cachedData.isEmpty
   }

Review comment: The changes in this file are quite confusing. Before this PR, … Now we put the time travel spec in …, and I don't quite understand what we are trying to do here. If we want to bring back the old behavior, we can simply clear out the time travel spec in …

+  private[sql] def numCachedEntries: Int = {
+    cachedData.size
+  }

Review comment: Shall we add a "// Test-only" comment?

   // Test-only
   def cacheQuery(query: Dataset[_]): Unit = {
     cacheQuery(query, tableName = None, storageLevel = MEMORY_AND_DISK)
@@ -211,15 +218,29 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       plan: LogicalPlan,
       cascade: Boolean,
       blocking: Boolean): Unit = {
-    uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
+    EliminateSubqueryAliases(plan) match {
+      case r @ ExtractV2CatalogAndIdentifier(catalog, ident) if r.timeTravelSpec.isEmpty =>
+        val nameParts = ident.toQualifiedNameParts(catalog)
+        uncacheTableOrView(spark, nameParts, cascade, blocking)
+      case _ =>
+        uncacheByCondition(spark, _.sameResult(plan), cascade, blocking)
+    }
   }

Review comment: Added this branch to avoid changes in behavior. Some connectors (like Iceberg) use these methods for their custom commands. I wanted to be on the safer side and keep the old behavior for these calls. That is, if any of these methods are called with …

Reply: so when …

-  def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = {
+  def uncacheTableOrView(
+      spark: SparkSession,
+      name: Seq[String],
+      cascade: Boolean,
+      blocking: Boolean = false): Unit = {
     uncacheByCondition(
-      spark, isMatchedTableOrView(_, name, spark.sessionState.conf), cascade, blocking = false)
+      spark, isMatchedTableOrView(_, name, spark.sessionState.conf), cascade, blocking)
   }

-  private def isMatchedTableOrView(plan: LogicalPlan, name: Seq[String], conf: SQLConf): Boolean = {
+  private def isMatchedTableOrView(
+      plan: LogicalPlan,
+      name: Seq[String],
+      conf: SQLConf,
+      includeTimeTravel: Boolean = true): Boolean = {
     def isSameName(nameInCache: Seq[String]): Boolean = {
       nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
     }
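To make the preserved connector entry point concrete, a hedged sketch of the call pattern discussed above. It assumes the enclosing method at the top of this hunk is CacheManager.uncacheQuery (its name is cut off in this excerpt) and that table, catalog, and ident describe an already-loaded v2 table; none of these names come from Iceberg or Delta code:

    // A connector rebuilding the relation for its own command, with no time travel spec,
    // takes the new name-based branch, which per the discussion keeps the pre-PR
    // cache-invalidation behavior for such calls.
    val rebuilt = DataSourceV2Relation.create(
      table, Some(catalog), Some(ident), CaseInsensitiveStringMap.empty())
    spark.sharedState.cacheManager.uncacheQuery(spark, rebuilt, cascade = true, blocking = true)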
@@ -228,9 +249,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       case LogicalRelationWithTable(_, Some(catalogTable)) =>
         isSameName(catalogTable.identifier.nameParts)

-      case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _) =>
-        import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
-        isSameName(v2Ident.toQualifiedNameParts(catalog))
+      case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _, timeTravelSpec) =>
+        val nameInCache = v2Ident.toQualifiedNameParts(catalog)
+        isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)

       case v: View =>
         isSameName(v.desc.identifier.nameParts)
@@ -299,8 +320,27 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    * normalized before being used further.
    */
   def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
-    val normalized = QueryExecution.normalize(spark, plan)
-    recacheByCondition(spark, _.plan.exists(_.sameResult(normalized)))
+    EliminateSubqueryAliases(plan) match {
+      case r @ ExtractV2CatalogAndIdentifier(catalog, ident) if r.timeTravelSpec.isEmpty =>
+        val nameParts = ident.toQualifiedNameParts(catalog)
+        recacheByTableName(spark, nameParts)
+      case _ =>
+        val normalized = QueryExecution.normalize(spark, plan)
+        recacheByCondition(spark, _.plan.exists(_.sameResult(normalized)))
+    }
   }

+  /**
+   * Re-caches all cache entries that reference the given table name.
+   */
+  def recacheByTableName(
+      spark: SparkSession,
+      name: Seq[String],
+      includeTimeTravel: Boolean = true): Unit = {
+    def shouldInvalidate(entry: CachedData): Boolean = {
+      entry.plan.exists(isMatchedTableOrView(_, name, spark.sessionState.conf, includeTimeTravel))
+    }
+    recacheByCondition(spark, shouldInvalidate)
+  }

Review comment: I will need this in subsequent PRs.

   /**
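Since the author plans to use recacheByTableName in follow-up PRs, here is a hedged sketch of a plausible call; the table name is illustrative, and the effect of includeTimeTravel = false follows from isMatchedTableOrView above:

    // Refresh every cache entry that reads testcat.ns.t at its current state, while
    // leaving pinned (VERSION AS OF / TIMESTAMP AS OF) cache entries untouched.
    spark.sharedState.cacheManager.recacheByTableName(
      spark,
      Seq("testcat", "ns", "t"),
      includeTimeTravel = false)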
Review comment: Needed for testing below.