
Commit 422dae7

Apply new logging format to record exceptions (#314)
Signed-off-by: Louis Chu <clingzhi@amazon.com>
1 parent 45835cb

File tree: 5 files changed, +78 -64 lines changed

flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java

Lines changed: 15 additions & 8 deletions
@@ -28,6 +28,8 @@
 import org.opensearch.client.indices.PutMappingRequest;
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.client.RequestOptions;
+import org.opensearch.flint.core.logging.CustomLogging;
+import org.opensearch.flint.core.logging.OperationMessage;
 import org.opensearch.flint.core.metrics.MetricsUtil;
 
 import java.io.Closeable;

@@ -90,7 +92,11 @@ static void recordOperationSuccess(String metricNamePrefix) {
   static void recordOperationFailure(String metricNamePrefix, Exception e) {
     OpenSearchException openSearchException = extractOpenSearchException(e);
     int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
-
+    if (openSearchException != null) {
+      CustomLogging.logError(new OperationMessage("OpenSearch Operation failed.", statusCode), openSearchException);
+    } else {
+      CustomLogging.logError("OpenSearch Operation failed with an exception.", e);
+    }
     if (statusCode == 403) {
       String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
       MetricsUtil.incrementCounter(forbiddenErrorMetricName);

@@ -104,15 +110,16 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
    * Extracts an OpenSearchException from the given Throwable.
    * Checks if the Throwable is an instance of OpenSearchException or caused by one.
    *
-   * @param ex the exception to be checked
+   * @param e the exception to be checked
    * @return the extracted OpenSearchException, or null if not found
    */
-  private static OpenSearchException extractOpenSearchException(Throwable ex) {
-    if (ex instanceof OpenSearchException) {
-      return (OpenSearchException) ex;
-    } else if (ex.getCause() instanceof OpenSearchException) {
-      return (OpenSearchException) ex.getCause();
+  static OpenSearchException extractOpenSearchException(Throwable e) {
+    if (e instanceof OpenSearchException) {
+      return (OpenSearchException) e;
+    } else if (e.getCause() == null) {
+      return null;
+    } else {
+      return extractOpenSearchException(e.getCause());
     }
-    return null;
   }
 }
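
Note: the behavioral change here is that exception extraction is now recursive. The old code found an OpenSearchException only when it was the thrown exception itself or its direct cause; the new version walks the entire cause chain. A minimal Scala sketch of the same walk (findOpenSearchException is our name for illustration, not part of this commit):

    import org.opensearch.OpenSearchException

    // Same cause-chain walk as the new extractOpenSearchException, expressed standalone.
    @scala.annotation.tailrec
    def findOpenSearchException(t: Throwable): Option[OpenSearchException] = t match {
      case ose: OpenSearchException => Some(ose)
      case _ if t.getCause == null  => None
      case _                        => findOpenSearchException(t.getCause)
    }

    // Two levels of wrapping: the old single-level check would return null here,
    // since the OpenSearchException is neither the exception nor its direct cause.
    val root    = new OpenSearchException("index_not_found_exception")
    val wrapped = new RuntimeException("outer", new IllegalStateException("middle", root))
    assert(findOpenSearchException(wrapped).contains(root))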

flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java

Lines changed: 11 additions & 6 deletions
@@ -44,9 +44,9 @@ public class CustomLogging {
   private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();
 
   static {
-    String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
+    DOMAIN_NAME = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
+    String[] parts = DOMAIN_NAME.split(":");
     CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
-    DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;
 
     logLevelActions.put("DEBUG", logger::debug);
     logLevelActions.put("INFO", logger::info);

@@ -78,10 +78,6 @@ private static String convertToJson(Map<String, Object> logEventMap) {
    * @return A map representation of the log event.
    */
   protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
-    if (content == null) {
-      throw new IllegalArgumentException("Log message must not be null");
-    }
-
     Map<String, Object> logEventMap = new LinkedHashMap<>();
     Map<String, Object> body = new LinkedHashMap<>();
     constructMessageBody(content, body);

@@ -105,6 +101,11 @@ protected static Map<String, Object> constructLogEventMap(String level, Object c
   }
 
   private static void constructMessageBody(Object content, Map<String, Object> body) {
+    if (content == null) {
+      body.put("message", "");
+      return;
+    }
+
     if (content instanceof Message) {
       Message message = (Message) content;
       body.put("message", message.getFormattedMessage());

@@ -151,6 +152,10 @@ public static void logError(Object message) {
     log("ERROR", message, null);
   }
 
+  public static void logError(Throwable throwable) {
+    log("ERROR", "", throwable);
+  }
+
   public static void logError(Object message, Throwable throwable) {
     log("ERROR", message, throwable);
   }
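
Note: two fixes land here. DOMAIN_NAME now keeps the full FLINT_CLUSTER_NAME value instead of only the segment after the colon, and a null log message no longer throws but is recorded as an empty string. A sketch of the revised environment parsing, assuming the variable holds a "<clientId>:<domainName>" pair as the default value in the diff suggests:

    // Hypothetical standalone rendering of the static initializer above.
    val UNKNOWN = "UNKNOWN"
    val domainName = sys.env.getOrElse("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN)
    val parts = domainName.split(":")
    val clientId = if (parts.length == 2) parts(0) else UNKNOWN
    // Before: DOMAIN_NAME was parts(1); after: the full "<clientId>:<domainName>" string.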

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala

Lines changed: 4 additions & 14 deletions
@@ -8,6 +8,7 @@ package org.apache.spark.sql
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
 import play.api.libs.json._

@@ -28,28 +29,17 @@ import org.apache.spark.sql.types._
  */
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
-    val (queryOption, resultIndex) = args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
+    val (queryOption, resultIndex) = parseArgs(args)
 
     val conf = createSparkConf()
     val jobType = conf.get("spark.flint.job.type", "batch")
-    logInfo(s"""Job type is: ${jobType}""")
+    CustomLogging.logInfo(s"""Job type is: ${jobType}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val dataSource = conf.get("spark.flint.datasource.name", "")
     val query = queryOption.getOrElse(unescapeQuery(conf.get(FlintSparkConf.QUERY.key, "")))
     if (query.isEmpty) {
-      throw new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
+      logAndThrow(s"Query undefined for the ${jobType} job.")
     }
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
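
Note: main now delegates argument handling to the shared parseArgs and reports bad input through logAndThrow (both defined in FlintJobExecutor, next file). logAndThrow can replace a throw expression anywhere because its declared result type is Nothing, which conforms to every expected type in Scala; a minimal sketch (failFast is a hypothetical stand-in):

    // Sketch only: Nothing-returning helpers type-check in expression position,
    // e.g. as a match-case result or the else branch of an if.
    def failFast(msg: String): Nothing = throw new IllegalArgumentException(msg)

    val jobType = "batch"
    val query: String = if (jobType == "batch") "SELECT 1" else failFast(s"no query for $jobType")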

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala

Lines changed: 25 additions & 1 deletion
@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
+import org.opensearch.flint.core.logging.{CustomLogging, OperationMessage}
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import play.api.libs.json._

@@ -433,7 +434,11 @@ trait FlintJobExecutor {
       statusCode.map(code => "StatusCode" -> code.toString)
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-    logError(errorJson, e)
+
+    statusCode.foreach { code =>
+      CustomLogging.logError(new OperationMessage("", code), e)
+    }
+
     errorJson
   }

@@ -479,4 +484,23 @@ trait FlintJobExecutor {
       handleQueryException(r, "Fail to run query. Cause")
     }
   }
+
+  def parseArgs(args: Array[String]): (Option[String], String) = {
+    args match {
+      case Array(resultIndex) =>
+        (None, resultIndex) // Starting from OS 2.13, resultIndex is the only argument
+      case Array(query, resultIndex) =>
+        (
+          Some(query),
+          resultIndex
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ => logAndThrow("Unsupported number of arguments. Expected 1 or 2 arguments.")
+    }
+  }
+
+  def logAndThrow(errorMessage: String): Nothing = {
+    val t = new IllegalArgumentException(errorMessage)
+    CustomLogging.logError(t)
+    throw t
+  }
 }
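
Note: the relocated parseArgs dispatches on arity via Array patterns, equivalent to the args.length match it replaces in FlintJob and FlintREPL. A standalone, runnable sketch with logAndThrow inlined (parseArgsSketch is our name, not the commit's):

    // Standalone rendering of the arity dispatch above.
    def parseArgsSketch(args: Array[String]): (Option[String], String) = args match {
      case Array(resultIndex)        => (None, resultIndex)
      case Array(query, resultIndex) => (Some(query), resultIndex)
      case _ => throw new IllegalArgumentException(
          "Unsupported number of arguments. Expected 1 or 2 arguments.")
    }

    assert(parseArgsSketch(Array("my_result_index")) == (None, "my_result_index"))
    assert(parseArgsSketch(Array("SELECT 1", "my_result_index")) == (Some("SELECT 1"), "my_result_index"))
    // Zero or 3+ arguments would log through CustomLogging and then throw.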

spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala

Lines changed: 23 additions & 35 deletions
@@ -21,6 +21,7 @@ import org.opensearch.common.Strings
 import org.opensearch.flint.app.{FlintCommand, FlintInstance}
 import org.opensearch.flint.app.FlintInstance.formats
 import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}

@@ -67,7 +68,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val (queryOption, resultIndex) = parseArgs(args)
 
     if (Strings.isNullOrEmpty(resultIndex)) {
-      throw new IllegalArgumentException("resultIndex is not set")
+      logAndThrow("resultIndex is not set")
     }
 
     // init SparkContext

@@ -84,7 +85,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     conf.set("spark.sql.defaultCatalog", dataSource)
 
     val jobType = conf.get(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
-    logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
+    CustomLogging.logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val query = getQuery(queryOption, jobType, conf)

@@ -109,10 +110,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val sessionId: Option[String] = Option(conf.get(FlintSparkConf.SESSION_ID.key, null))
 
     if (sessionIndex.isEmpty) {
-      throw new IllegalArgumentException(FlintSparkConf.REQUEST_INDEX.key + " is not set")
+      logAndThrow(FlintSparkConf.REQUEST_INDEX.key + " is not set")
     }
     if (sessionId.isEmpty) {
-      throw new IllegalArgumentException(FlintSparkConf.SESSION_ID.key + " is not set")
+      logAndThrow(FlintSparkConf.SESSION_ID.key + " is not set")
     }
 
     val spark = createSparkSession(conf)

@@ -238,27 +239,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     }
   }
 
-  def parseArgs(args: Array[String]): (Option[String], String) = {
-    args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
-  }
-
   def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = {
     queryOption.getOrElse {
       if (jobType.equalsIgnoreCase("streaming")) {
         val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
         if (defaultQuery.isEmpty) {
-          throw new IllegalArgumentException("Query undefined for the streaming job.")
+          logAndThrow("Query undefined for the streaming job.")
         }
         unescapeQuery(defaultQuery)
       } else ""

@@ -456,7 +442,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       sessionIndex: String,
       sessionTimerContext: Timer.Context): Unit = {
     val error = s"Session error: ${e.getMessage}"
-    logError(error, e)
+    CustomLogging.logError(error, e)
 
     val flintInstance = getExistingFlintInstance(osClient, sessionIndex, sessionId)
       .getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))

@@ -476,7 +462,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
         Option(getResponse.getSourceAsMap)
           .map(FlintInstance.deserializeFromMap)
       case Failure(exception) =>
-        logError(s"Failed to retrieve existing FlintInstance: ${exception.getMessage}", exception)
+        CustomLogging.logError(
+          s"Failed to retrieve existing FlintInstance: ${exception.getMessage}",
+          exception)
         None
       case _ => None
     }

@@ -645,7 +633,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       // or invalid catalog (e.g., we are operating on data not defined in provided data source)
       case e: Exception =>
         val error = s"""Fail to write result of ${flintCommand}, cause: ${e.getMessage}"""
-        logError(error, e)
+        CustomLogging.logError(error, e)
         flintCommand.fail()
         updateSessionIndex(flintCommand, flintSessionIndexUpdater)
         recordStatementStateChange(flintCommand, statementTimerContext)

@@ -672,7 +660,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
      * actions that require the computation of results that need to be collected or stored.
      */
     spark.sparkContext.cancelJobGroup(flintCommand.queryId)
-    logError(error)
     Some(
       handleCommandFailureAndGetFailedData(
         spark,

@@ -705,13 +692,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
           queryWaitTimeMillis))
     } catch {
       case e: TimeoutException =>
-        handleCommandTimeout(
-          spark,
-          dataSource,
-          s"Executing ${flintCommand.query} timed out",
-          flintCommand,
-          sessionId,
-          startTime)
+        val error = s"Executing ${flintCommand.query} timed out"
+        CustomLogging.logError(error, e)
+        handleCommandTimeout(spark, dataSource, error, flintCommand, sessionId, startTime)
       case e: Exception =>
         val error = processQueryException(e, flintCommand)
         Some(

@@ -769,10 +752,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } catch {
       case e: TimeoutException =>
         val error = s"Getting the mapping of index $resultIndex timed out"
+        CustomLogging.logError(error, e)
         dataToWrite =
           handleCommandTimeout(spark, dataSource, error, flintCommand, sessionId, startTime)
       case NonFatal(e) =>
         val error = s"An unexpected error occurred: ${e.getMessage}"
+        CustomLogging.logError(error, e)
         dataToWrite = Some(
           handleCommandFailureAndGetFailedData(
             spark,

@@ -1003,13 +988,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
       case ie: InterruptedException =>
         // Preserve the interrupt status
         Thread.currentThread().interrupt()
-        logError("HeartBeatUpdater task was interrupted", ie)
+        CustomLogging.logError("HeartBeatUpdater task was interrupted", ie)
         incrementCounter(
           MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
         ) // Record heartbeat failure metric
       // maybe due to invalid sequence number or primary term
       case e: Exception =>
-        logWarning(
+        CustomLogging.logWarning(
           s"""Fail to update the last update time of the flint instance ${sessionId}""",
           e)
         incrementCounter(

@@ -1069,7 +1054,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } catch {
       // still proceed since we are not sure what happened (e.g., OpenSearch cluster may be unresponsive)
       case e: Exception =>
-        logError(s"""Fail to find id ${sessionId} from session index.""", e)
+        CustomLogging.logError(s"""Fail to find id ${sessionId} from session index.""", e)
         true
     }
   }

@@ -1114,10 +1099,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
           if e.getCause != null && e.getCause.isInstanceOf[ConnectException] =>
         retries += 1
         val delay = initialDelay * math.pow(2, retries - 1).toLong
-        logError(s"Fail to connect to OpenSearch cluster. Retrying in $delay...", e)
+        CustomLogging.logError(
+          s"Fail to connect to OpenSearch cluster. Retrying in $delay...",
+          e)
         Thread.sleep(delay.toMillis)
 
       case e: Exception =>
+        CustomLogging.logError(e)
         throw e
     }
   }
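
Note: the last hunk sits inside the connection-retry loop, where the wait doubles on each ConnectException. A small sketch of the schedule it produces, assuming initialDelay is a scala.concurrent.duration.FiniteDuration (e.g. 1.second; the concrete value is ours for illustration):

    import scala.concurrent.duration._

    // Delay schedule from `initialDelay * math.pow(2, retries - 1).toLong`:
    val initialDelay = 1.second
    val delays = (1 to 3).map(retries => initialDelay * math.pow(2, retries - 1).toLong)
    // Vector(1 second, 2 seconds, 4 seconds), each followed by Thread.sleep(delay.toMillis)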
