Skip to content

Commit 69f279c

Browse files
heyihongyhuang-db
authored andcommitted
[SPARK-52383][CONNECT] Improve errors in SparkConnectPlanner
### What changes were proposed in this pull request? This PR improves error handling in SparkConnectPlanner by: 1. Adding new error handling methods in `InvalidInputErrors.scala` for better error messages: - `invalidEnum` for handling invalid enum values - `invalidOneOfField` for handling invalid oneOf fields in protobuf messages - `cannotBeEmpty` for handling empty fields - `streamingQueryRunIdMismatch` and `streamingQueryNotFound` for streaming query errors 2. Replacing specific error messages with more generic ones. For examples: - Replacing `unknownRelationNotSupported` with `invalidOneOfField` - Replacing `catalogTypeNotSupported` with `invalidOneOfField` - Replacing `functionIdNotSupported` with `invalidOneOfField` - Replacing `expressionIdNotSupported` with `invalidOneOfField` - Replacing `dataSourceIdNotSupported` with `invalidOneOfField` 3. Improving error handling for protobuf-related issues: - Better handling of oneOf fields in protobuf messages - More descriptive error messages for invalid enum values - Better handling of empty fields ### Why are the changes needed? 1. Provide more specific and descriptive error messages 2. Better handle protobuf-related issues that are common in the Connect API 3. Make error messages more consistent across different parts of the codebase ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? `build/sbt "connect/testOnly *SparkConnectPlannerSuite"` `build/sbt "connect/testOnly *InvalidInputErrorsSuite"` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor 0.50.7 (Universal) Closes apache#51062 from heyihong/SPARK-52383. Authored-by: Yihong He <heyihong.cn@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 8a3d32b commit 69f279c

File tree

4 files changed

+148
-134
lines changed

4 files changed

+148
-134
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala

Lines changed: 36 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,20 @@ package org.apache.spark.sql.connect.planner
1919

2020
import scala.collection.mutable
2121

22+
import com.google.protobuf.Descriptors.Descriptor
23+
import com.google.protobuf.Internal.EnumLite
24+
import com.google.protobuf.ProtocolMessageEnum
25+
2226
import org.apache.spark.connect.proto
2327
import org.apache.spark.sql.connect.common.InvalidPlanInput
2428
import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
2529
import org.apache.spark.sql.types.DataType
2630

2731
object InvalidInputErrors {
2832

29-
def unknownRelationNotSupported(rel: proto.Relation): InvalidPlanInput =
30-
InvalidPlanInput(s"${rel.getUnknown} not supported.")
31-
3233
def noHandlerFoundForExtension(): InvalidPlanInput =
3334
InvalidPlanInput("No handler found for extension")
3435

35-
def catalogTypeNotSupported(catType: proto.Catalog.CatTypeCase): InvalidPlanInput =
36-
InvalidPlanInput(s"$catType not supported.")
37-
3836
def invalidSQLWithReferences(query: proto.WithRelations): InvalidPlanInput =
3937
InvalidPlanInput(s"$query is not a valid relation for SQL with references")
4038

@@ -61,9 +59,6 @@ object InvalidInputErrors {
6159
def functionEvalTypeNotSupported(evalType: Int): InvalidPlanInput =
6260
InvalidPlanInput(s"Function with EvalType: $evalType is not supported")
6361

64-
def functionIdNotSupported(functionId: Int): InvalidPlanInput =
65-
InvalidPlanInput(s"Function with ID: $functionId is not supported")
66-
6762
def groupingExpressionAbsentForKeyValueGroupedDataset(): InvalidPlanInput =
6863
InvalidPlanInput("The grouping expression cannot be absent for KeyValueGroupedDataset")
6964

@@ -101,15 +96,31 @@ object InvalidInputErrors {
10196
def multiplePathsNotSupportedForStreamingSource(): InvalidPlanInput =
10297
InvalidPlanInput("Multiple paths are not supported for streaming source")
10398

104-
def doesNotSupport(what: String): InvalidPlanInput =
105-
InvalidPlanInput(s"Does not support $what")
99+
def invalidEnum(protoEnum: Enum[_] with ProtocolMessageEnum): InvalidPlanInput =
100+
InvalidPlanInput(
101+
s"This enum value of ${protoEnum.getDescriptorForType.getFullName}" +
102+
s" is invalid: ${protoEnum.name()}(${protoEnum.getNumber})")
103+
104+
def invalidOneOfField(
105+
enumCase: Enum[_] with EnumLite,
106+
descriptor: Descriptor): InvalidPlanInput = {
107+
// If the oneOf field is not set, the enum number will be 0.
108+
if (enumCase.getNumber == 0) {
109+
InvalidPlanInput(
110+
s"This oneOf field in ${descriptor.getFullName} is not set: ${enumCase.name()}")
111+
} else {
112+
InvalidPlanInput(
113+
s"This oneOf field message in ${descriptor.getFullName} is not supported: " +
114+
s"${enumCase.name()}(${enumCase.getNumber})")
115+
}
116+
}
117+
118+
def cannotBeEmpty(fieldName: String, descriptor: Descriptor): InvalidPlanInput =
119+
InvalidPlanInput(s"$fieldName in ${descriptor.getFullName} cannot be empty")
106120

107121
def invalidSchemaTypeNonStruct(dataType: DataType): InvalidPlanInput =
108122
InvalidPlanInput("INVALID_SCHEMA_TYPE_NON_STRUCT", Map("dataType" -> toSQLType(dataType)))
109123

110-
def expressionIdNotSupported(exprId: Int): InvalidPlanInput =
111-
InvalidPlanInput(s"Expression with ID: $exprId is not supported")
112-
113124
def lambdaFunctionArgumentCountInvalid(got: Int): InvalidPlanInput =
114125
InvalidPlanInput(s"LambdaFunction requires 1 ~ 3 arguments, but got $got ones!")
115126

@@ -127,18 +138,9 @@ object InvalidInputErrors {
127138
def windowFunctionRequired(): InvalidPlanInput =
128139
InvalidPlanInput("WindowFunction is required in WindowExpression")
129140

130-
def unknownFrameType(
131-
frameType: proto.Expression.Window.WindowFrame.FrameType): InvalidPlanInput =
132-
InvalidPlanInput(s"Unknown FrameType $frameType")
133-
134141
def lowerBoundRequiredInWindowFrame(): InvalidPlanInput =
135142
InvalidPlanInput("LowerBound is required in WindowFrame")
136143

137-
def unknownFrameBoundary(
138-
boundary: proto.Expression.Window.WindowFrame.FrameBoundary.BoundaryCase)
139-
: InvalidPlanInput =
140-
InvalidPlanInput(s"Unknown FrameBoundary $boundary")
141-
142144
def upperBoundRequiredInWindowFrame(): InvalidPlanInput =
143145
InvalidPlanInput("UpperBound is required in WindowFrame")
144146

@@ -151,36 +153,18 @@ object InvalidInputErrors {
151153
def intersectDoesNotSupportUnionByName(): InvalidPlanInput =
152154
InvalidPlanInput("Intersect does not support union_by_name")
153155

154-
def unsupportedSetOperation(op: Int): InvalidPlanInput =
155-
InvalidPlanInput(s"Unsupported set operation $op")
156-
157-
def joinTypeNotSupported(t: proto.Join.JoinType): InvalidPlanInput =
158-
InvalidPlanInput(s"Join type $t is not supported")
159-
160156
def aggregateNeedsPlanInput(): InvalidPlanInput =
161157
InvalidPlanInput("Aggregate needs a plan input")
162158

163159
def aggregateWithPivotRequiresPivot(): InvalidPlanInput =
164160
InvalidPlanInput("Aggregate with GROUP_TYPE_PIVOT requires a Pivot")
165161

166-
def runnerCannotBeEmptyInExecuteExternalCommand(): InvalidPlanInput =
167-
InvalidPlanInput("runner cannot be empty in executeExternalCommand")
168-
169-
def commandCannotBeEmptyInExecuteExternalCommand(): InvalidPlanInput =
170-
InvalidPlanInput("command cannot be empty in executeExternalCommand")
171-
172-
def unexpectedForeachBatchFunction(): InvalidPlanInput =
173-
InvalidPlanInput("Unexpected foreachBatch function")
174-
175162
def invalidWithRelationReference(): InvalidPlanInput =
176163
InvalidPlanInput("Invalid WithRelation reference")
177164

178165
def assertionFailure(message: String): InvalidPlanInput =
179166
InvalidPlanInput(message)
180167

181-
def unsupportedMergeActionType(actionType: proto.MergeAction.ActionType): InvalidPlanInput =
182-
InvalidPlanInput(s"Unsupported merge action type $actionType")
183-
184168
def unresolvedNamedLambdaVariableRequiresNamePart(): InvalidPlanInput =
185169
InvalidPlanInput("UnresolvedNamedLambdaVariable requires at least one name part!")
186170

@@ -190,15 +174,6 @@ object InvalidInputErrors {
190174
def sqlCommandExpectsSqlOrWithRelations(other: proto.Relation.RelTypeCase): InvalidPlanInput =
191175
InvalidPlanInput(s"SQL command expects either a SQL or a WithRelations, but got $other")
192176

193-
def unknownGroupType(groupType: proto.Aggregate.GroupType): InvalidPlanInput =
194-
InvalidPlanInput(s"Unknown group type $groupType")
195-
196-
def dataSourceIdNotSupported(dataSourceId: Int): InvalidPlanInput =
197-
InvalidPlanInput(s"Data source id $dataSourceId is not supported")
198-
199-
def unknownSubqueryType(subqueryType: proto.SubqueryExpression.SubqueryType): InvalidPlanInput =
200-
InvalidPlanInput(s"Unknown subquery type $subqueryType")
201-
202177
def reduceShouldCarryScalarScalaUdf(got: mutable.Buffer[proto.Expression]): InvalidPlanInput =
203178
InvalidPlanInput(s"reduce should carry a scalar scala udf, but got $got")
204179

@@ -207,4 +182,15 @@ object InvalidInputErrors {
207182

208183
def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
209184
InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
185+
186+
def streamingQueryRunIdMismatch(
187+
id: String,
188+
runId: String,
189+
serverRunId: String): InvalidPlanInput =
190+
InvalidPlanInput(
191+
s"Run id mismatch for query id $id. Run id in the request $runId " +
192+
s"does not match one on the server $serverRunId. The query might have restarted.")
193+
194+
def streamingQueryNotFound(id: String): InvalidPlanInput =
195+
InvalidPlanInput(s"Streaming query $id is not found")
210196
}

0 commit comments

Comments
 (0)