Skip to content

Commit f8e1298

Browse files
authored
[Backport-1.1]: Parse Error Message in Client in EndStreamAction (#716)
1 parent 6ffe7a7 commit f8e1298

File tree

5 files changed

+51
-4
lines changed

5 files changed

+51
-4
lines changed

client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import org.apache.spark.sql.SparkSession
4343

4444
import io.delta.sharing.client.model._
4545
import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, UnexpectedHttpStatus}
46-
import io.delta.sharing.spark.MissingEndStreamActionException
46+
import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException}
4747

4848
/** An interface to fetch Delta metadata from remote server. */
4949
trait DeltaSharingClient {
@@ -1306,6 +1306,10 @@ object DeltaSharingRestClient extends Logging {
13061306
logInfo(
13071307
s"Successfully verified endStreamAction in the response" + queryIdForLogging
13081308
)
1309+
if(lastEndStreamAction.errorMessage != null) {
1310+
throw new DeltaSharingServerException("Request failed during streaming response " +
1311+
s"with error message ${lastEndStreamAction.errorMessage}")
1312+
}
13091313
case Some(false) =>
13101314
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
13111315
s"header, but the server responded with the header set to false(" +

client/src/main/scala/io/delta/sharing/client/model.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action {
128128
private[sharing] case class EndStreamAction(
129129
refreshToken: String,
130130
nextPageToken: String,
131-
minUrlExpirationTimestamp: java.lang.Long)
131+
minUrlExpirationTimestamp: java.lang.Long,
132+
errorMessage: String = null)
132133
extends Action {
133134
override def wrap: SingleAction = SingleAction(endStreamAction = this)
134135
}

client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import org.apache.spark.sql.types.StructType
2020

2121
class MissingEndStreamActionException(message: String) extends IllegalStateException(message)
2222

23+
class DeltaSharingServerException(message: String) extends RuntimeException(message)
24+
2325
object DeltaSharingErrors {
2426
def nonExistentDeltaSharingTable(tableId: String): Throwable = {
2527
new IllegalStateException(s"Delta sharing table ${tableId} doesn't exist. " +

client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import io.delta.sharing.client.model.{
3535
}
3636
import io.delta.sharing.client.util.JsonUtils
3737
import io.delta.sharing.client.util.UnexpectedHttpStatus
38-
import io.delta.sharing.spark.MissingEndStreamActionException
38+
import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException}
3939

4040
// scalastyle:off maxLineLength
4141
class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
@@ -1237,6 +1237,12 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
12371237
nextPageToken = "random-next",
12381238
minUrlExpirationTimestamp = 0
12391239
).wrap)
1240+
val fakeEndStreamActionErrorStr = JsonUtils.toJson(EndStreamAction(
1241+
refreshToken = null,
1242+
nextPageToken = null,
1243+
minUrlExpirationTimestamp = null,
1244+
errorMessage = "BAD REQUEST: Error Occurred During Streaming"
1245+
).wrap)
12401246

12411247
test("checkEndStreamAction succeeded") {
12421248
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
@@ -1324,4 +1330,37 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
13241330
}
13251331
checkErrorMessage(e, s"and 0 lines, and last line as [Empty_Seq_in_checkEndStreamAction].")
13261332
}
1333+
1334+
test("checkEndStreamAction with error message throws streaming error") {
1335+
def checkErrorMessage(
1336+
e: DeltaSharingServerException,
1337+
additionalErrorMsg: String): Unit = {
1338+
val commonErrorMsg = "Request failed during streaming response with error message"
1339+
assert(e.getMessage.contains(commonErrorMsg))
1340+
1341+
assert(e.getMessage.contains(additionalErrorMsg))
1342+
}
1343+
1344+
// checkEndStreamAction throws error if the last line is EndStreamAction with error message
1345+
var e = intercept[DeltaSharingServerException] {
1346+
checkEndStreamAction(
1347+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1348+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1349+
Seq(fakeAddFileStr, fakeEndStreamActionErrorStr),
1350+
"random-query-id"
1351+
)
1352+
}
1353+
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")
1354+
1355+
// checkEndStreamAction throws error if the only line is EndStreamAction with error message
1356+
e = intercept[DeltaSharingServerException] {
1357+
checkEndStreamAction(
1358+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1359+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1360+
Seq(fakeEndStreamActionErrorStr),
1361+
"random-query-id"
1362+
)
1363+
}
1364+
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")
1365+
}
13271366
}

server/src/main/scala/io/delta/sharing/server/model.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ case class RemoveFile(
154154
case class EndStreamAction(
155155
refreshToken: String,
156156
nextPageToken: String,
157-
minUrlExpirationTimestamp: java.lang.Long
157+
minUrlExpirationTimestamp: java.lang.Long,
158+
errorMessage: String = null
158159
) extends Action {
159160
override def wrap: SingleAction = SingleAction(endStreamAction = this)
160161
}

0 commit comments

Comments
 (0)