Skip to content

Commit c6ae93f

Browse files
authored
[Backport-0.7]: Parse error message in client in EndStreamAction (#717)
1 parent 37c8414 commit c6ae93f

File tree

5 files changed

+46
-3
lines changed

5 files changed

+46
-3
lines changed

server/src/main/scala/io/delta/sharing/server/model.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ case class RemoveFile(
132132
}
133133

134134
case class EndStreamAction(
135-
refreshToken: String
136-
) extends Action {
135+
refreshToken: String,
136+
errorMessage: String = null) extends Action {
137137
override def wrap: SingleAction = SingleAction(endStreamAction = this)
138138
}
139139

spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,10 @@ private[spark] object DeltaSharingRestClient extends Logging {
729729
logInfo(
730730
s"Successfully verified endStreamAction in the response" + queryIdForLogging
731731
)
732+
if(lastEndStreamAction.errorMessage != null) {
733+
throw new DeltaSharingServerException("Request failed during streaming response " +
734+
s"with error message ${lastEndStreamAction.errorMessage}")
735+
}
732736
case Some(false) =>
733737
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
734738
s"header, but the server responded with the header set to false(" +

spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import org.apache.spark.sql.types.StructType
2020

2121
class MissingEndStreamActionException(message: String) extends IllegalStateException(message)
2222

23+
class DeltaSharingServerException(message: String) extends RuntimeException(message)
24+
2325

2426
object DeltaSharingErrors {
2527
def nonExistentDeltaSharingTable(tableId: String): Throwable = {

spark/src/main/scala/io/delta/sharing/spark/model.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action {
115115
}
116116

117117
private[sharing] case class EndStreamAction(
118-
refreshToken: String)
118+
refreshToken: String,
119+
errorMessage: String = null)
119120
extends Action {
120121
override def wrap: SingleAction = SingleAction(endStreamAction = this)
121122
}

spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,10 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
886886
val fakeEndStreamActionStr = JsonUtils.toJson(EndStreamAction(
887887
refreshToken = "random-refresh"
888888
).wrap)
889+
val fakeEndStreamActionErrorStr = JsonUtils.toJson(EndStreamAction(
890+
refreshToken = null,
891+
errorMessage = "BAD REQUEST: Error Occurred During Streaming"
892+
).wrap)
889893

890894
test("checkEndStreamAction succeeded") {
891895
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
@@ -973,4 +977,36 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
973977
}
974978
checkErrorMessage(e, s"and 0 lines, and last line as [Empty_Seq_in_checkEndStreamAction].")
975979
}
980+
981+
test("checkEndStreamAction with error message throws streaming error") {
982+
def checkErrorMessage(
983+
e: DeltaSharingServerException,
984+
additionalErrorMsg: String): Unit = {
985+
val commonErrorMsg = "Request failed during streaming response with error message"
986+
assert(e.getMessage.contains(commonErrorMsg))
987+
assert(e.getMessage.contains(additionalErrorMsg))
988+
}
989+
990+
// checkEndStreamAction throws error if the last line is EndStreamAction with error message
991+
var e = intercept[DeltaSharingServerException] {
992+
checkEndStreamAction(
993+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
994+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
995+
Seq(fakeAddFileStr, fakeEndStreamActionErrorStr),
996+
"random-query-id"
997+
)
998+
}
999+
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")
1000+
1001+
// checkEndStreamAction throws error if the only line is EndStreamAction with error message
1002+
e = intercept[DeltaSharingServerException] {
1003+
checkEndStreamAction(
1004+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1005+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1006+
Seq(fakeEndStreamActionErrorStr),
1007+
"random-query-id"
1008+
)
1009+
}
1010+
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")
1011+
}
9761012
}

0 commit comments

Comments
 (0)