Commit 45835cb

Authored by Louis Chu <clingzhi@amazon.com>

Improve flint error handling (#335)

* Improve flint error handling
* Error field in json format
* Superset update

Signed-off-by: Louis Chu <clingzhi@amazon.com>
1 parent d9c0ba8 commit 45835cb

File tree

7 files changed: +115, -68 lines


build.sbt

Lines changed: 2 additions & 0 deletions
@@ -208,6 +208,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
     libraryDependencies ++= deps(sparkVersion),
     libraryDependencies ++= Seq(
       "com.typesafe.play" %% "play-json" % "2.9.2",
+      "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
+        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       // handle AmazonS3Exception
       "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
       // the transitive jackson.core dependency conflicts with existing scala
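Read together, the dependency block for sparkSqlApplication now looks roughly like the sketch below. This is only an illustrative reassembly of the hunk above; entries past the S3 SDK line are truncated in the diff and are assumed unchanged.

    libraryDependencies ++= Seq(
      "com.typesafe.play" %% "play-json" % "2.9.2",
      // Added by this commit: the Glue SDK is "provided" and drops its transitive
      // jackson-databind so it cannot clash with the Jackson Scala module already on the classpath.
      "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
      // handle AmazonS3Exception
      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
      // the transitive jackson.core dependency conflicts with existing scala
      // (remaining entries omitted here)
    )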

flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,11 @@ public final class MetricConstants {
    */
   public static final String S3_ERR_CNT_METRIC = "s3.error.count";
 
+  /**
+   * Metric name for counting the errors encountered with Amazon Glue operations.
+   */
+  public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";
+
   /**
    * Metric name for counting the number of sessions currently running.
    */
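The new constant is consumed the same way as the existing S3 error counter: FlintJobExecutor (below) bumps it before turning a Glue failure into an error string. The relevant case clause from that change, reproduced here for quick reference:

    case r: AWSGlueException =>
      // Count the Glue failure, then fold the service name and HTTP status into the error JSON.
      incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
      handleQueryException(
        r,
        "Fail to read data from Glue. Cause",
        Some(r.getServiceName),
        Some(r.getStatusCode))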

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

Lines changed: 49 additions & 45 deletions
@@ -7,7 +7,10 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException}
 import com.amazonaws.services.s3.model.AmazonS3Exception
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
 import org.opensearch.flint.core.metrics.MetricConstants
@@ -17,13 +20,17 @@ import play.api.libs.json._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
 
 trait FlintJobExecutor {
   this: Logging =>
 
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+
   var currentTimeProvider: TimeProvider = new RealTimeProvider()
   var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
   var envinromentProvider: EnvironmentProvider = new RealEnvironment()
@@ -65,6 +72,9 @@ trait FlintJobExecutor {
         "sessionId": {
           "type": "keyword"
         },
+        "jobType": {
+          "type": "keyword"
+        },
         "updateTime": {
           "type": "date",
           "format": "strict_date_time||epoch_millis"
@@ -190,6 +200,7 @@ trait FlintJobExecutor {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -218,6 +229,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -248,6 +260,7 @@ trait FlintJobExecutor {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -267,6 +280,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -330,7 +344,7 @@ trait FlintJobExecutor {
     val inputJson = Json.parse(input)
     val mappingJson = Json.parse(mapping)
 
-    compareJson(inputJson, mappingJson)
+    compareJson(inputJson, mappingJson) || compareJson(mappingJson, inputJson)
   }
 
   def checkAndCreateIndex(osClient: OSClient, resultIndex: String): Either[String, Unit] = {
@@ -411,68 +425,58 @@ trait FlintJobExecutor {
   private def handleQueryException(
       e: Exception,
       message: String,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
-    val error = s"$message: ${e.getMessage}"
-    logError(error, e)
-    error
+      errorSource: Option[String] = None,
+      statusCode: Option[Int] = None): String = {
+
+    val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++
+      errorSource.map("ErrorSource" -> _) ++
+      statusCode.map(code => "StatusCode" -> code.toString)
+
+    val errorJson = mapper.writeValueAsString(errorDetails)
+    logError(errorJson, e)
+    errorJson
   }
 
   def getRootCause(e: Throwable): Throwable = {
     if (e.getCause == null) e
     else getRootCause(e.getCause)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
+  /**
+   * This method converts query exception into error string, which then persist to query result
+   * metadata
+   */
+  def processQueryException(ex: Exception): String = {
     getRootCause(ex) match {
       case r: ParseException =>
-        handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
+        handleQueryException(r, "Syntax error")
       case r: AmazonS3Exception =>
         incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
         handleQueryException(
           r,
           "Fail to read data from S3. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
-      case r: AnalysisException =>
+          Some(r.getServiceName),
+          Some(r.getStatusCode))
+      case r: AWSGlueException =>
+        incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
+        // Redact Access denied in AWS Glue service
+        r match {
+          case accessDenied: AccessDeniedException =>
+            accessDenied.setErrorMessage(
+              "Access denied in AWS Glue service. Please check permissions.")
+          case _ => // No additional action for other types of AWSGlueException
+        }
         handleQueryException(
           r,
-          "Fail to analyze query. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+          "Fail to read data from Glue. Cause",
+          Some(r.getServiceName),
+          Some(r.getStatusCode))
+      case r: AnalysisException =>
+        handleQueryException(r, "Fail to analyze query. Cause")
       case r: SparkException =>
-        handleQueryException(
-          r,
-          "Spark exception. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Spark exception. Cause")
      case r: Exception =>
-        handleQueryException(
-          r,
-          "Fail to run query, cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Fail to run query. Cause")
     }
   }
 }
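The practical effect of the handleQueryException rewrite is that the error value persisted with the query result is now a small JSON document rather than a bare string. Below is a minimal sketch of the new shape, assuming Jackson with DefaultScalaModule registered exactly as in the trait above and using purely illustrative values for the S3 case:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    object ErrorJsonSketch extends App {
      val mapper = new ObjectMapper()
      mapper.registerModule(DefaultScalaModule)

      // Mirrors the Map built inside handleQueryException; the values below are
      // made up for illustration, not taken from a real AmazonS3Exception.
      val errorDetails = Map(
        "Message" -> "Fail to read data from S3. Cause: Access Denied",
        "ErrorSource" -> "Amazon S3",
        "StatusCode" -> "403")

      // Prints roughly:
      // {"Message":"Fail to read data from S3. Cause: Access Denied","ErrorSource":"Amazon S3","StatusCode":"403"}
      println(mapper.writeValueAsString(errorDetails))
    }

ErrorSource and StatusCode appear only when the matched exception supplies them (the S3 and Glue cases); for ParseException, AnalysisException, SparkException, and the generic fallback, the JSON carries only the Message field.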

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala

Lines changed: 3 additions & 14 deletions
@@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
       currentTimeProvider)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      flintCommand: FlintCommand,
-      sessionId: String): String = {
-    val error = super.processQueryException(
-      ex,
-      spark,
-      dataSource,
-      flintCommand.query,
-      flintCommand.queryId,
-      sessionId)
+  def processQueryException(ex: Exception, flintCommand: FlintCommand): String = {
+    val error = super.processQueryException(ex)
     flintCommand.fail()
     flintCommand.error = Some(error)
     error
@@ -724,7 +713,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
           sessionId,
           startTime)
       case e: Exception =>
-        val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "")
+        val error = processQueryException(e, flintCommand)
         Some(
           handleCommandFailureAndGetFailedData(
             spark,

spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ case class JobOperator(
       dataToWrite = Some(
         getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
     case e: Exception =>
-      val error = processQueryException(e, spark, dataSource, query, "", "")
+      val error = processQueryException(e)
       dataToWrite = Some(
         getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
   } finally {

spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala

Lines changed: 13 additions & 5 deletions
@@ -6,14 +6,15 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider}
 
 class FlintJobTest extends SparkFunSuite with JobMatchers {
 
   val spark =
     SparkSession.builder().appName("Test").master("local").getOrCreate()
-
+  spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming")
   // Define input dataframe
   val inputSchema = StructType(
     Seq(
@@ -38,6 +39,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
       StructField("queryId", StringType, nullable = true),
       StructField("queryText", StringType, nullable = true),
       StructField("sessionId", StringType, nullable = true),
+      StructField("jobType", StringType, nullable = true),
       StructField("updateTime", LongType, nullable = false),
       StructField("queryRunTime", LongType, nullable = false)))
 
@@ -61,6 +63,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
         "10",
         "select 1",
         "20",
+        "streaming",
         currentTime,
         queryRunTime))
     val expected: DataFrame =
@@ -82,20 +85,25 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
   }
 
   test("test isSuperset") {
-    // note in input false has enclosed double quotes, while mapping just has false
+    // Note in input false has enclosed double quotes, while mapping just has false
     val input =
       """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"},
         |"applicationId":{"type":"keyword"},"jobRunId":{
         |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
         |"error":{"type":"text"}}}
         |""".stripMargin
     val mapping =
-      """{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
-        |"jobRunId":{"type":"keyword"},"applicationId":{
-        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
+      """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"}, "jobType":{"type": "keyword"},
+        |"applicationId":{"type":"keyword"},"jobRunId":{
+        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
         |"error":{"type":"text"}}}
         |""".stripMargin
+
+    // Assert that input is a superset of mapping
     assert(FlintJob.isSuperset(input, mapping))
+
+    // Assert that mapping is a superset of input
+    assert(FlintJob.isSuperset(mapping, input))
   }
 
   test("default streaming query maxExecutors is 10") {
