
Commit 92d8984

dongjoon-hyun authored and yhuang-db committed
[SPARK-47618][CORE] Use Magic Committer for all S3 buckets by default
### What changes were proposed in this pull request?

This PR aims to use the Apache Hadoop `Magic Committer` for all S3 buckets by default in Apache Spark 4.1.0.

### Why are the changes needed?

The Apache Hadoop `Magic Committer` has been used for S3 buckets to get the best performance since [S3 became fully consistent on December 1st, 2020](https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/).

- https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel

> Amazon S3 provides strong read-after-write consistency for PUT and DELETE requests of objects in your Amazon S3 bucket in all AWS Regions. This behavior applies to both writes to new objects as well as PUT requests that overwrite existing objects and DELETE requests. In addition, read operations on Amazon S3 Select, Amazon S3 access control lists (ACLs), Amazon S3 Object Tags, and object metadata (for example, the HEAD object) are strongly consistent.

### Does this PR introduce _any_ user-facing change?

Yes, the migration guide is updated.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51010 from dongjoon-hyun/SPARK-47618-2.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
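For readers skimming the change, a minimal way to observe the new default — a hedged sketch, assuming a Spark 4.1 build with the hadoop-cloud module on the classpath (the app name is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// No S3A committer settings supplied by the user; Spark 4.1 fills them in at startup.
val sc = new SparkContext(new SparkConf().setAppName("magic-default").setMaster("local"))

// Expected to print Some(true) when the hadoop-cloud classes are loadable.
println(sc.getConf.getOption("spark.hadoop.fs.s3a.committer.magic.enabled"))
sc.stop()
```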
1 parent: 0757f2a

3 files changed: +8 −25 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 10 deletions

```diff
@@ -428,7 +428,7 @@ class SparkContext(config: SparkConf) extends Logging {
     conf.setIfMissing("spark.hadoop.fs.s3a.vectored.read.min.seek.size", "128K")
     conf.setIfMissing("spark.hadoop.fs.s3a.vectored.read.max.merged.size", "2M")
     // This should be set as early as possible.
-    SparkContext.fillMissingMagicCommitterConfsIfNeeded(_conf)
+    SparkContext.enableMagicCommitterIfNeeded(_conf)
 
     SparkContext.supplementJavaModuleOptions(_conf)
     SparkContext.supplementJavaIPv6Options(_conf)
@@ -3389,16 +3389,10 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This is a helper function to complete the missing S3A magic committer configurations
-   * based on a single conf: `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled`
+   * Enable Magic Committer by default for all S3 buckets if hadoop-cloud module exists.
    */
-  private def fillMissingMagicCommitterConfsIfNeeded(conf: SparkConf): Unit = {
-    val magicCommitterConfs = conf
-      .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
-      .filter(_._1.endsWith(".committer.magic.enabled"))
-      .filter(_._2.equalsIgnoreCase("true"))
-    if (magicCommitterConfs.nonEmpty &&
-      Utils.classIsLoadable("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter") &&
+  private def enableMagicCommitterIfNeeded(conf: SparkConf): Unit = {
+    if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter") &&
       Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
       // Try to enable S3 magic committer if missing
       conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
```

docs/core-migration-guide.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -26,6 +26,7 @@ license: |
 
 - Since Spark 4.1, Spark Master deamon provides REST API by default. To restore the behavior before Spark 4.1, you can set `spark.master.rest.enabled` to `false`.
 - Since Spark 4.1, Spark will compress RDD checkpoints by default. To restore the behavior before Spark 4.1, you can set `spark.checkpoint.compress` to `false`.
+- Since Spark 4.1, Spark uses Apache Hadoop Magic Committer for all S3 buckets by default. To restore the behavior before Spark 4.0, you can set `spark.hadoop.fs.s3a.committer.magic.enabled=false`.
 
 ## Upgrading from Core 3.5 to 4.0
 
```
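As a usage note on the migration-guide entry above, a hedged sketch of the opt-out path (the app name is illustrative; the key must be in place before the `SparkContext` is created, since the default is filled in at startup):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Restore the pre-4.1 behavior by disabling the magic committer explicitly.
val conf = new SparkConf()
  .setAppName("legacy-s3a-committer")  // illustrative name
  .setMaster("local")
  .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
val sc = new SparkContext(conf)
```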

hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 3 additions & 15 deletions

```diff
@@ -34,7 +34,7 @@ class SparkContextSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
   }
 
-  test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+  test("SPARK-47618: Use Magic Committer for all S3 buckets by default") {
     Seq(
       "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
       "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
@@ -44,16 +44,6 @@ class SparkContextSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
     sc = new SparkContext(c1)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-    sc.stop()
-
-    val c2 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "false")
-    sc = new SparkContext(c2)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-    sc.stop()
-
-    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
-    sc = new SparkContext(c3)
     Seq(
       "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
       "spark.hadoop.fs.s3a.committer.name" -> "magic",
@@ -69,10 +59,8 @@ class SparkContextSuite extends SparkFunSuite with BeforeAndAfterEach {
     sc.stop()
 
     // Respect a user configuration
-    val c4 = c1.clone
-      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
-      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
-    sc = new SparkContext(c4)
+    val c2 = c1.clone.set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
+    sc = new SparkContext(c2)
     Seq(
       "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
       "spark.hadoop.fs.s3a.committer.name" -> null,
```
