Skip to content

Commit 48ed73b

Browse files
heyihongyhuang-db
authored andcommitted
[SPARK-52337][CONNECT] Make InvalidPlanInput a user-facing error
### What changes were proposed in this pull request? This PR makes `InvalidPlanInput` a user-facing error by: 1. Refactoring `InvalidPlanInput` to implement `SparkThrowable` interface 2. Adding proper error condition (the default value is `INTERNAL_ERROR`) and message parameters support 3. Updating error creation in `InvalidInputErrors` to use the new error format 4. Adding test coverage for the new error handling ### Why are the changes needed? The PR follows the pattern of other Spark errors by implementing the `SparkThrowable` interface, which provides a standardized way to handle and display errors to users. This makes error messages more consistent and easier to understand across the Spark ecosystem. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `build/sbt "connect/testOnly *InvalidInputErrorsSuite"` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor 0.50.7 (Universal) Closes apache#51054 from heyihong/SPARK-52337. Authored-by: Yihong He <heyihong.cn@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent b9681a5 commit 48ed73b

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,38 @@
1616
*/
1717
package org.apache.spark.sql.connect.common
1818

19+
import scala.jdk.CollectionConverters._
20+
21+
import org.apache.spark.{SparkThrowable, SparkThrowableHelper}
22+
1923
/**
2024
* Error thrown when a connect plan is not valid.
2125
*/
2226
final case class InvalidPlanInput(
23-
private val message: String = "",
24-
private val cause: Throwable = None.orNull)
25-
extends Exception(message, cause)
27+
private val errorCondition: String,
28+
private val messageParameters: Map[String, String],
29+
private val causeOpt: Option[Throwable])
30+
extends Exception(
31+
SparkThrowableHelper.getMessage(errorCondition, messageParameters),
32+
causeOpt.orNull)
33+
with SparkThrowable {
34+
35+
override def getCondition: String = errorCondition
36+
37+
override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
38+
}
39+
40+
object InvalidPlanInput {
41+
42+
def apply(message: String): InvalidPlanInput =
43+
InvalidPlanInput(
44+
errorCondition = "INTERNAL_ERROR",
45+
messageParameters = Map("message" -> message),
46+
causeOpt = None)
47+
48+
def apply(errorCondition: String, messageParameters: Map[String, String]): InvalidPlanInput =
49+
InvalidPlanInput(
50+
errorCondition = errorCondition,
51+
messageParameters = messageParameters,
52+
causeOpt = None)
53+
}

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,13 @@ package org.apache.spark.sql.connect.planner
1919

2020
import scala.collection.mutable
2121

22-
import org.apache.spark.SparkThrowableHelper
2322
import org.apache.spark.connect.proto
2423
import org.apache.spark.sql.connect.common.InvalidPlanInput
2524
import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
2625
import org.apache.spark.sql.types.DataType
2726

2827
object InvalidInputErrors {
2928

30-
// invalidPlanInput is a helper function to facilitate the migration of InvalidInputErrors
31-
// to support NERF.
32-
private def invalidPlanInput(
33-
errorCondition: String,
34-
messageParameters: Map[String, String] = Map.empty): InvalidPlanInput = {
35-
InvalidPlanInput(SparkThrowableHelper.getMessage(errorCondition, messageParameters))
36-
}
37-
3829
def unknownRelationNotSupported(rel: proto.Relation): InvalidPlanInput =
3930
InvalidPlanInput(s"${rel.getUnknown} not supported.")
4031

@@ -97,7 +88,7 @@ object InvalidInputErrors {
9788
InvalidPlanInput("Schema for LocalRelation is required when the input data is not provided.")
9889

9990
def invalidSchemaStringNonStructType(schema: String, dataType: DataType): InvalidPlanInput =
100-
invalidPlanInput(
91+
InvalidPlanInput(
10192
"INVALID_SCHEMA.NON_STRUCT_TYPE",
10293
Map("inputSchema" -> quoteByDefault(schema), "dataType" -> toSQLType(dataType)))
10394

@@ -114,7 +105,7 @@ object InvalidInputErrors {
114105
InvalidPlanInput(s"Does not support $what")
115106

116107
def invalidSchemaTypeNonStruct(dataType: DataType): InvalidPlanInput =
117-
invalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType)))
108+
InvalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType)))
118109

119110
def expressionIdNotSupported(exprId: Int): InvalidPlanInput =
120111
InvalidPlanInput(s"Expression with ID: $exprId is not supported")

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/InvalidInputErrorsSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ class InvalidInputErrorsSuite extends PlanTest with SparkConnectPlanTest {
9595
.build()
9696

9797
proto.Relation.newBuilder().setRead(read).build()
98+
}),
99+
TestCase(
100+
name = "Deduplicate needs input",
101+
expectedErrorCondition = "INTERNAL_ERROR",
102+
expectedParameters = Map("message" -> "Deduplicate needs a plan input"),
103+
invalidInput = {
104+
val deduplicate = proto.Deduplicate
105+
.newBuilder()
106+
.setAllColumnsAsKeys(true)
107+
.build()
108+
109+
proto.Relation.newBuilder().setDeduplicate(deduplicate).build()
98110
}))
99111

100112
// Run all test cases

0 commit comments

Comments
 (0)