Skip to content

Commit bd35b4c

Browse files
authored
Add more checks and unit tests for checkEndStreamAction (#693) (#702)
* Add more checks and unit tests for checkEndStreamAction * fix variable name
1 parent 50a9a08 commit bd35b4c

File tree

2 files changed

+155
-39
lines changed

2 files changed

+155
-39
lines changed

client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets.UTF_8
2222
import java.util.UUID
2323

2424
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
25+
import scala.util.control.NonFatal
2526

2627
import org.apache.commons.io.IOUtils
2728
import org.apache.commons.io.input.BoundedInputStream
@@ -1034,48 +1035,10 @@ class DeltaSharingRestClient(
10341035
response
10351036
}
10361037

1037-
private def checkEndStreamAction(
1038-
capabilities: Option[String],
1039-
capabilitiesMap: Map[String, String],
1040-
lines: Seq[String]): Unit = {
1041-
val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilitiesMap)
1042-
includeEndStreamActionHeader match {
1043-
case Some(true) =>
1044-
val lastLine = lines.last
1045-
val lastLineAction = JsonUtils.fromJson[SingleAction](lastLine)
1046-
if (lastLineAction.endStreamAction == null) {
1047-
throw new MissingEndStreamActionException("Client sets " +
1048-
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true " + getDsQueryIdForLogging +
1049-
s", server responded with the header set to true(${capabilities}) " +
1050-
s"and ${lines.size} lines, and last line as [${lastLine}].")
1051-
}
1052-
logInfo(
1053-
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
1054-
)
1055-
case Some(false) =>
1056-
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
1057-
s"header, but the server responded with the header set to false(" +
1058-
s"${capabilities})," + getDsQueryIdForLogging
1059-
)
1060-
case None =>
1061-
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
1062-
s" header, but server didn't respond with the header(${capabilities}), " +
1063-
getDsQueryIdForLogging
1064-
)
1065-
}
1066-
}
1067-
10681038
private def getRespondedFormat(capabilitiesMap: Map[String, String]): String = {
10691039
capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET)
10701040
}
10711041

1072-
// includeEndStreamActionHeader indicates whether the last line is required to be an
1073-
// EndStreamAction, parsed from the response header.
1074-
private def getRespondedIncludeEndStreamActionHeader(
1075-
capabilitiesMap: Map[String, String]): Option[Boolean] = {
1076-
capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean)
1077-
}
1078-
10791042
private def parseDeltaSharingCapabilities(capabilities: Option[String]): Map[String, String] = {
10801043
if (capabilities.isEmpty) {
10811044
return Map.empty[String, String]
@@ -1229,7 +1192,7 @@ class DeltaSharingRestClient(
12291192
).map(_.getValue)
12301193
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
12311194
if (setIncludeEndStreamAction) {
1232-
checkEndStreamAction(capabilities, capabilitiesMap, lines)
1195+
checkEndStreamAction(capabilities, capabilitiesMap, lines, getDsQueryIdForLogging)
12331196
}
12341197
(
12351198
Option(
@@ -1364,6 +1327,55 @@ object DeltaSharingRestClient extends Logging {
13641327
)
13651328
}
13661329

1330+
private def tryParseEndStreamAction(line: String): EndStreamAction = {
1331+
try {
1332+
JsonUtils.fromJson[SingleAction](line).endStreamAction
1333+
} catch {
1334+
case NonFatal(_) =>
1335+
logError(s"Failed to parse last line in response as EndStreamAction:$line")
1336+
null
1337+
}
1338+
}
1339+
1340+
// includeEndStreamActionHeader indicates whether the last line is required to be an
1341+
// EndStreamAction, parsed from the response header.
1342+
private def getRespondedIncludeEndStreamActionHeader(
1343+
capabilitiesMap: Map[String, String]): Option[Boolean] = {
1344+
capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean)
1345+
}
1346+
1347+
def checkEndStreamAction(
1348+
capabilities: Option[String],
1349+
capabilitiesMap: Map[String, String],
1350+
lines: Seq[String],
1351+
queryIdForLogging: String): Unit = {
1352+
val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilitiesMap)
1353+
includeEndStreamActionHeader match {
1354+
case Some(true) =>
1355+
val lastLine = lines.lastOption.getOrElse("Empty_Seq_in_checkEndStreamAction")
1356+
val lastEndStreamAction = tryParseEndStreamAction(lastLine)
1357+
if (lastEndStreamAction == null) {
1358+
throw new MissingEndStreamActionException("Client sets " +
1359+
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true " + queryIdForLogging +
1360+
s", server responded with the header set to true(${capabilities}) " +
1361+
s"and ${lines.size} lines, and last line as [${lastLine}].")
1362+
}
1363+
logInfo(
1364+
s"Successfully verified endStreamAction in the response" + queryIdForLogging
1365+
)
1366+
case Some(false) =>
1367+
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
1368+
s"header, but the server responded with the header set to false(" +
1369+
s"${capabilities})," + queryIdForLogging
1370+
)
1371+
case None =>
1372+
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
1373+
s" header, but server didn't respond with the header(${capabilities}), " +
1374+
queryIdForLogging
1375+
)
1376+
}
1377+
}
1378+
13671379
def apply(
13681380
profileFile: String,
13691381
forStreaming: Boolean = false,

client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,16 @@ import io.delta.sharing.client.model.{
2626
AddFile,
2727
AddFileForCDF,
2828
DeltaTableFiles,
29+
EndStreamAction,
2930
Format,
3031
Metadata,
3132
Protocol,
3233
RemoveFile,
3334
Table
3435
}
36+
import io.delta.sharing.client.util.JsonUtils
3537
import io.delta.sharing.client.util.UnexpectedHttpStatus
38+
import io.delta.sharing.spark.MissingEndStreamActionException
3639

3740
// scalastyle:off maxLineLength
3841
class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
@@ -1226,4 +1229,105 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
12261229
}
12271230
}
12281231
}
1232+
1233+
val fakeAddFileStr = JsonUtils.toJson(AddFile(
1234+
url = "https://unused-url",
1235+
expirationTimestamp = 12345,
1236+
id = "random-id",
1237+
partitionValues = Map.empty,
1238+
size = 123,
1239+
stats = """{"numRecords":1,"minValues":{"age":"1"},"maxValues":{"age":"3"},"nullCount":{"age":0}}"""
1240+
).wrap)
1241+
val fakeEndStreamActionStr = JsonUtils.toJson(EndStreamAction(
1242+
refreshToken = "random-refresh",
1243+
nextPageToken = "random-next",
1244+
minUrlExpirationTimestamp = 0
1245+
).wrap)
1246+
1247+
test("checkEndStreamAction succeeded") {
1248+
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
1249+
// Succeeded because the responded header is true, and EndStreamAction exists
1250+
checkEndStreamAction(
1251+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1252+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1253+
Seq(fakeAddFileStr, fakeEndStreamActionStr),
1254+
"random-query-id"
1255+
)
1256+
1257+
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false
1258+
// Succeeded even though the last line is not EndStreamAction
1259+
checkEndStreamAction(
1260+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false"),
1261+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "false"),
1262+
Seq(fakeAddFileStr),
1263+
"random-query-id"
1264+
)
1265+
// Succeeded even though the lines are empty.
1266+
checkEndStreamAction(
1267+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false"),
1268+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "false"),
1269+
Seq.empty,
1270+
"random-query-id"
1271+
)
1272+
// Succeeded even though the last line cannot be parsed.
1273+
checkEndStreamAction(
1274+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=false"),
1275+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "false"),
1276+
Seq("random-string-server-error"),
1277+
"random-query-id"
1278+
)
1279+
1280+
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION not in header
1281+
// Succeeded even though the last line is not EndStreamAction
1282+
checkEndStreamAction(None, Map.empty, Seq(fakeAddFileStr), "random-query-id")
1283+
// Succeeded even though the lines are empty.
1284+
checkEndStreamAction(None, Map.empty, Seq.empty, "random-query-id")
1285+
// Succeeded even though the last line cannot be parsed.
1286+
checkEndStreamAction(None, Map.empty, Seq("random-string-server-error"), "random-query-id")
1287+
}
1288+
1289+
test("checkEndStreamAction failed") {
1290+
def checkErrorMessage(
1291+
e: MissingEndStreamActionException,
1292+
additionalErrorMsg: String): Unit = {
1293+
val commonErrorMsg = "Client sets includeendstreamaction=true random-query-id, " +
1294+
"server responded with the header set to true(Some(includeendstreamaction=true)) and"
1295+
assert(e.getMessage.contains(commonErrorMsg))
1296+
1297+
assert(e.getMessage.contains(additionalErrorMsg))
1298+
}
1299+
1300+
// Failed because the last line is not EndStreamAction.
1301+
var e = intercept[MissingEndStreamActionException] {
1302+
checkEndStreamAction(
1303+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1304+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1305+
Seq(fakeAddFileStr),
1306+
"random-query-id"
1307+
)
1308+
}
1309+
checkErrorMessage(e, s"and 1 lines, and last line as [$fakeAddFileStr].")
1310+
1311+
// Failed because the last line cannot be parsed.
1312+
e = intercept[MissingEndStreamActionException] {
1313+
checkEndStreamAction(
1314+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1315+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1316+
Seq("random-string-server-error"),
1317+
"random-query-id"
1318+
)
1319+
}
1320+
checkErrorMessage(e, s"and 1 lines, and last line as [random-string-server-error].")
1321+
1322+
// Failed because the responded lines are empty.
1323+
e = intercept[MissingEndStreamActionException] {
1324+
checkEndStreamAction(
1325+
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
1326+
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
1327+
Seq.empty,
1328+
"random-query-id"
1329+
)
1330+
}
1331+
checkErrorMessage(e, s"and 0 lines, and last line as [Empty_Seq_in_checkEndStreamAction].")
1332+
}
12291333
}

0 commit comments

Comments
 (0)