Skip to content

Commit bf2c297

Browse files
authored
Add more checks and unit tests for checkEndStreamAction (#703)
1 parent f43d461 commit bf2c297

File tree

2 files changed

+155
-39
lines changed

2 files changed

+155
-39
lines changed

client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.time.format.DateTimeFormatter.{ISO_DATE, ISO_DATE_TIME}
2525
import java.util.UUID
2626

2727
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
28+
import scala.util.control.NonFatal
2829

2930
import org.apache.commons.io.IOUtils
3031
import org.apache.commons.io.input.BoundedInputStream
@@ -970,48 +971,10 @@ class DeltaSharingRestClient(
970971
)
971972
}
972973

973-
private def checkEndStreamAction(
974-
capabilities: Option[String],
975-
capabilitiesMap: Map[String, String],
976-
lines: Seq[String]): Unit = {
977-
val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilitiesMap)
978-
includeEndStreamActionHeader match {
979-
case Some(true) =>
980-
val lastLine = lines.last
981-
val lastLineAction = JsonUtils.fromJson[SingleAction](lastLine)
982-
if (lastLineAction.endStreamAction == null) {
983-
throw new MissingEndStreamActionException("Client sets " +
984-
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true " + getDsQueryIdForLogging +
985-
s", server responded with the header set to true(${capabilities}) " +
986-
s"and ${lines.size} lines, and last line as [${lastLine}].")
987-
}
988-
logInfo(
989-
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
990-
)
991-
case Some(false) =>
992-
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
993-
s"header, but the server responded with the header set to false(" +
994-
s"${capabilities})," + getDsQueryIdForLogging
995-
)
996-
case None =>
997-
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
998-
s" header, but server didn't respond with the header(${capabilities})," +
999-
getDsQueryIdForLogging
1000-
)
1001-
}
1002-
}
1003-
1004974
private def getRespondedFormat(capabilitiesMap: Map[String, String]): String = {
1005975
capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET)
1006976
}
1007977

1008-
// includeEndStreamActionHeader indicates whether the last line is required to be an
1009-
// EndStreamAction, parsed from the response header.
1010-
private def getRespondedIncludeEndStreamActionHeader(
1011-
capabilitiesMap: Map[String, String]): Option[Boolean] = {
1012-
capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean)
1013-
}
1014-
1015978
private def parseDeltaSharingCapabilities(capabilities: Option[String]): Map[String, String] = {
1016979
if (capabilities.isEmpty) {
1017980
return Map.empty[String, String]
@@ -1172,7 +1135,7 @@ class DeltaSharingRestClient(
11721135
).map(_.getValue)
11731136
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
11741137
if (setIncludeEndStreamAction) {
1175-
checkEndStreamAction(capabilities, capabilitiesMap, lines)
1138+
checkEndStreamAction(capabilities, capabilitiesMap, lines, getDsQueryIdForLogging)
11761139
}
11771140
(
11781141
Option(
@@ -1307,6 +1270,55 @@ object DeltaSharingRestClient extends Logging {
13071270
)
13081271
}
13091272

1273+
private def tryParseEndStreamAction(line: String): EndStreamAction = {
1274+
try {
1275+
JsonUtils.fromJson[SingleAction](line).endStreamAction
1276+
} catch {
1277+
case NonFatal(_) =>
1278+
logError(s"Failed to parse last line in response as EndStreamAction:$line")
1279+
null
1280+
}
1281+
}
1282+
1283+
// includeEndStreamActionHeader indicates whether the last line is required to be an
1284+
// EndStreamAction, parsed from the response header.
1285+
private def getRespondedIncludeEndStreamActionHeader(
1286+
capabilitiesMap: Map[String, String]): Option[Boolean] = {
1287+
capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean)
1288+
}
1289+
1290+
def checkEndStreamAction(
1291+
capabilities: Option[String],
1292+
capabilitiesMap: Map[String, String],
1293+
lines: Seq[String],
1294+
queryIdForLogging: String): Unit = {
1295+
val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilitiesMap)
1296+
includeEndStreamActionHeader match {
1297+
case Some(true) =>
1298+
val lastLine = lines.lastOption.getOrElse("Empty_Seq_in_checkEndStreamAction")
1299+
val lastEndStreamAction = tryParseEndStreamAction(lastLine)
1300+
if (lastEndStreamAction == null) {
1301+
throw new MissingEndStreamActionException("Client sets " +
1302+
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true " + queryIdForLogging +
1303+
s", server responded with the header set to true(${capabilities}) " +
1304+
s"and ${lines.size} lines, and last line as [${lastLine}].")
1305+
}
1306+
logInfo(
1307+
s"Successfully verified endStreamAction in the response" + queryIdForLogging
1308+
)
1309+
case Some(false) =>
1310+
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
1311+
s"header, but the server responded with the header set to false(" +
1312+
s"${capabilities})," + queryIdForLogging
1313+
)
1314+
case None =>
1315+
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
1316+
s" header, but server didn't respond with the header(${capabilities}), " +
1317+
queryIdForLogging
1318+
)
1319+
}
1320+
}
1321+
13101322
def apply(
13111323
profileFile: String,
13121324
forStreaming: Boolean = false,

client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ import io.delta.sharing.client.model.{
2626
AddFile,
2727
AddFileForCDF,
2828
DeltaTableFiles,
29+
EndStreamAction,
2930
Format,
3031
Metadata,
3132
Protocol,
3233
RemoveFile,
3334
Table
3435
}
36+
import io.delta.sharing.client.util.JsonUtils
3537
import io.delta.sharing.client.util.UnexpectedHttpStatus
38+
import io.delta.sharing.spark.MissingEndStreamActionException
3639

3740
// scalastyle:off maxLineLength
3841
class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
@@ -1220,4 +1223,105 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
12201223
}
12211224
}
12221225
}
1226+
1227+
val fakeAddFileStr = JsonUtils.toJson(AddFile(
1228+
url = "https://unused-url",
1229+
expirationTimestamp = 12345,
1230+
id = "random-id",
1231+
partitionValues = Map.empty,
1232+
size = 123,
1233+
stats = """{"numRecords":1,"minValues":{"age":"1"},"maxValues":{"age":"3"},"nullCount":{"age":0}}"""
1234+
).wrap)
1235+
val fakeEndStreamActionStr = JsonUtils.toJson(EndStreamAction(
1236+
refreshToken = "random-refresh",
1237+
nextPageToken = "random-next",
1238+
minUrlExpirationTimestamp = 0
1239+
).wrap)
1240+
1241+
test("checkEndStreamAction succeeded") {
1242+
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
1243+
// Succeeded because the responded header is true, and EndStreamAction exists
1244+
checkEndStreamAction(
1245+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1246+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1247+
Seq(fakeAddFileStr, fakeEndStreamActionStr),
1248+
"random-query-id"
1249+
)
1250+
1251+
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false
1252+
// Succeeded even though the last line is not EndStreamAction
1253+
checkEndStreamAction(
1254+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false"),
1255+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "false"),
1256+
Seq(fakeAddFileStr),
1257+
"random-query-id"
1258+
)
1259+
// Succeeded even though the lines are empty.
1260+
checkEndStreamAction(
1261+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false"),
1262+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "false"),
1263+
Seq.empty,
1264+
"random-query-id"
1265+
)
1266+
// Succeeded even though the last line cannot be parsed.
1267+
checkEndStreamAction(
1268+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false"),
1269+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "false"),
1270+
Seq("random-string-server-error"),
1271+
"random-query-id"
1272+
)
1273+
1274+
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION not in header
1275+
// Succeeded even though the last line is not EndStreamAction
1276+
checkEndStreamAction(None, Map.empty, Seq(fakeAddFileStr), "random-query-id")
1277+
// Succeeded even though the lines are empty.
1278+
checkEndStreamAction(None, Map.empty, Seq.empty, "random-query-id")
1279+
// Succeeded even though the last line cannot be parsed.
1280+
checkEndStreamAction(None, Map.empty, Seq("random-string-server-error"), "random-query-id")
1281+
}
1282+
1283+
test("checkEndStreamAction failed") {
1284+
def checkErrorMessage(
1285+
e: MissingEndStreamActionException,
1286+
additionalErrorMsg: String): Unit = {
1287+
val commonErrorMsg = "Client sets includeendstreamaction=true random-query-id, " +
1288+
"server responded with the header set to true(Some(includeendstreamaction=true)) and"
1289+
assert(e.getMessage.contains(commonErrorMsg))
1290+
1291+
assert(e.getMessage.contains(additionalErrorMsg))
1292+
}
1293+
1294+
// Failed because the last line is not EndStreamAction.
1295+
var e = intercept[MissingEndStreamActionException] {
1296+
checkEndStreamAction(
1297+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1298+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1299+
Seq(fakeAddFileStr),
1300+
"random-query-id"
1301+
)
1302+
}
1303+
checkErrorMessage(e, s"and 1 lines, and last line as [$fakeAddFileStr].")
1304+
1305+
// Failed because the last line cannot be parsed.
1306+
e = intercept[MissingEndStreamActionException] {
1307+
checkEndStreamAction(
1308+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1309+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1310+
Seq("random-string-server-error"),
1311+
"random-query-id"
1312+
)
1313+
}
1314+
checkErrorMessage(e, s"and 1 lines, and last line as [random-string-server-error].")
1315+
1316+
// Failed because the responded lines are empty.
1317+
e = intercept[MissingEndStreamActionException] {
1318+
checkEndStreamAction(
1319+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1320+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1321+
Seq.empty,
1322+
"random-query-id"
1323+
)
1324+
}
1325+
checkErrorMessage(e, s"and 0 lines, and last line as [Empty_Seq_in_checkEndStreamAction].")
1326+
}
12231327
}

0 commit comments

Comments
 (0)