Skip to content

Commit d6026d5

Browse files
littlegrasscaoSun Cao
and
Sun Cao
authored
Don't use JValue in validateSourceVersion (#713)
* Don't use JValue in validateSourceVersion * fix tests --------- Co-authored-by: Sun Cao <sun.cao+data@databricks.com>
1 parent f87297d commit d6026d5

File tree

3 files changed

+13
-37
lines changed

3 files changed

+13
-37
lines changed

client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ object DeltaSharingErrors {
4040
}
4141

4242
def unsupportedTableReaderVersion(supportedVersion: Long, tableVersion: Long): Throwable = {
43-
new IllegalStateException(s"The table reader version ${tableVersion} is larger than " +
44-
s"supported reader version $supportedVersion. Please upgrade to a new release."
43+
new IllegalStateException(s"The table reader version ${tableVersion} is not equal to " +
44+
s"supported reader version $supportedVersion."
4545
)
4646
}
4747

client/src/main/scala/io/delta/sharing/spark/DeltaSharingSourceOffset.scala

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ package io.delta.sharing.spark
1818

1919
// scalastyle:off import.ordering.noEmptyLine
2020
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
21-
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
22-
import org.json4s._
23-
import org.json4s.jackson.JsonMethods.parse
21+
import org.apache.spark.sql.execution.streaming.Offset
2422

2523
import io.delta.sharing.client.util.JsonUtils
2624

@@ -86,37 +84,19 @@ object DeltaSharingSourceOffset {
8684
offset match {
8785
case o: DeltaSharingSourceOffset => o
8886
case s =>
89-
validateSourceVersion(s.json)
9087
val o = JsonUtils.fromJson[DeltaSharingSourceOffset](s.json)
88+
validateSourceVersion(o)
9189
if (o.tableId != tableId) {
9290
throw DeltaSharingErrors.nonExistentDeltaSharingTable(o.tableId)
9391
}
9492
o
9593
}
9694
}
9795

98-
private def validateSourceVersion(json: String): Unit = {
99-
val parsedJson = parse(json)
100-
val versionOpt = jsonOption(parsedJson \ "sourceVersion").map {
101-
case i: JInt => i.num.longValue
102-
case other => throw DeltaSharingErrors.invalidSourceVersion(other.toString)
103-
}
104-
if (versionOpt.isEmpty) {
105-
throw DeltaSharingErrors.cannotFindSourceVersionException(json)
106-
}
107-
108-
val maxVersion = VERSION_1
109-
110-
if (versionOpt.get > maxVersion) {
111-
throw DeltaSharingErrors.unsupportedTableReaderVersion(maxVersion, versionOpt.get)
112-
}
113-
}
114-
115-
/** Return an option that translates JNothing to None */
116-
private def jsonOption(json: JValue): Option[JValue] = {
117-
json match {
118-
case JNothing => None
119-
case value: JValue => Some(value)
96+
private def validateSourceVersion(offset: DeltaSharingSourceOffset) = {
97+
// Only version 1 is supported for now.
98+
if (offset.sourceVersion != VERSION_1) {
99+
throw DeltaSharingErrors.unsupportedTableReaderVersion(VERSION_1, offset.sourceVersion)
120100
}
121101
}
122102

spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceOffsetSuite.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@ package io.delta.sharing.spark
1818

1919
import java.util.UUID
2020

21+
import com.fasterxml.jackson.databind.exc.InvalidFormatException
2122
import org.apache.spark.sql.QueryTest
22-
import org.apache.spark.sql.SparkSession
23-
import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles
2423
import org.apache.spark.sql.execution.streaming.SerializedOffset
25-
import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQueryException, Trigger}
2624
import org.apache.spark.sql.test.SharedSparkSession
27-
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
28-
import org.scalatest.time.SpanSugar._
2925

3026
class DeltaSharingSourceOffsetSuite extends QueryTest
3127
with SharedSparkSession with DeltaSharingIntegrationTest {
@@ -48,7 +44,7 @@ class DeltaSharingSourceOffsetSuite extends QueryTest
4844
val e = intercept[IllegalStateException] {
4945
DeltaSharingSourceOffset(UUID.randomUUID().toString, SerializedOffset(json))
5046
}
51-
assert(e.getMessage.contains("Please upgrade to a new release"))
47+
assert(e.getMessage.contains("is not equal to supported reader version"))
5248
}
5349

5450
test("DeltaSharingSourceOffset sourceVersion - invalid value") {
@@ -61,10 +57,10 @@ class DeltaSharingSourceOffsetSuite extends QueryTest
6157
| "isStartingVersion": true
6258
|}
6359
""".stripMargin
64-
val e = intercept[IllegalStateException] {
60+
val e = intercept[InvalidFormatException] {
6561
DeltaSharingSourceOffset(UUID.randomUUID().toString, SerializedOffset(json))
6662
}
67-
for (msg <- Seq("foo", "invalid")) {
63+
for (msg <- Seq("foo", "not a valid")) {
6864
assert(e.getMessage.contains(msg))
6965
}
7066
}
@@ -81,7 +77,7 @@ class DeltaSharingSourceOffsetSuite extends QueryTest
8177
val e = intercept[IllegalStateException] {
8278
DeltaSharingSourceOffset(UUID.randomUUID().toString, SerializedOffset(json))
8379
}
84-
for (msg <- Seq("Cannot find", "sourceVersion")) {
80+
for (msg <- Seq("The table reader version", "is not equal to")) {
8581
assert(e.getMessage.contains(msg))
8682
}
8783
}

0 commit comments

Comments
 (0)