Skip to content

Commit de3d44d

Browse files
wecharyudongjoon-hyun
authored andcommitted
[SPARK-50137][SQL] Avoid fallback to Hive-incompatible ways when table creation fails by thrift exception
### What changes were proposed in this pull request? Enhance the datasource table creation, do not fallback to hive incompatible way if failure was caused by thrift exception. ### Why are the changes needed? Thrift exception is unrelated to hive compatibility of datasource table, so fallback is unnecessary in this case. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add a unit test: ```bash mvn test -Dtest=none -Dsuites="org.apache.spark.sql.hive.HiveExternalCatalogSuite SPARK-50137: Avoid fallback to Hive-incompatible ways on thrift exception" -pl :spark-hive_2.13 ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50985 from wecharyu/SPARK-50137_2. Authored-by: wecharyu <yuwq1996@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent e2f4f5b commit de3d44d

File tree

4 files changed

+47
-14
lines changed

4 files changed

+47
-14
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
417417
logInfo(message)
418418
saveTableIntoHive(table, ignoreIfExists)
419419
} catch {
420-
case NonFatal(e) =>
420+
case NonFatal(e) if !HiveUtils.causedByThrift(e) =>
421421
val warningMessage =
422422
log"Could not persist ${MDC(TABLE_NAME, table.identifier.quotedString)} in a Hive " +
423423
log"compatible way. Persisting it into Hive metastore in Spark SQL specific format."

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,4 +521,19 @@ private[spark] object HiveUtils extends Logging {
521521
case PATTERN_FOR_KEY_EQ_VAL(_, v) => FileUtils.unescapePathName(v)
522522
}
523523
}
524+
525+
/**
526+
* Determine if a Hive call exception is caused by thrift error.
527+
*/
528+
def causedByThrift(e: Throwable): Boolean = {
529+
var target = e
530+
while (target != null) {
531+
val msg = target.getMessage()
532+
if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
533+
return true
534+
}
535+
target = target.getCause()
536+
}
537+
false
538+
}
524539
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ private[hive] class HiveClientImpl(
236236
try {
237237
return f
238238
} catch {
239-
case e: Exception if causedByThrift(e) =>
239+
case e: Exception if HiveUtils.causedByThrift(e) =>
240240
caughtException = e
241241
logWarning(
242242
log"HiveClient got thrift exception, destroying client and retrying " +
@@ -251,18 +251,6 @@ private[hive] class HiveClientImpl(
251251
throw caughtException
252252
}
253253

254-
private def causedByThrift(e: Throwable): Boolean = {
255-
var target = e
256-
while (target != null) {
257-
val msg = target.getMessage()
258-
if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
259-
return true
260-
}
261-
target = target.getCause()
262-
}
263-
false
264-
}
265-
266254
private def client: Hive = {
267255
if (clientLoader.cachedHive != null) {
268256
clientLoader.cachedHive.asInstanceOf[Hive]

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.hive
1919

2020
import org.apache.hadoop.conf.Configuration
21+
import org.apache.logging.log4j.Level
2122

2223
import org.apache.spark.SparkConf
2324
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -241,4 +242,33 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
241242
val alteredTable = externalCatalog.getTable("db1", tableName)
242243
assert(DataTypeUtils.sameType(alteredTable.schema, newSchema))
243244
}
245+
246+
test("SPARK-50137: Avoid fallback to Hive-incompatible ways on thrift exception") {
247+
val hadoopConf = new Configuration()
248+
// Use an unavailable uri to mock client connection timeout.
249+
hadoopConf.set("hive.metastore.uris", "thrift://192.0.2.1:9083")
250+
hadoopConf.set("hive.metastore.client.connection.timeout", "1s")
251+
// Dummy HiveExternalCatalog to mock that the hive client is still available
252+
// when checking database and table.
253+
val catalog = new HiveExternalCatalog(new SparkConf, hadoopConf) {
254+
override def requireDbExists(db: String): Unit = {}
255+
override def tableExists(db: String, table: String): Boolean = false
256+
}
257+
val logAppender = new LogAppender()
258+
withLogAppender(logAppender, level = Some(Level.WARN)) {
259+
val table = CatalogTable(
260+
identifier = TableIdentifier("tbl", Some("default")),
261+
tableType = CatalogTableType.EXTERNAL,
262+
storage = storageFormat.copy(locationUri = Some(newUriForDatabase())),
263+
schema = new StructType()
264+
.add("col1", "string"),
265+
provider = Some("parquet"))
266+
intercept[Throwable] {
267+
catalog.createTable(table, ignoreIfExists = false)
268+
}
269+
}
270+
assert(!logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
271+
"Could not persist `default`.`tbl` in a Hive compatible way. " +
272+
"Persisting it into Hive metastore in Spark SQL specific format."))
273+
}
244274
}

0 commit comments

Comments
 (0)