Skip to content

Commit 077b27c

Browse files
yaooqinn authored and dongjoon-hyun committed
[SPARK-52671][SQL] RowEncoder shall not lookup a resolved UDT
### What changes were proposed in this pull request?

This PR fixes the bug shown below:

```sql
spark-sql (default)> select submissionTime from bbb;
xxx
spark-sql (default)> cache table a as select submissionTime from bbb;
org.apache.spark.SparkException: xxx is not annotated with SQLUserDefinedType nor registered with UDTRegistration.}
	at org.apache.spark.sql.errors.ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError(ExecutionErrors.scala:167)
	at org.apache.spark.sql.errors.ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError$(ExecutionErrors.scala:163)
	at org.apache.spark.sql.errors.ExecutionErrors$.userDefinedTypeNotAnnotatedAndRegisteredError(ExecutionErrors.scala:259)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$1(RowEncoder.scala:108)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:108)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:128)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:125)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:69)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:65)
	at org.apache.spark.sql.classic.Dataset.toDF(Dataset.scala:519)
	at org.apache.spark.sql.classic.Dataset.groupBy(Dataset.scala:941)
	at org.apache.spark.sql.classic.Dataset.count(Dataset.scala:1501)
```

The RowEncoder tried to look up, a second time, a UDT that had already been resolved. That second lookup fails when the UDT's user class is neither annotated with `SQLUserDefinedType` nor registered with `UDTRegistration`. With this change, the RowEncoder uses the class of the already-resolved UDT instance (`udt.getClass`) instead.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified tests in sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51360 from yaooqinn/SPARK-52671.
Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent a053c68 commit 077b27c

File tree

4 files changed

+25
-25
lines changed

4 files changed

+25
-25
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8345,11 +8345,6 @@
83458345
"Failed to get outer pointer for <innerCls>."
83468346
]
83478347
},
8348-
"_LEGACY_ERROR_TEMP_2155" : {
8349-
"message" : [
8350-
"<userClass> is not annotated with SQLUserDefinedType nor registered with UDTRegistration.}"
8351-
]
8352-
},
83538348
"_LEGACY_ERROR_TEMP_2163" : {
83548349
"message" : [
83558350
"Initial type <dataType> must be a <target>."

sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.reflect.classTag
2222

2323
import org.apache.spark.sql.{AnalysisException, Row}
2424
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
25-
import org.apache.spark.sql.errors.{DataTypeErrorsBase, ExecutionErrors}
25+
import org.apache.spark.sql.errors.DataTypeErrorsBase
2626
import org.apache.spark.sql.internal.SqlApiConf
2727
import org.apache.spark.sql.types._
2828
import org.apache.spark.util.ArrayImplicits._
@@ -99,16 +99,7 @@ object RowEncoder extends DataTypeErrorsBase {
9999
case p: PythonUserDefinedType =>
100100
// TODO check if this works.
101101
encoderForDataType(p.sqlType, lenient)
102-
case udt: UserDefinedType[_] =>
103-
val annotation = udt.userClass.getAnnotation(classOf[SQLUserDefinedType])
104-
val udtClass: Class[_] = if (annotation != null) {
105-
annotation.udt()
106-
} else {
107-
UDTRegistration.getUDTFor(udt.userClass.getName).getOrElse {
108-
throw ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError(udt)
109-
}
110-
}
111-
UDTEncoder(udt, udtClass.asInstanceOf[Class[_ <: UserDefinedType[_]]])
102+
case udt: UserDefinedType[_] => UDTEncoder(udt, udt.getClass)
112103
case ArrayType(elementType, containsNull) =>
113104
IterableEncoder(
114105
classTag[mutable.ArraySeq[_]],

sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
2424
import org.apache.spark.{QueryContext, SparkArithmeticException, SparkBuildInfo, SparkDateTimeException, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
2525
import org.apache.spark.sql.catalyst.WalkedTypePath
2626
import org.apache.spark.sql.internal.SqlApiConf
27-
import org.apache.spark.sql.types.{DataType, DoubleType, StringType, UserDefinedType}
27+
import org.apache.spark.sql.types.{DataType, DoubleType, StringType}
2828
import org.apache.spark.unsafe.types.UTF8String
2929

3030
private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
@@ -160,13 +160,6 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
160160
messageParameters = Map("typeName" -> toSQLType(typeName)))
161161
}
162162

163-
def userDefinedTypeNotAnnotatedAndRegisteredError(udt: UserDefinedType[_]): Throwable = {
164-
new SparkException(
165-
errorClass = "_LEGACY_ERROR_TEMP_2155",
166-
messageParameters = Map("userClass" -> udt.userClass.getName),
167-
cause = null)
168-
}
169-
170163
def cannotFindEncoderForTypeError(typeName: String): SparkUnsupportedOperationException = {
171164
new SparkUnsupportedOperationException(
172165
errorClass = "ENCODER_NOT_FOUND",

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,26 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
7575
private[spark] override def asNullable: ExamplePointUDT = this
7676
}
7777

78+
class ExamplePointNotAnnotated(val x: Double, val y: Double) extends Serializable {
79+
private val inner = new ExamplePoint(x, y)
80+
override def hashCode: Int = inner.hashCode
81+
override def equals(that: Any): Boolean = {
82+
that match {
83+
case e: ExamplePointNotAnnotated => inner.equals(e.inner)
84+
case _ => false
85+
}
86+
}
87+
}
88+
class ExamplePointNotAnnotatedUDT extends UserDefinedType[ExamplePointNotAnnotated] {
89+
override def sqlType: DataType = DoubleType
90+
override def serialize(p: ExamplePointNotAnnotated): Double = p.x
91+
override def deserialize(datum: Any): ExamplePointNotAnnotated = {
92+
val x = datum.asInstanceOf[Double]
93+
new ExamplePointNotAnnotated(x, 3.14 * datum.asInstanceOf[Double])
94+
}
95+
override def userClass: Class[ExamplePointNotAnnotated] = classOf[ExamplePointNotAnnotated]
96+
}
97+
7898
class RowEncoderSuite extends CodegenInterpretedPlanTest {
7999

80100
private val structOfString = new StructType().add("str", StringType)
@@ -111,7 +131,8 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
111131
.add("binary", BinaryType)
112132
.add("date", DateType)
113133
.add("timestamp", TimestampType)
114-
.add("udt", new ExamplePointUDT))
134+
.add("udt", new ExamplePointUDT)
135+
.add("udtNotAnnotated", new ExamplePointNotAnnotatedUDT))
115136

116137
encodeDecodeTest(
117138
new StructType()

0 commit comments

Comments
 (0)