
Commit a1d55d7

cxzl25 and amuraru authored and committed
[SPARK-49386][CORE][SQL] Add memory based thresholds for shuffle spill
Original author: amuraru

### What changes were proposed in this pull request?

This PR adds memory-based thresholds for shuffle spill, introducing the following configurations:

- spark.shuffle.spill.maxRecordsSizeForSpillThreshold
- spark.sql.windowExec.buffer.spill.size.threshold
- spark.sql.sessionWindow.buffer.spill.size.threshold
- spark.sql.sortMergeJoinExec.buffer.spill.size.threshold
- spark.sql.cartesianProductExec.buffer.spill.size.threshold

### Why are the changes needed?

#24618
Currently spills can only be bounded by record count, via `spark.shuffle.spill.numElementsForceSpillThreshold`. In some scenarios a single row can be very large in memory, so a count-based threshold alone cannot bound memory usage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

GA
Verified in the production environment: task time is shortened, the number of disk spills is reduced, there is a better chance to compress the shuffle data, and the size spilled to disk is also significantly reduced.

**Current**

<img width="1281" alt="image" src="https://github.com/user-attachments/assets/b6e172b8-0da8-4b60-b456-024880d0987e">

```
24/08/19 07:02:54,947 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO ShuffleExternalSorter: Thread 126 spilling sort data of 62.0 MiB to disk (11490 times so far)
24/08/19 07:02:55,029 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO ShuffleExternalSorter: Thread 126 spilling sort data of 62.0 MiB to disk (11491 times so far)
24/08/19 07:02:55,093 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO ShuffleExternalSorter: Thread 126 spilling sort data of 62.0 MiB to disk (11492 times so far)
24/08/19 07:08:59,894 [Executor task launch worker for task 0.0 in stage 53.0 (TID 1393)] INFO Executor: Finished task 0.0 in stage 53.0 (TID 1393). 7409 bytes result sent to driver
```

**PR**

<img width="1294" alt="image" src="https://github.com/user-attachments/assets/aedb83a4-c8a1-4ac9-a805-55ba44ebfc9e">

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47856 from cxzl25/SPARK-27734.

Lead-authored-by: sychen <sychen@ctrip.com>
Co-authored-by: Adi Muraru <amuraru@adobe.com>
Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
1 parent eb5af45 commit a1d55d7
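A minimal sketch of how these thresholds could be set at submission time; the `512m`/`128m` values below are illustrative assumptions, not recommendations from this patch:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; by default the size thresholds are Long.MaxValue,
// i.e. no size-based force spill unless explicitly configured.
val conf = new SparkConf()
  // Force the shuffle sorters to spill once buffered records exceed ~512 MiB.
  .set("spark.shuffle.spill.maxRecordsSizeForSpillThreshold", "512m")
  // SQL operator buffers fall back to the core setting unless overridden.
  .set("spark.sql.windowExec.buffer.spill.size.threshold", "128m")
```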

File tree

27 files changed: +238 −46 lines changed


common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -487,6 +487,7 @@ private[spark] object LogKeys {
   case object NUM_DRIVERS extends LogKey
   case object NUM_DROPPED_PARTITIONS extends LogKey
   case object NUM_EFFECTIVE_RULE_OF_RUNS extends LogKey
+  case object NUM_ELEMENTS_SPILL_RECORDS extends LogKey
   case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
   case object NUM_EVENTS extends LogKey
   case object NUM_EXAMPLES extends LogKey
@@ -768,6 +769,8 @@ private[spark] object LogKeys {
   case object SPARK_REPO_URL extends LogKey
   case object SPARK_REVISION extends LogKey
   case object SPARK_VERSION extends LogKey
+  case object SPILL_RECORDS_SIZE extends LogKey
+  case object SPILL_RECORDS_SIZE_THRESHOLD extends LogKey
   case object SPILL_TIMES extends LogKey
   case object SQL_TEXT extends LogKey
   case object SRC_PATH extends LogKey
```

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 17 additions & 2 deletions
```diff
@@ -89,6 +89,11 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
    */
   private final int numElementsForSpillThreshold;
 
+  /**
+   * Force this sorter to spill when the size in memory is beyond this threshold.
+   */
+  private final long recordsSizeForSpillThreshold;
+
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
 
@@ -112,6 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
   @Nullable private ShuffleInMemorySorter inMemSorter;
   @Nullable private MemoryBlock currentPage = null;
   private long pageCursor = -1;
+  private long inMemRecordsSize = 0;
 
   // Checksum calculator for each partition. Empty when shuffle checksum disabled.
   private final Checksum[] partitionChecksums;
@@ -136,6 +142,8 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
       (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.numElementsForSpillThreshold =
       (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
+    this.recordsSizeForSpillThreshold =
+      (long) conf.get(package$.MODULE$.SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD());
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
       this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()));
@@ -338,6 +346,7 @@ private long freeMemory() {
     allocatedPages.clear();
     currentPage = null;
     pageCursor = 0;
+    inMemRecordsSize = 0;
     return memoryFreed;
   }
 
@@ -417,12 +426,17 @@ private void acquireNewPageIfNecessary(int required) {
   public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
     throws IOException {
 
-    // for tests
     assert(inMemSorter != null);
     if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
-      logger.info("Spilling data because number of spilledRecords crossed the threshold {}" +
+      logger.info("Spilling data because number of spilledRecords ({}) crossed the threshold {}",
+        MDC.of(LogKeys.NUM_ELEMENTS_SPILL_RECORDS$.MODULE$, inMemSorter.numRecords()),
        MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
       spill();
+    } else if (inMemRecordsSize >= recordsSizeForSpillThreshold) {
+      logger.info("Spilling data because size of spilledRecords ({}) crossed the size threshold {}",
+        MDC.of(LogKeys.SPILL_RECORDS_SIZE$.MODULE$, inMemRecordsSize),
+        MDC.of(LogKeys.SPILL_RECORDS_SIZE_THRESHOLD$.MODULE$, recordsSizeForSpillThreshold));
+      spill();
     }
 
     growPointerArrayIfNecessary();
@@ -439,6 +453,7 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
     inMemSorter.insertRecord(recordAddress, partitionId);
+    inMemRecordsSize += required;
   }
 
   /**
```
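To summarize the bookkeeping this diff adds, here is a toy, self-contained Scala model (not the actual Java sorter): bytes are accumulated per inserted record, a force spill fires on whichever threshold is crossed first, and both counters restart after a spill.

```scala
// Toy model of the sorter's new spill bookkeeping (names are illustrative).
final class SpillTracker(maxRecords: Int, maxBytes: Long) {
  private var numRecords = 0
  private var bytesInMemory = 0L

  /** Mirrors insertRecord: returns true when a force spill should happen. */
  def insert(requiredBytes: Long): Boolean = {
    numRecords += 1
    bytesInMemory += requiredBytes
    numRecords >= maxRecords || bytesInMemory >= maxBytes
  }

  /** Mirrors freeMemory(): counters restart after each spill. */
  def reset(): Unit = { numRecords = 0; bytesInMemory = 0L }
}
```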

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 22 additions & 3 deletions
```diff
@@ -80,6 +80,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
    */
   private final int numElementsForSpillThreshold;
 
+  /**
+   * Force this sorter to spill when the size in memory is beyond this threshold.
+   */
+  private final long recordsSizeForSpillThreshold;
+
   /**
    * Memory pages that hold the records being sorted. The pages in this list are freed when
    * spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -92,6 +97,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
   // These variables are reset after spilling:
   @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  private long inMemRecordsSize = 0;
 
   private MemoryBlock currentPage = null;
   private long pageCursor = -1;
@@ -110,11 +116,13 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
       int initialSize,
       long pageSizeBytes,
       int numElementsForSpillThreshold,
+      long recordsSizeForSpillThreshold,
       UnsafeInMemorySorter inMemorySorter,
       long existingMemoryConsumption) throws IOException {
     UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
       serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize,
-      pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */);
+      pageSizeBytes, numElementsForSpillThreshold, recordsSizeForSpillThreshold,
+      inMemorySorter, false /* ignored */);
     sorter.spill(Long.MAX_VALUE, sorter);
     taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption);
     sorter.totalSpillBytes += existingMemoryConsumption;
@@ -133,10 +141,11 @@ public static UnsafeExternalSorter create(
       int initialSize,
       long pageSizeBytes,
       int numElementsForSpillThreshold,
+      long recordsSizeForSpillThreshold,
       boolean canUseRadixSort) {
     return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager,
       taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes,
-      numElementsForSpillThreshold, null, canUseRadixSort);
+      numElementsForSpillThreshold, recordsSizeForSpillThreshold, null, canUseRadixSort);
   }
 
   private UnsafeExternalSorter(
@@ -149,6 +158,7 @@ private UnsafeExternalSorter(
       int initialSize,
       long pageSizeBytes,
       int numElementsForSpillThreshold,
+      long recordsSizeForSpillThreshold,
       @Nullable UnsafeInMemorySorter existingInMemorySorter,
       boolean canUseRadixSort) {
     super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
@@ -178,6 +188,7 @@ private UnsafeExternalSorter(
       this.inMemSorter = existingInMemorySorter;
     }
     this.peakMemoryUsedBytes = getMemoryUsage();
+    this.recordsSizeForSpillThreshold = recordsSizeForSpillThreshold;
     this.numElementsForSpillThreshold = numElementsForSpillThreshold;
 
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
@@ -238,6 +249,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
     inMemSorter.freeMemory();
+    inMemRecordsSize = 0;
     // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
     // records. Otherwise, if the task is over allocated memory, then without freeing the memory
     // pages, we might not be able to get memory for the pointer array.
@@ -480,9 +492,15 @@ public void insertRecord(
 
     assert(inMemSorter != null);
     if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
-      logger.info("Spilling data because number of spilledRecords crossed the threshold {}",
+      logger.info("Spilling data because number of spilledRecords ({}) crossed the threshold {}",
+        MDC.of(LogKeys.NUM_ELEMENTS_SPILL_RECORDS$.MODULE$, inMemSorter.numRecords()),
        MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold));
       spill();
+    } else if (inMemRecordsSize >= recordsSizeForSpillThreshold) {
+      logger.info("Spilling data because size of spilledRecords ({}) crossed the size threshold {}",
+        MDC.of(LogKeys.SPILL_RECORDS_SIZE$.MODULE$, inMemRecordsSize),
+        MDC.of(LogKeys.SPILL_RECORDS_SIZE_THRESHOLD$.MODULE$, recordsSizeForSpillThreshold));
+      spill();
     }
 
     final int uaoSize = UnsafeAlignedOffset.getUaoSize();
@@ -497,6 +515,7 @@ public void insertRecord(
     Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
     pageCursor += length;
     inMemSorter.insertRecord(recordAddress, prefix, prefixIsNull);
+    inMemRecordsSize += required;
   }
 
   /**
```
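Every call site of `UnsafeExternalSorter.create` picks up one extra argument. A sketch of an updated call, assuming the managers, comparators, and threshold values are already in scope (all identifiers below are the caller's, not defined here):

```scala
// Sketch only: the surrounding identifiers are assumed to be defined by the caller.
val sorter = UnsafeExternalSorter.create(
  taskMemoryManager,
  blockManager,
  serializerManager,
  taskContext,
  recordComparatorSupplier,
  prefixComparator,
  /* initialSize */ 1024,
  pageSizeBytes,
  numElementsForSpillThreshold,   // existing count-based force-spill limit
  recordsSizeForSpillThreshold,   // new size-based force-spill limit, in bytes
  canUseRadixSort)
```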

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -1596,6 +1596,18 @@ package object config {
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD =
+    ConfigBuilder("spark.shuffle.spill.maxRecordsSizeForSpillThreshold")
+      .internal()
+      .doc("The maximum size in memory before forcing the shuffle sorter to spill. " +
+        "By default it is Long.MAX_VALUE, which means we never force the sorter to spill, " +
+        "until we reach some limitations, like the max page size limitation for the pointer " +
+        "array in the sorter.")
+      .version("4.1.0")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => v > 0, "The threshold should be positive.")
+      .createWithDefault(Long.MaxValue)
+
   private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
     ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
       .internal()
```
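Since the new entry is declared with `bytesConf(ByteUnit.BYTE)`, values should parse like other Spark byte configs; a hedged sketch (the entry itself is `private[spark]`, so a plain key string is used here):

```scala
import org.apache.spark.SparkConf

// Assumption: unit suffixes ("k", "m", "g") resolve to bytes, as for other byte configs.
val conf = new SparkConf()
  .set("spark.shuffle.spill.maxRecordsSizeForSpillThreshold", "256m")
val bytes = conf.getSizeAsBytes("spark.shuffle.spill.maxRecordsSizeForSpillThreshold")
assert(bytes == 256L * 1024 * 1024)
```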

core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

Lines changed: 17 additions & 7 deletions
```diff
@@ -58,6 +58,10 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
   private[this] val numElementsForceSpillThreshold: Int =
     SparkEnv.get.conf.get(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD)
 
+  // Force this collection to spill when its size is greater than this threshold
+  private[this] val maxSizeForceSpillThreshold: Long =
+    SparkEnv.get.conf.get(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD)
+
   // Threshold for this collection's size in bytes before we start tracking its memory usage
   // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
   @volatile private[this] var myMemoryThreshold = initialMemoryThreshold
@@ -80,21 +84,25 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
    * @return true if `collection` was spilled to disk; false otherwise
    */
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
-    var shouldSpill = false
-    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
+    val shouldSpill = if (_elementsRead > numElementsForceSpillThreshold
+      || currentMemory > maxSizeForceSpillThreshold) {
+      // Check number of elements or memory usage limits, whichever is hit first
+      true
+    } else if (_elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
       val granted = acquireMemory(amountToRequest)
       myMemoryThreshold += granted
       // If we were granted too little memory to grow further (either tryToAcquire returned 0,
       // or we already had more memory than myMemoryThreshold), spill the current collection
-      shouldSpill = currentMemory >= myMemoryThreshold
+      currentMemory >= myMemoryThreshold
+    } else {
+      false
     }
-    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
     // Actually spill
     if (shouldSpill) {
       _spillCount += 1
-      logSpillage(currentMemory)
+      logSpillage(currentMemory, _elementsRead)
       spill(collection)
       _elementsRead = 0
       _memoryBytesSpilled += currentMemory
@@ -140,12 +148,14 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
    * Prints a standard log message detailing spillage.
    *
    * @param size number of bytes spilled
+   * @param elements number of elements read from input since last spill
    */
-  @inline private def logSpillage(size: Long): Unit = {
+  @inline private def logSpillage(size: Long, elements: Int): Unit = {
     val threadId = Thread.currentThread().getId
     logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
       log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
-        org.apache.spark.util.Utils.bytesToString(size))} to disk " +
+        org.apache.spark.util.Utils.bytesToString(size))} " +
+      log"(elements: ${MDC(LogKeys.NUM_ELEMENTS_SPILL_RECORDS, elements)}) to disk " +
       log"(${MDC(LogKeys.NUM_SPILLS, _spillCount)} times so far)")
   }
 }
```

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 11 additions & 4 deletions
```diff
@@ -87,9 +87,13 @@ public int compare(
   private final long pageSizeBytes = conf.getSizeAsBytes(
     package$.MODULE$.BUFFER_PAGESIZE().key(), "4m");
 
-  private final int spillThreshold =
+  private final int spillElementsThreshold =
     (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
 
+  private final long spillSizeThreshold =
+    (long) conf.get(package$.MODULE$.SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD());
+
+
   @BeforeEach
   public void setUp() throws Exception {
     MockitoAnnotations.openMocks(this).close();
@@ -163,7 +167,8 @@ private UnsafeExternalSorter newSorter() throws IOException {
       prefixComparator,
       /* initialSize */ 1024,
       pageSizeBytes,
-      spillThreshold,
+      spillElementsThreshold,
+      spillSizeThreshold,
       shouldUseRadixSort());
   }
 
@@ -453,7 +458,8 @@ public void forcedSpillingWithoutComparator() throws Exception {
       null,
       /* initialSize */ 1024,
       pageSizeBytes,
-      spillThreshold,
+      spillElementsThreshold,
+      spillSizeThreshold,
       shouldUseRadixSort());
     long[] record = new long[100];
     int recordSize = record.length * 8;
@@ -515,7 +521,8 @@ public void testPeakMemoryUsed() throws Exception {
       prefixComparator,
       1024,
       pageSizeBytes,
-      spillThreshold,
+      spillElementsThreshold,
+      spillSizeThreshold,
       shouldUseRadixSort());
 
     // Peak memory should be monotonically increasing. More specifically, every time
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 41 additions & 0 deletions
```diff
@@ -3366,6 +3366,13 @@ object SQLConf {
     .intConf
     .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.size.threshold")
+      .internal()
+      .doc("Threshold for size of rows to be spilled by window operator")
+      .version("4.1.0")
+      .fallbackConf(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD)
+
   val WINDOW_GROUP_LIMIT_THRESHOLD =
     buildConf("spark.sql.optimizer.windowGroupLimitThreshold")
       .internal()
@@ -3387,6 +3394,15 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+  val SESSION_WINDOW_BUFFER_SPILL_SIZE_THRESHOLD =
+    buildConf("spark.sql.sessionWindow.buffer.spill.size.threshold")
+      .internal()
+      .doc("Threshold for size of rows to be spilled by window operator. Note that " +
+        "the buffer is used only for the query Spark cannot apply aggregations on determining " +
+        "session window.")
+      .version("4.1.0")
+      .fallbackConf(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD)
+
   val SESSION_WINDOW_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sessionWindow.buffer.spill.threshold")
       .internal()
@@ -3430,6 +3446,13 @@ object SQLConf {
     .intConf
     .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val SORT_MERGE_JOIN_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.spill.size.threshold")
+      .internal()
+      .doc("Threshold for size of rows to be spilled by sort merge join operator")
+      .version("4.1.0")
+      .fallbackConf(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD)
+
   val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
       .internal()
@@ -3447,6 +3470,13 @@ object SQLConf {
     .intConf
     .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.spill.size.threshold")
+      .internal()
+      .doc("Threshold for size of rows to be spilled by cartesian product operator")
+      .version("4.1.0")
+      .fallbackConf(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD)
+
   val SUPPORT_QUOTED_REGEX_COLUMN_NAME = buildConf("spark.sql.parser.quotedRegexColumnNames")
     .doc("When true, quoted Identifiers (using backticks) in SELECT statement are interpreted" +
       " as regular expressions.")
@@ -6699,24 +6729,35 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def windowExecBufferSpillSizeThreshold: Long = getConf(WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD)
+
   def windowGroupLimitThreshold: Int = getConf(WINDOW_GROUP_LIMIT_THRESHOLD)
 
   def sessionWindowBufferInMemoryThreshold: Int = getConf(SESSION_WINDOW_BUFFER_IN_MEMORY_THRESHOLD)
 
   def sessionWindowBufferSpillThreshold: Int = getConf(SESSION_WINDOW_BUFFER_SPILL_THRESHOLD)
 
+  def sessionWindowBufferSpillSizeThreshold: Long =
+    getConf(SESSION_WINDOW_BUFFER_SPILL_SIZE_THRESHOLD)
+
   def sortMergeJoinExecBufferInMemoryThreshold: Int =
     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
 
   def sortMergeJoinExecBufferSpillThreshold: Int =
     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def sortMergeJoinExecBufferSpillSizeThreshold: Long =
+    getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SIZE_SPILL_THRESHOLD)
+
   def cartesianProductExecBufferInMemoryThreshold: Int =
     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
 
   def cartesianProductExecBufferSpillThreshold: Int =
     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def cartesianProductExecBufferSizeSpillThreshold: Long =
+    getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SIZE_SPILL_THRESHOLD)
+
   def codegenSplitAggregateFunc: Boolean = getConf(SQLConf.CODEGEN_SPLIT_AGGREGATE_FUNC)
 
   def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
```
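Because the SQL entries are declared with `fallbackConf`, each operator threshold inherits the core shuffle setting unless overridden. A hedged sketch of how the fallback plays out in practice (the `512m`/`128m` values are illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession

// The cluster-wide default comes from the core config at session build time...
val spark = SparkSession.builder()
  .config("spark.shuffle.spill.maxRecordsSizeForSpillThreshold", "512m")
  .getOrCreate()

// ...while the SQL operator thresholds are session configs and can be
// overridden at runtime; the operators left unset fall back to the 512m value.
spark.conf.set("spark.sql.windowExec.buffer.spill.size.threshold", "128m")
```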
