Commit 2460893

yaooqinn authored and dongjoon-hyun committed
[SPARK-52704][SQL][FOLLOWUP] Move session state utilities from trait FileFormat to SessionStateHelper
### What changes were proposed in this pull request?

Move session state utilities from the trait FileFormat to SessionStateHelper.

### Why are the changes needed?

To address the comments from #51398 (review).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51411 from yaooqinn/SPARK-52704-F.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent c6e8645 · commit 2460893

6 files changed (+63, −27 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 3 additions & 17 deletions
```diff
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 
@@ -235,20 +235,6 @@ trait FileFormat {
    */
   def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] =
     FileFormat.BASE_METADATA_EXTRACTORS
-
-  protected def sessionState(sparkSession: SparkSession): SessionState = {
-    sparkSession.sessionState
-  }
-
-  protected def sqlConf(sparkSession: SparkSession): SQLConf = {
-    sessionState(sparkSession).conf
-  }
-
-  protected def hadoopConf(
-      sparkSession: SparkSession,
-      options: Map[String, String]): Configuration = {
-    sessionState(sparkSession).newHadoopConfWithOptions(options)
-  }
 }
 
 object FileFormat {
@@ -370,15 +356,15 @@ object FileFormat {
 /**
  * The base class file format that is based on text file.
  */
-abstract class TextBasedFileFormat extends FileFormat {
+abstract class TextBasedFileFormat extends FileFormat with SessionStateHelper {
   private var codecFactory: CompressionCodecFactory = _
 
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
       path: Path): Boolean = {
     if (codecFactory == null) {
-      codecFactory = new CompressionCodecFactory(hadoopConf(sparkSession, options))
+      codecFactory = new CompressionCodecFactory(getHadoopConf(sparkSession, options))
     }
     val codec = codecFactory.getCodec(path)
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
```
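
With the protected helpers removed from `FileFormat`, the same utilities now arrive through the `SessionStateHelper` mixin that `TextBasedFileFormat` extends. A minimal sketch of how a subclass picks them up (the class and method below are hypothetical, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.TextBasedFileFormat

// Hypothetical format: TextBasedFileFormat now mixes in SessionStateHelper,
// so getSqlConf and getHadoopConf are inherited without any extra wiring.
abstract class MyLineFormat extends TextBasedFileFormat {
  // Reads a session-scoped setting the same way the CSV/JSON/XML formats do.
  protected def sessionTimeZone(sparkSession: SparkSession): String =
    getSqlConf(sparkSession).sessionLocalTimeZone
}
```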

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -168,7 +168,7 @@ case class CSVFileFormat() extends TextBasedFileFormat with DataSourceRegister {
   private def getCsvOptions(
       sparkSession: SparkSession,
       options: Map[String, String]): CSVOptions = {
-    val conf = sqlConf(sparkSession)
+    val conf = getSqlConf(sparkSession)
     new CSVOptions(
       options,
       conf.csvColumnPruning,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -136,7 +136,7 @@ case class JsonFileFormat() extends TextBasedFileFormat with DataSourceRegister
       spark: SparkSession,
       options: Map[String, String],
       inRead: Boolean = true): JSONOptions = {
-    val conf = sqlConf(spark)
+    val conf = getSqlConf(spark)
     if (inRead) {
       new JSONOptionsInRead(options, conf.sessionLocalTimeZone, conf.columnNameOfCorruptRecord)
     } else {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

Lines changed: 4 additions & 7 deletions
```diff
@@ -30,11 +30,11 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, Expr
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
-import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf}
 import org.apache.spark.sql.internal.connector.SupportsMetadata
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -113,10 +113,7 @@ trait FileScan extends Scan
 
   override def hashCode(): Int = getClass.hashCode()
 
-  override def conf: SQLConf = {
-    val sessionState: SessionState = sparkSession.sessionState
-    sessionState.conf
-  }
+  override def conf: SQLConf = SessionStateHelper.getSqlConf(sparkSession)
 
   val maxMetadataValueLength = conf.maxMetadataStringLength
 
@@ -177,7 +174,7 @@ trait FileScan extends Scan
     if (splitFiles.length == 1) {
       val path = splitFiles(0).toPath
       if (!isSplitable(path) && splitFiles(0).length >
-        sparkSession.sparkContext.conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        SessionStateHelper.getSparkConf(sparkSession).get(IO_WARNING_LARGEFILETHRESHOLD)) {
         logWarning(log"Loading one large unsplittable file ${MDC(PATH, path.toString)} with only " +
           log"one partition, the reason is: ${MDC(REASON, getFileUnSplittableReason(path))}")
       }
```
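
Call sites that do not extend the trait can go through the companion `object SessionStateHelper extends SessionStateHelper`, which is exactly what the `conf` override above does. A small hypothetical sketch of that style:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionStateHelper

// Hypothetical helper class: uses the companion object instead of the mixin.
class ScanDiagnostics(spark: SparkSession) {
  // Same SQLConf value FileScan reads for maxMetadataValueLength.
  def maxMetadataLength: Int =
    SessionStateHelper.getSqlConf(spark).maxMetadataStringLength

  // getSparkConf exposes the SparkContext-level conf, the one consulted
  // for IO_WARNING_LARGEFILETHRESHOLD above.
  def appName: String =
    SessionStateHelper.getSparkConf(spark).get("spark.app.name")
}
```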

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -42,7 +42,7 @@ case class XmlFileFormat() extends TextBasedFileFormat with DataSourceRegister {
   private def getXmlOptions(
       sparkSession: SparkSession,
       parameters: Map[String, String]): XmlOptions = {
-    val conf = sqlConf(sparkSession)
+    val conf = getSqlConf(sparkSession)
     new XmlOptions(parameters, conf.sessionLocalTimeZone, conf.columnNameOfCorruptRecord, true)
   }
```
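
The CSV, JSON, and XML changes are the same mechanical rename from the old protected `sqlConf` to the inherited `getSqlConf`; behavior is unchanged, so session-scoped settings still flow into the option builders. A usage sketch under assumed settings and an assumed input path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// sessionLocalTimeZone, read via getSqlConf in the builders above, honors this.
spark.conf.set("spark.sql.session.timeZone", "UTC")

// Hypothetical path; CSV, JSON, and XML reads all pick up the session conf.
val df = spark.read.option("multiLine", "true").json("/tmp/events.json")
```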

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionStateHelper.scala

Lines changed: 53 additions & 0 deletions
```diff
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Helper trait to access session state related configurations and utilities.
+ * It also provides type annotations for IDEs to build indexes.
+ */
+trait SessionStateHelper {
+  private def sessionState(sparkSession: SparkSession): SessionState = {
+    sparkSession.sessionState
+  }
+
+  private def sparkContext(sparkSession: SparkSession): SparkContext = {
+    sparkSession.sparkContext
+  }
+
+  def getSparkConf(sparkSession: SparkSession): SparkConf = {
+    sparkContext(sparkSession).conf
+  }
+
+  def getSqlConf(sparkSession: SparkSession): SQLConf = {
+    sessionState(sparkSession).conf
+  }
+
+  def getHadoopConf(
+      sparkSession: SparkSession,
+      options: Map[String, String]): Configuration = {
+    sessionState(sparkSession).newHadoopConfWithOptions(options)
+  }
+}
+
+object SessionStateHelper extends SessionStateHelper
```
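
A brief usage sketch of the new helper (the local master and the `fs.defaultFS` override are illustrative assumptions): `getHadoopConf` builds the session's Hadoop configuration with the caller's options applied on top, via `newHadoopConfWithOptions`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SessionStateHelper

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Caller-supplied options are layered over the session's base Hadoop conf.
val hadoopConf =
  SessionStateHelper.getHadoopConf(spark, Map("fs.defaultFS" -> "file:///"))
assert(hadoopConf.get("fs.defaultFS") == "file:///")

// The SQL and Spark confs are reachable the same way.
val sqlConf = SessionStateHelper.getSqlConf(spark)
val sparkConf = SessionStateHelper.getSparkConf(spark)
```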
