Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions common/utils/src/main/java/org/apache/spark/SparkThrowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,27 @@ default Map<String, String> getMessageParameters() {
return new HashMap<>();
}

/**
* Returns the default message template for this error.
*
* The template is a machine-readable string with placeholders
* to be filled by {@code getMessageParameters()}.
*
* This is the default template known to Spark, but clients are
* free to generate their own messages (e.g., translations,
* alternate formats) using the provided error metadata.
*
* @return the default message template for this error, or null if unavailable
*/
default String getDefaultMessageTemplate() {
try {
String cond = this.getCondition();
if (cond == null) return null;
return SparkThrowableHelper.getMessageTemplate(cond);
} catch (Throwable t) {
return null; // Unknown error condition
}
}

default QueryContext[] getQueryContext() { return new QueryContext[0]; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {

def getErrorMessage(errorClass: String, messageParameters: Map[String, Any]): String = {
val messageTemplate = getMessageTemplate(errorClass)
getErrorMessage(errorClass, messageTemplate, messageParameters)
}

def getErrorMessage(
errorClass: String,
messageTemplate: String,
messageParameters: Map[String, Any]): String = {
val sanitizedParameters = messageParameters.map {
case (key, null) => key -> "null"
case (key, value) => key -> value
Expand Down Expand Up @@ -71,6 +78,10 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {

def getMessageParameters(errorClass: String): Seq[String] = {
val messageTemplate = getMessageTemplate(errorClass)
getMessageParametersFromTemplate(messageTemplate)
}

def getMessageParametersFromTemplate(messageTemplate: String): Seq[String] = {
val matches = ErrorClassesJsonReader.TEMPLATE_REGEX.findAllIn(messageTemplate).toSeq
matches.map(m => m.stripSuffix(">").stripPrefix("<"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ private[spark] object SparkThrowableHelper {
// of this ticket: https://issues.apache.org/jira/browse/SPARK-47429
Seq(SparkClassUtils.getSparkClassLoader.getResource("error/error-conditions.json")))

// This method is only used to get default message template in SparkThrowable interface.
def getMessageTemplate(errorClass: String): String = {
errorReader.getMessageTemplate(errorClass)
}

def getMessage(
errorClass: String,
messageParameters: Map[String, String]): String = {
Expand All @@ -55,6 +60,26 @@ private[spark] object SparkThrowableHelper {
context: String): String = {
val displayMessage = errorReader.getErrorMessage(errorClass, messageParameters)
val sqlState = getSqlState(errorClass)
formatErrorMessage(errorClass, displayMessage, sqlState, context)
}

def getMessage(
errorClass: String,
sqlState: String,
messageTemplate: String,
messageParameters: Map[String, String]): String = {
val displayMessage = errorReader.getErrorMessage(
errorClass,
messageTemplate,
messageParameters)
formatErrorMessage(errorClass, displayMessage, sqlState, "")
}

def formatErrorMessage(
errorClass: String,
displayMessage: String,
sqlState: String,
context: String): String = {
val displaySqlState = if (sqlState == null) "" else s" SQLSTATE: $sqlState"
val displayQueryContext = (if (context.isEmpty) "" else "\n") + context
val prefix = if (errorClass.startsWith("_LEGACY_ERROR_")) "" else s"[$errorClass] "
Expand Down Expand Up @@ -106,7 +131,14 @@ private[spark] object SparkThrowableHelper {
g.writeStartObject()
g.writeStringField("errorClass", errorClass)
if (format == STANDARD) {
g.writeStringField("messageTemplate", errorReader.getMessageTemplate(errorClass))
val messageTemplate = e.getDefaultMessageTemplate
// This is required to properly handle null values when the messageTemplate
// is not available, ensuring correct JSON serialization of the field.
if (messageTemplate != null) {
g.writeStringField("messageTemplate", messageTemplate)
} else {
g.writeNullField("messageTemplate")
}
errorReader.getBreakingChangeInfo(errorClass).foreach { breakingChangeInfo =>
g.writeObjectFieldStart("breakingChangeInfo")
g.writeStringField("migrationMessage",
Expand Down
82 changes: 82 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import scala.jdk.CollectionConverters._
import scala.util.Properties.lineSeparator

import com.fasterxml.jackson.annotation.JsonInclude.Include
Expand Down Expand Up @@ -611,4 +612,85 @@ class SparkThrowableSuite extends SparkFunSuite {
)
)
}

test("getMessage uses custom getDefaultMessageTemplate from SparkThrowable") {
import ErrorMessageFormat._

// Create a custom throwable that overrides getDefaultMessageTemplate.
class CustomTemplatedThrowable extends Throwable with SparkThrowable {
override def getCondition: String = "DIVIDE_BY_ZERO"
override def getErrorClass: String = "DIVIDE_BY_ZERO"
override def getMessage: String = "Custom message"
override def getMessageParameters: java.util.Map[String, String] =
Map("config" -> "TEST_CONFIG").asJava
override def getDefaultMessageTemplate: String = "Custom template: Division by <config>"
}

val customThrowable = new CustomTemplatedThrowable

// Test STANDARD format uses the custom template.
val standardResult = SparkThrowableHelper.getMessage(customThrowable, STANDARD)
assert(standardResult.contains("Custom template: Division by <config>"))

// Test that it doesn't contain the default template from JSON.
assert(!standardResult.contains("Use `try_divide` to tolerate divisor being 0"))
}

test("getMessage falls back to JSON template when getDefaultMessageTemplate not overridden") {
import ErrorMessageFormat._

// Create a throwable that uses default getDefaultMessageTemplate implementation.
class ReadFromJSONThrowable extends Throwable with SparkThrowable {
override def getCondition: String = "DIVIDE_BY_ZERO"
override def getErrorClass: String = "DIVIDE_BY_ZERO"
override def getMessage: String = "Random message"
override def getMessageParameters: java.util.Map[String, String] =
Map("config" -> "TEST_CONFIG").asJava
}

val readFromJSONThrowable = new ReadFromJSONThrowable

// Test STANDARD format reads messageTemplate from JSON file.
val readFromJSONResult = SparkThrowableHelper.getMessage(readFromJSONThrowable, STANDARD)
assert(readFromJSONResult
.contains("\"messageTemplate\" : \"Division by zero. Use `try_divide` to tolerate divisor " +
"being 0 and return NULL instead. If necessary set <config> to \\\"false\\\" " +
"to bypass this error.\""))
}

test("getMessage writes null messageTemplate for non-existing error condition") {
import ErrorMessageFormat._

// Create a throwable with non-existing error condition.
class NonExistingConditionThrowable extends Throwable with SparkThrowable {
override def getCondition: String = "NON_EXISTING_ERROR_CONDITION"
override def getErrorClass: String = "NON_EXISTING_ERROR_CONDITION"
override def getMessage: String = "Non-existing error message"
override def getMessageParameters: java.util.Map[String, String] =
Map("param" -> "value").asJava
}

val nonExistingThrowable = new NonExistingConditionThrowable

// Test STANDARD format writes null messageTemplate when condition doesn't exist in JSON.
val standardResult = SparkThrowableHelper.getMessage(nonExistingThrowable, STANDARD)
assert(standardResult.contains("\"messageTemplate\" : null"))

// Verify it still contains the error class and other fields.
assert(standardResult.contains("\"errorClass\" : \"NON_EXISTING_ERROR_CONDITION\""))
assert(standardResult.contains("\"messageParameters\""))
}

test("getMessage with custom sqlState and messageTemplate") {
val errorClass = "TEST_CUSTOM_TEMPLATE"
val sqlState = "42S01"
val messageTemplate = "Custom error: <param1> occurred with <param2>"
val messageParameters = Map("param1" -> "something", "param2" -> "somewhere")

val result = getMessage(errorClass, sqlState, messageTemplate, messageParameters)

// Verify the message is formatted correctly.
assert(result == "[TEST_CUSTOM_TEMPLATE] Custom error: " +
"something occurred with somewhere SQLSTATE: 42S01")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,24 @@ class AnalysisException protected (
val cause: Option[Throwable] = None,
val errorClass: Option[String] = None,
val messageParameters: Map[String, String] = Map.empty,
val context: Array[QueryContext] = Array.empty)
val context: Array[QueryContext] = Array.empty,
val sqlState: Option[String] = None,
val messageTemplate: Option[String] = None)
extends Exception(message, cause.orNull)
with SparkThrowable
with Serializable
with WithOrigin {

def this(
message: String,
line: Option[Int],
startPosition: Option[Int],
cause: Option[Throwable],
errorClass: Option[String],
messageParameters: Map[String, String],
context: Array[QueryContext]) =
this(message, line, startPosition, cause, errorClass, messageParameters, context, None, None)

def this(errorClass: String, messageParameters: Map[String, String], cause: Option[Throwable]) =
this(
SparkThrowableHelper.getMessage(errorClass, messageParameters),
Expand All @@ -61,6 +73,33 @@ class AnalysisException protected (
context = context,
cause = cause)

/**
* External constructor for callers that want to supply error fields directly, without requiring
* a local JSON definition for the error class.
*
* If `message` is provided (Some), it is used verbatim. Otherwise, the message is rendered from
* (errorClass, sqlState, messageTemplate, messageParameters).
*
* `messageTemplate` is always persisted into the exception so clients can read it via
* SparkThrowable.getDefaultMessageTemplate().
*/
def this(
errorClass: String,
sqlState: String,
messageTemplate: String,
messageParameters: Map[String, String],
cause: Option[Throwable],
message: Option[String]) =
this(
message = message.getOrElse(
SparkThrowableHelper
.getMessage(errorClass, sqlState, messageTemplate, messageParameters)),
cause = cause,
errorClass = Option(errorClass),
messageParameters = messageParameters,
sqlState = Option(sqlState),
messageTemplate = Option(messageTemplate))

def this(
errorClass: String,
messageParameters: Map[String, String],
Expand Down Expand Up @@ -99,22 +138,45 @@ class AnalysisException protected (
context = origin.getQueryContext,
cause = cause)

def copy(
message: String,
line: Option[Int],
startPosition: Option[Int],
cause: Option[Throwable],
errorClass: Option[String],
messageParameters: Map[String, String],
context: Array[QueryContext]): AnalysisException =
new AnalysisException(
message,
line,
startPosition,
cause,
errorClass,
messageParameters,
context,
this.sqlState,
this.messageTemplate)

def copy(
message: String = this.message,
line: Option[Int] = this.line,
startPosition: Option[Int] = this.startPosition,
cause: Option[Throwable] = this.cause,
errorClass: Option[String] = this.errorClass,
messageParameters: Map[String, String] = this.messageParameters,
context: Array[QueryContext] = this.context): AnalysisException =
context: Array[QueryContext] = this.context,
sqlState: Option[String] = this.sqlState,
messageTemplate: Option[String] = this.messageTemplate): AnalysisException =
new AnalysisException(
message,
line,
startPosition,
cause,
errorClass,
messageParameters,
context)
context,
sqlState,
messageTemplate)

def withPosition(origin: Origin): AnalysisException = {
val newException = this.copy(
Expand All @@ -125,6 +187,11 @@ class AnalysisException protected (
newException
}

override def getDefaultMessageTemplate: String =
messageTemplate.getOrElse(super.getDefaultMessageTemplate)

override def getSqlState: String = sqlState.getOrElse(super.getSqlState)

override def getMessage: String = getSimpleMessage

// Outputs an exception without the logical plan.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,19 @@ class ExtendedAnalysisException private(
cause: Option[Throwable] = None,
errorClass: Option[String] = None,
messageParameters: Map[String, String] = Map.empty,
context: Array[QueryContext] = Array.empty)
context: Array[QueryContext] = Array.empty,
sqlState: Option[String] = None,
messageTemplate: Option[String] = None)
extends AnalysisException(
message,
line,
startPosition,
cause,
errorClass,
messageParameters,
context) {
context,
sqlState,
messageTemplate) {

def this(e: AnalysisException, plan: LogicalPlan) = {
this(
Expand All @@ -62,9 +66,11 @@ class ExtendedAnalysisException private(
cause: Option[Throwable],
errorClass: Option[String],
messageParameters: Map[String, String],
context: Array[QueryContext]): ExtendedAnalysisException = {
context: Array[QueryContext],
sqlState: Option[String] = this.sqlState,
messageTemplate: Option[String] = this.messageTemplate): ExtendedAnalysisException = {
new ExtendedAnalysisException(message, line, startPosition, plan, cause, errorClass,
messageParameters, context)
messageParameters, context, sqlState, messageTemplate)
}

override def getMessage: String = {
Expand Down