
Commit 836ea28

Improve flint error handling (#335) (#338)
1 parent 39cd360 · commit 836ea28

File tree

7 files changed: 115 additions & 68 deletions


build.sbt

Lines changed: 2 additions & 0 deletions
@@ -201,6 +201,8 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
     libraryDependencies ++= deps(sparkVersion),
     libraryDependencies ++= Seq(
       "com.typesafe.play" %% "play-json" % "2.9.2",
+      "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided"
+        exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       // handle AmazonS3Exception
       "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
       // the transitive jackson.core dependency conflicts with existing scala
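
Note: the Glue SDK is added the same way as the existing S3 SDK entry — provided scope, with the transitive jackson-databind excluded so it cannot clash with the Jackson and Scala module versions already on the Spark classpath. An equivalent way to express the exclusion in sbt, shown only as an illustrative alternative to the single-artifact exclude used in the commit:

// Illustrative alternative only; the commit itself uses the narrower exclude(...) form above.
// excludeAll accepts ExclusionRule values and can drop a whole organization or a single artifact.
libraryDependencies += "com.amazonaws" % "aws-java-sdk-glue" % "1.12.568" % "provided" excludeAll (
  ExclusionRule(organization = "com.fasterxml.jackson.core", name = "jackson-databind")
)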

flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,11 @@ public final class MetricConstants {
    */
   public static final String S3_ERR_CNT_METRIC = "s3.error.count";
 
+  /**
+   * Metric name for counting the errors encountered with Amazon Glue operations.
+   */
+  public static final String GLUE_ERR_CNT_METRIC = "glue.error.count";
+
   /**
    * Metric name for counting the number of sessions currently running.
    */
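
Note: this constant is only a metric name; it is consumed by the new AWSGlueException case in FlintJobExecutor (full diff below). A minimal, self-contained sketch of that pairing, with a stand-in for the trait's incrementCounter helper:

object GlueErrorMetricSketch extends App {
  // Stand-in for the executor's incrementCounter helper, which reports to the metrics system.
  def incrementCounter(name: String): Unit = println(s"increment $name")

  // Same value as MetricConstants.GLUE_ERR_CNT_METRIC.
  val GLUE_ERR_CNT_METRIC = "glue.error.count"

  // On a failed Glue call, the executor bumps this counter before building the error string.
  incrementCounter(GLUE_ERR_CNT_METRIC)
}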

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

Lines changed: 49 additions & 45 deletions
@@ -7,7 +7,10 @@ package org.apache.spark.sql
 
 import java.util.Locale
 
+import com.amazonaws.services.glue.model.{AccessDeniedException, AWSGlueException}
 import com.amazonaws.services.s3.model.AmazonS3Exception
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
 import org.opensearch.flint.core.metrics.MetricConstants
@@ -17,12 +20,16 @@ import play.api.libs.json._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
 
 trait FlintJobExecutor {
   this: Logging =>
 
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+
   var currentTimeProvider: TimeProvider = new RealTimeProvider()
   var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
   var envinromentProvider: EnvironmentProvider = new RealEnvironment()
@@ -64,6 +71,9 @@ trait FlintJobExecutor {
       "sessionId": {
         "type": "keyword"
       },
+      "jobType": {
+        "type": "keyword"
+      },
       "updateTime": {
         "type": "date",
         "format": "strict_date_time||epoch_millis"
@@ -188,6 +198,7 @@ trait FlintJobExecutor {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -216,6 +227,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -246,6 +258,7 @@ trait FlintJobExecutor {
         StructField("queryId", StringType, nullable = true),
         StructField("queryText", StringType, nullable = true),
         StructField("sessionId", StringType, nullable = true),
+        StructField("jobType", StringType, nullable = true),
         // number is not nullable
         StructField("updateTime", LongType, nullable = false),
         StructField("queryRunTime", LongType, nullable = true)))
@@ -265,6 +278,7 @@ trait FlintJobExecutor {
         queryId,
         query,
         sessionId,
+        spark.conf.get(FlintSparkConf.JOB_TYPE.key),
         endTime,
         endTime - startTime))
 
@@ -328,7 +342,7 @@ trait FlintJobExecutor {
     val inputJson = Json.parse(input)
     val mappingJson = Json.parse(mapping)
 
-    compareJson(inputJson, mappingJson)
+    compareJson(inputJson, mappingJson) || compareJson(mappingJson, inputJson)
   }
 
   def checkAndCreateIndex(osClient: OSClient, resultIndex: String): Either[String, Unit] = {
@@ -409,68 +423,58 @@ trait FlintJobExecutor {
   private def handleQueryException(
       e: Exception,
       message: String,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
-    val error = s"$message: ${e.getMessage}"
-    logError(error, e)
-    error
+      errorSource: Option[String] = None,
+      statusCode: Option[Int] = None): String = {
+
+    val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++
+      errorSource.map("ErrorSource" -> _) ++
+      statusCode.map(code => "StatusCode" -> code.toString)
+
+    val errorJson = mapper.writeValueAsString(errorDetails)
+    logError(errorJson, e)
+    errorJson
   }
 
   def getRootCause(e: Throwable): Throwable = {
     if (e.getCause == null) e
     else getRootCause(e.getCause)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      query: String,
-      queryId: String,
-      sessionId: String): String = {
+  /**
+   * This method converts query exception into error string, which then persist to query result
+   * metadata
+   */
+  def processQueryException(ex: Exception): String = {
     getRootCause(ex) match {
       case r: ParseException =>
-        handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
+        handleQueryException(r, "Syntax error")
       case r: AmazonS3Exception =>
         incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
         handleQueryException(
           r,
           "Fail to read data from S3. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
-      case r: AnalysisException =>
+          Some(r.getServiceName),
+          Some(r.getStatusCode))
+      case r: AWSGlueException =>
+        incrementCounter(MetricConstants.GLUE_ERR_CNT_METRIC)
+        // Redact Access denied in AWS Glue service
+        r match {
+          case accessDenied: AccessDeniedException =>
+            accessDenied.setErrorMessage(
+              "Access denied in AWS Glue service. Please check permissions.")
+          case _ => // No additional action for other types of AWSGlueException
+        }
         handleQueryException(
           r,
-          "Fail to analyze query. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+          "Fail to read data from Glue. Cause",
+          Some(r.getServiceName),
+          Some(r.getStatusCode))
+      case r: AnalysisException =>
+        handleQueryException(r, "Fail to analyze query. Cause")
       case r: SparkException =>
-        handleQueryException(
-          r,
-          "Spark exception. Cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Spark exception. Cause")
       case r: Exception =>
-        handleQueryException(
-          r,
-          "Fail to run query, cause",
-          spark,
-          dataSource,
-          query,
-          queryId,
-          sessionId)
+        handleQueryException(r, "Fail to run query. Cause")
     }
   }
 }
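
Note: the net effect of this change is that query failures are now persisted as a structured JSON string rather than a bare message, with optional ErrorSource and StatusCode fields for AWS service errors. A minimal, self-contained sketch of what handleQueryException produces, using hypothetical values in place of e.getMessage, r.getServiceName and r.getStatusCode (field order in the output may differ):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object ErrorJsonSketch extends App {
  // Same serializer setup as the trait: Jackson with the Scala module registered.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  // Hypothetical values standing in for e.getMessage, r.getServiceName and r.getStatusCode.
  val message = "Fail to read data from S3. Cause"
  val cause = "Access Denied (Service: Amazon S3; Status Code: 403)"
  val errorDetails = Map("Message" -> s"$message: $cause") ++
    Some("Amazon S3").map("ErrorSource" -> _) ++
    Some(403).map(code => "StatusCode" -> code.toString)

  // Prints something like:
  // {"Message":"Fail to read data from S3. Cause: Access Denied ...","ErrorSource":"Amazon S3","StatusCode":"403"}
  println(mapper.writeValueAsString(errorDetails))
}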

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala

Lines changed: 3 additions & 14 deletions
@@ -545,19 +545,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
       currentTimeProvider)
   }
 
-  def processQueryException(
-      ex: Exception,
-      spark: SparkSession,
-      dataSource: String,
-      flintCommand: FlintCommand,
-      sessionId: String): String = {
-    val error = super.processQueryException(
-      ex,
-      spark,
-      dataSource,
-      flintCommand.query,
-      flintCommand.queryId,
-      sessionId)
+  def processQueryException(ex: Exception, flintCommand: FlintCommand): String = {
+    val error = super.processQueryException(ex)
     flintCommand.fail()
     flintCommand.error = Some(error)
     error
@@ -724,7 +713,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
             sessionId,
             startTime)
         case e: Exception =>
-          val error = processQueryException(e, spark, dataSource, flintCommand.query, "", "")
+          val error = processQueryException(e, flintCommand)
           Some(
             handleCommandFailureAndGetFailedData(
               spark,

spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ case class JobOperator(
         dataToWrite = Some(
           getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
       case e: Exception =>
-        val error = processQueryException(e, spark, dataSource, query, "", "")
+        val error = processQueryException(e)
         dataToWrite = Some(
           getFailedData(spark, dataSource, error, "", query, "", startTime, currentTimeProvider))
     } finally {

spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala

Lines changed: 13 additions & 5 deletions
@@ -6,14 +6,15 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{CleanerFactory, MockTimeProvider}
 
 class FlintJobTest extends SparkFunSuite with JobMatchers {
 
   val spark =
     SparkSession.builder().appName("Test").master("local").getOrCreate()
-
+  spark.conf.set(FlintSparkConf.JOB_TYPE.key, "streaming")
   // Define input dataframe
   val inputSchema = StructType(
     Seq(
@@ -38,6 +39,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
       StructField("queryId", StringType, nullable = true),
       StructField("queryText", StringType, nullable = true),
      StructField("sessionId", StringType, nullable = true),
+      StructField("jobType", StringType, nullable = true),
       StructField("updateTime", LongType, nullable = false),
       StructField("queryRunTime", LongType, nullable = false)))
 
@@ -61,6 +63,7 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
       "10",
       "select 1",
       "20",
+      "streaming",
       currentTime,
       queryRunTime))
   val expected: DataFrame =
@@ -82,20 +85,25 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
   }
 
   test("test isSuperset") {
-    // note in input false has enclosed double quotes, while mapping just has false
+    // Note in input false has enclosed double quotes, while mapping just has false
     val input =
       """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"},
         |"applicationId":{"type":"keyword"},"jobRunId":{
         |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
         |"error":{"type":"text"}}}
         |""".stripMargin
     val mapping =
-      """{"dynamic":false,"properties":{"result":{"type":"object"},"schema":{"type":"object"},
-        |"jobRunId":{"type":"keyword"},"applicationId":{
-        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"}}}
+      """{"dynamic":"false","properties":{"result":{"type":"object"},"schema":{"type":"object"}, "jobType":{"type": "keyword"},
+        |"applicationId":{"type":"keyword"},"jobRunId":{
+        |"type":"keyword"},"dataSourceName":{"type":"keyword"},"status":{"type":"keyword"},
         |"error":{"type":"text"}}}
         |""".stripMargin
+
+    // Assert that input is a superset of mapping
     assert(FlintJob.isSuperset(input, mapping))
+
+    // Assert that mapping is a superset of input
    assert(FlintJob.isSuperset(mapping, input))
   }
 
   test("default streaming query maxExecutors is 10") {
