Skip to content

Commit a731b96

Browse files
authored
Add support for error message in EndStreamAction (#711)
1 parent e481b7a commit a731b96

File tree

5 files changed

+51
-4
lines changed

5 files changed

+51
-4
lines changed

client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.spark.sql.SparkSession
4141
import io.delta.sharing.client.auth.{AuthConfig, AuthCredentialProviderFactory}
4242
import io.delta.sharing.client.model._
4343
import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, UnexpectedHttpStatus}
44-
import io.delta.sharing.spark.MissingEndStreamActionException
44+
import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException}
4545

4646
/** An interface to fetch Delta metadata from remote server. */
4747
trait DeltaSharingClient {
@@ -1363,6 +1363,10 @@ object DeltaSharingRestClient extends Logging {
13631363
logInfo(
13641364
s"Successfully verified endStreamAction in the response" + queryIdForLogging
13651365
)
1366+
if(lastEndStreamAction.errorMessage != null) {
1367+
throw new DeltaSharingServerException("Request failed during streaming response " +
1368+
s"with error message ${lastEndStreamAction.errorMessage}")
1369+
}
13661370
case Some(false) =>
13671371
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
13681372
s"header, but the server responded with the header set to false(" +

client/src/main/scala/io/delta/sharing/client/model.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action {
128128
private[sharing] case class EndStreamAction(
129129
refreshToken: String,
130130
nextPageToken: String,
131-
minUrlExpirationTimestamp: java.lang.Long)
131+
minUrlExpirationTimestamp: java.lang.Long,
132+
errorMessage: String = null)
132133
extends Action {
133134
override def wrap: SingleAction = SingleAction(endStreamAction = this)
134135
}

client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import org.apache.spark.sql.types.StructType
2020

2121
class MissingEndStreamActionException(message: String) extends IllegalStateException(message)
2222

23+
class DeltaSharingServerException(message: String) extends RuntimeException(message)
24+
2325
object DeltaSharingErrors {
2426
def nonExistentDeltaSharingTable(tableId: String): Throwable = {
2527
new IllegalStateException(s"Delta sharing table ${tableId} doesn't exist. " +

client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import io.delta.sharing.client.model.{
3535
}
3636
import io.delta.sharing.client.util.JsonUtils
3737
import io.delta.sharing.client.util.UnexpectedHttpStatus
38-
import io.delta.sharing.spark.MissingEndStreamActionException
38+
import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException}
3939

4040
// scalastyle:off maxLineLength
4141
class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
@@ -1243,6 +1243,12 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
12431243
nextPageToken = "random-next",
12441244
minUrlExpirationTimestamp = 0
12451245
).wrap)
1246+
val fakeEndStreamActionErrorStr = JsonUtils.toJson(EndStreamAction(
1247+
refreshToken = null,
1248+
nextPageToken = null,
1249+
minUrlExpirationTimestamp = null,
1250+
errorMessage = "BAD REQUEST: Error Occurred During Streaming"
1251+
).wrap)
12461252

12471253
test("checkEndStreamAction succeeded") {
12481254
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
@@ -1330,4 +1336,37 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
13301336
}
13311337
checkErrorMessage(e, s"and 0 lines, and last line as [Empty_Seq_in_checkEndStreamAction].")
13321338
}
1339+
1340+
test("checkEndStreamAction with error message throws streaming error") {
1341+
def checkErrorMessage(
1342+
e: DeltaSharingServerException,
1343+
additionalErrorMsg: String): Unit = {
1344+
val commonErrorMsg = "Request failed during streaming response with error message"
1345+
assert(e.getMessage.contains(commonErrorMsg))
1346+
1347+
assert(e.getMessage.contains(additionalErrorMsg))
1348+
}
1349+
1350+
// checkEndStreamAction throws error if the last line is EndStreamAction with error message
1351+
var e = intercept[DeltaSharingServerException] {
1352+
checkEndStreamAction(
1353+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1354+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1355+
Seq(fakeAddFileStr, fakeEndStreamActionErrorStr),
1356+
"random-query-id"
1357+
)
1358+
}
1359+
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")
1360+
1361+
// checkEndStreamAction throws error if the only line is EndStreamAction with error message
1362+
e = intercept[DeltaSharingServerException] {
1363+
checkEndStreamAction(
1364+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1365+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1366+
Seq(fakeEndStreamActionErrorStr),
1367+
"random-query-id"
1368+
)
1369+
}
1370+
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")
1371+
}
13331372
}

server/src/main/scala/io/delta/sharing/server/model.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ case class RemoveFile(
157157
case class EndStreamAction(
158158
refreshToken: String,
159159
nextPageToken: String,
160-
minUrlExpirationTimestamp: java.lang.Long
160+
minUrlExpirationTimestamp: java.lang.Long,
161+
endStreamAction: String = null
161162
) extends Action {
162163
override def wrap: SingleAction = SingleAction(endStreamAction = this)
163164
}

0 commit comments

Comments
 (0)