Skip to content

Commit 7480f8d

Browse files
richardc-db and cloud-fan
authored and committed
[SPARK-52060][SQL] Make OneRowRelationExec node
### What changes were proposed in this pull request? Creates a new `OneRowRelationExec` node, which is more or less a copy of the `RDDScanExec` node. We want a dedicated node because it makes it clearer when a one-row relation is used, e.g. for patterns like `SELECT version()`. ### Why are the changes needed? This makes it clearer in the code that a one-row relation is used and allows us to avoid checking the hard-coded "OneRowRelation" string when pattern matching. ### Does this PR introduce _any_ user-facing change? Yes, the plan node will now be `OneRowRelationExec` rather than `RDDScanExec`. The plan string should be the same, however. ### How was this patch tested? Added unit tests. ### Was this patch authored or co-authored using generative AI tooling? Closes apache#50849 from richardc-db/make_one_row_relation_node. Lead-authored-by: Richard Chen <r.chen@databricks.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 5ff4b8a commit 7480f8d

File tree

4 files changed

+58
-4
lines changed

4 files changed

+58
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,3 +319,46 @@ case class RDDScanExec(
319319

320320
override def getStream: Option[SparkDataStream] = stream
321321
}
322+
323+
/**
324+
* A physical plan node for `OneRowRelation` for scans with no 'FROM' clause.
325+
*
326+
* We do not extend `RDDScanExec` in order to avoid complexity due to `TreeNode.makeCopy` and
327+
* `TreeNode`'s general use of reflection.
*
* The node has no output attributes and is backed by a single-partition RDD whose only
* partition yields one row built by an empty `UnsafeProjection`. Each emitted row increments
* the "numOutputRows" metric. The same RDD is returned from both `doExecute()` and `inputRDD`.
328+
*/
329+
case class OneRowRelationExec() extends LeafExecNode
330+
with InputRDDCodegen {
331+
332+
// Display name; `simpleString` below renders it as "Scan OneRowRelation[]".
override val nodeName: String = s"Scan OneRowRelation"
333+
334+
// A one-row relation exposes no columns.
override val output: Seq[Attribute] = Nil
335+
336+
// Built eagerly (plain `val`) when the node is constructed. The single "" element passed to
// `parallelize` is only a placeholder: the partition function ignores its input iterator and
// emits exactly one row of its own.
private val rdd: RDD[InternalRow] = {
337+
// NOTE(review): presumably resolved from the lazy `metrics` map declared below, which is why
// that map must be lazy for this eager initializer to work - confirm against `longMetric`.
val numOutputRows = longMetric("numOutputRows")
338+
session
339+
.sparkContext
340+
.parallelize(Seq(""), 1)
341+
.mapPartitionsInternal { _ =>
342+
// Projection over zero expressions applied to the empty row: yields one zero-column row.
val proj = UnsafeProjection.create(Seq.empty[Expression])
343+
Iterator(proj.apply(InternalRow.empty)).map { r =>
344+
// Counted inside the iterator's `map`, i.e. when the row is actually consumed.
numOutputRows += 1
345+
r
346+
}
347+
}
348+
}
349+
350+
// Single metric, keyed by the same name that the `rdd` initializer looks up.
override lazy val metrics = Map(
351+
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
352+
353+
protected override def doExecute(): RDD[InternalRow] = rdd
354+
355+
// The trailing "[]" is a fixed literal; `maxFields` is not used.
override def simpleString(maxFields: Int): String = s"$nodeName[]"
356+
357+
override def inputRDD: RDD[InternalRow] = rdd
358+
359+
// NOTE(review): presumably false because the rows from `rdd` already come out of an
// `UnsafeProjection` - confirm against `InputRDDCodegen`.
override protected val createUnsafeProjection: Boolean = false
360+
361+
// Delegates to the parent canonicalization, then returns a fresh copy of the result.
override protected def doCanonicalize(): SparkPlan = {
362+
super.doCanonicalize().asInstanceOf[OneRowRelationExec].copy()
363+
}
364+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -690,8 +690,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
690690
}
691691
}
692692

693-
protected lazy val singleRowRdd = session.sparkContext.parallelize(Seq(InternalRow()), 1)
694-
695693
object InMemoryScans extends Strategy {
696694
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
697695
case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
@@ -1054,7 +1052,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
10541052
generator, g.requiredChildOutput, outer,
10551053
g.qualifiedGeneratorOutput, planLater(child)) :: Nil
10561054
case _: logical.OneRowRelation =>
1057-
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
1055+
execution.OneRowRelationExec() :: Nil
10581056
case r: logical.Range =>
10591057
execution.RangeExec(r) :: Nil
10601058
case r: logical.RepartitionByExpression =>

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ trait CodegenSupport extends SparkPlan {
5656
case _: SortMergeJoinExec => "smj"
5757
case _: BroadcastNestedLoopJoinExec => "bnlj"
5858
case _: RDDScanExec => "rdd"
59+
case _: OneRowRelationExec => "orr"
5960
case _: DataSourceScanExec => "scan"
6061
case _: InMemoryTableScanExec => "memoryScan"
6162
case _: WholeStageCodegenExec => "wholestagecodegen"

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, NestedCo
3838
import org.apache.spark.sql.catalyst.parser.ParseException
3939
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, Project, RepartitionByExpression, Sort}
4040
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
41-
import org.apache.spark.sql.execution.{CommandResultExec, UnionExec}
41+
import org.apache.spark.sql.execution.{CommandResultExec, OneRowRelationExec, UnionExec}
4242
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
4343
import org.apache.spark.sql.execution.aggregate._
4444
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
@@ -4962,6 +4962,18 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
49624962
parameters = Map("plan" -> "'Aggregate [groupingsets(Vector(0), posexplode(array(col)))]")
49634963
)
49644964
}
4965+
4966+
// Registers one test per whole-stage-codegen setting (enabled and disabled). Each test runs a
// FROM-less query, checks that it returns the single row "test", and asserts that the executed
// physical plan contains an `OneRowRelationExec` node.
Seq(true, false).foreach { codegenEnabled =>
4967+
test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") {
4968+
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) {
4969+
// No FROM clause in this query.
val df = spark.sql("select 'test' stringCol")
4970+
checkAnswer(df, Row("test"))
4971+
val plan = df.queryExecution.executedPlan
4972+
// Searches the executed plan for the dedicated node type.
val oneRowRelationExists = plan.find(_.isInstanceOf[OneRowRelationExec]).isDefined
4973+
assert(oneRowRelationExists)
4974+
}
4975+
}
4976+
}
49654977
}
49664978

49674979
case class Foo(bar: Option[String])

0 commit comments

Comments
 (0)