[KYUUBI #6691] A new Spark SQL command to merge small files #6695
Closed

Commits (35)
532369d  1. involve a compact table command to merge small files (gabry-lab)
d2cbcc2  parser tests pass (gabry-lab)
1a92be0  reformat all codes (gabry-lab)
906b47e  SparkPlan resolved successfully (gabry-lab)
4e9d8ca  reformat (gabry-lab)
350326e  adding unit test to recover command (gabry-lab)
0d30887  compact table execution tests pass (gabry-lab)
a52d501  remove unnecessary comments (gabry-lab)
7b10692  recover compact table command tests pass (gabry-lab)
0ef55ff  more unit tests (gabry-lab)
f04170b  fix scala style issue (gabry-lab)
02f6303  reduce message count (gabry-lab)
cc0ecce  involve createToScalaConverter (gabry-lab)
32276b5  remove unused import (gabry-lab)
c48b167  remove unused import (gabry-lab)
b4fd2ad  remove unnecessary comment & reformat (gabry-lab)
79e4e94  involve SPECULATION_ENABLED_SYNONYM (gabry-lab)
257dfd6  involve createRandomTable (gabry-lab)
66faca4  try to catch unknown Row (gabry-lab)
fd5a3c4  use Seq instead of WrappedArray (gabry-lab)
3775c95  compile on scala-2.13 successfully (gabry-lab)
dccc23a  remove unused import (gabry-lab)
62c1044  ByteUnit.MiB.toBytes to fix default target size (gabry-lab)
42d1fc0  rename compact-table.md to docs (gabry-lab)
232407a  remove unused comments (gabry-lab)
5a34b97  support orc (gabry-lab)
c042dfa  involve toJavaList to compile on scala 2.13 (gabry-lab)
d35f7d4  add bzip2 unit tests (gabry-lab)
8b637de  spotless:apply (gabry-lab)
7fbc805  fix getCodecFromFilePath for orc (gabry-lab)
59f0b99  reformat (gabry-lab)
ab9b674  support more codec (gabry-lab)
22f0b79  remove unused util class, close opened stream in finally block (gabry-lab)
8cc2390  rollback regardless of the success or failure of the command (gabry-lab)
fd39f66  reformat (gabry-lab)
# kyuubi-extension-spark-3-3

## compact table command

It's a new Spark SQL command that compacts small files in a table into larger files, such as 128 MB. After
compaction is done, it creates a temporary view to query the compacted file details.
### syntax

#### compact table

```sparksql
compact table table_name [INTO ${targetFileSize} ${targetFileSizeUnit}] [cleanup | retain | list]
-- targetFileSizeUnit can be 'b', 'k', 'm', 'g', 't' or 'p'
-- cleanup: clean up the compact staging folders, which contain the original small files (default behavior)
-- retain: retain the compact staging folders for testing; the table can be recovered from the staging data
-- list: only report the merging result without actually compacting
```
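For illustration only, the variants above might be issued from `spark-shell` as follows; this is a sketch that assumes the compact-table extension is enabled in the session, and the table name and target size are made up:

```scala
// Illustrative invocations of the syntax above; the table name is hypothetical.
spark.sql("compact table db.small_files_table")             // default target size, staging cleaned up
spark.sql("compact table db.small_files_table into 256 m")  // target ~256 MB files
spark.sql("compact table db.small_files_table retain")      // keep staging folders for recovery
spark.sql("compact table db.small_files_table list")        // dry run: report the merge plan only
```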
#### recover table

```sparksql
recover compact table table_name
-- recover the compacted table, restoring the small files from staging to their original location
```
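A compaction run with `retain` can then be rolled back. A minimal sketch of that round trip, using the same hypothetical table name:

```scala
// Sketch: compact while retaining the staging data, then restore the original files.
spark.sql("compact table db.small_files_table retain")
// ... inspect v_merged_files; if the result is unsatisfactory, undo the compaction:
spark.sql("recover compact table db.small_files_table")
```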
### example

The following example compacts the small files in the table `default.small_files_table` into 128 MB files and creates
a temporary view `v_merged_files` to query the compacted file details.

```sparksql
set spark.sql.shuffle.partitions=32;

compact table default.small_files_table;

select * from v_merged_files;
```
...i-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/ParquetFileWriterWrapper.scala (52 additions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.sql

import java.lang.reflect.Method

import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}

/**
 * Reflectively exposes the package-private static method
 * `ParquetFileWriter.mergeInto`, which is not accessible from this package:
 *
 * Caused by: java.lang.IllegalAccessError: tried to access method
 * org.apache.parquet.hadoop.ParquetFileWriter.mergeInto(
 *   Lorg/apache/parquet/hadoop/metadata/FileMetaData;
 *   Lorg/apache/parquet/hadoop/metadata/GlobalMetaData;Z)
 *   Lorg/apache/parquet/hadoop/metadata/GlobalMetaData;
 * from class org.apache.parquet.hadoop.ParquetFileWriterWrapper$
 */
object ParquetFileWriterWrapper {

  val mergeInfoField: Method = classOf[ParquetFileWriter]
    .getDeclaredMethod(
      "mergeInto",
      classOf[FileMetaData],
      classOf[GlobalMetaData],
      classOf[Boolean])

  mergeInfoField.setAccessible(true)

  def mergeInto(
      toMerge: FileMetaData,
      mergedMetadata: GlobalMetaData,
      strict: Boolean): GlobalMetaData = {
    // Static method, so the receiver is null; box the arguments for reflection.
    mergeInfoField.invoke(
      null,
      toMerge.asInstanceOf[AnyRef],
      mergedMetadata.asInstanceOf[AnyRef],
      strict.asInstanceOf[AnyRef]).asInstanceOf[GlobalMetaData]
  }
}
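For context, a hedged sketch of how this wrapper might be used to fold per-file Parquet footers into a single `GlobalMetaData`, mirroring what `ParquetFileWriter.getGlobalMetaData` does internally; the `footers` input and the helper itself are assumptions, not part of the PR:

```scala
import org.apache.kyuubi.sql.ParquetFileWriterWrapper
import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}

// Hypothetical helper: merge the footers of the small files being compacted.
// Starting the fold from null matches parquet-mr's own accumulation.
def mergeFooters(footers: Seq[FileMetaData]): GlobalMetaData =
  footers.foldLeft(null: GlobalMetaData) { (merged, fileMeta) =>
    // strict = true fails fast when file schemas disagree
    ParquetFileWriterWrapper.mergeInto(fileMeta, merged, strict = true)
  }
```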
...-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CachePerformanceViewCommand.scala (70 additions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.sql.compact

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{Row, SparkInternalExplorer, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DropTableCommand, LeafRunnableCommand}

case class CachePerformanceViewCommand(
    tableIdentifier: Seq[String],
    performancePlan: LogicalPlan,
    originalFileLocations: Seq[String],
    options: CompactTableOption) extends LeafRunnableCommand {

  override def innerChildren: Seq[QueryPlan[_]] = Seq(performancePlan)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val dropViewCommand = DropTableCommand(
      CompactTableUtils.getTableIdentifier(tableIdentifier),
      ifExists = true,
      isView = true,
      purge = true)
    dropViewCommand.run(sparkSession)

    val speculation =
      sparkSession.sparkContext.getConf.getBoolean("spark.speculation", defaultValue = false)
    if (speculation) {
      sparkSession.sparkContext.getConf.set("spark.speculation", "false")
      log.warn("set spark.speculation to false")
    }

    val cacheTableCommand =
      SparkInternalExplorer.CacheTableAsSelectExec(tableIdentifier.head, performancePlan)

    // this result is always empty
    cacheTableCommand.run()

    if (options == CompactTableOptions.CleanupStagingFolder) {
      val fileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
      originalFileLocations.foreach { originalFileLocation =>
        val compactStagingDir = CompactTableUtils.getCompactStagingDir(originalFileLocation)
        fileSystem.delete(compactStagingDir, true)
      }
    }
    if (speculation) {
      sparkSession.sparkContext.getConf.set("spark.speculation", "true")
      log.warn("rollback spark.speculation to true")
    }
    Seq.empty[Row]
  }
}
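Commit 8cc2390 ("rollback regardless of the success or failure of the command") points at the fragility of restoring `spark.speculation` only on the success path. A sketch of the disable-then-restore pattern expressed with a try/finally; this is a hypothetical helper for illustration, not the PR's code:

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: run `body` with speculation disabled, restoring the
// original setting even if `body` throws.
def withSpeculationDisabled[T](conf: SparkConf)(body: => T): T = {
  val wasEnabled = conf.getBoolean("spark.speculation", defaultValue = false)
  if (wasEnabled) conf.set("spark.speculation", "false")
  try body
  finally if (wasEnabled) conf.set("spark.speculation", "true")
}
```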
...yuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTable.scala (83 additions)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.sql.compact

import org.apache.spark.sql.catalyst.analysis.UnresolvedUnaryNode
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafParsedStatement, LogicalPlan}
import org.apache.spark.sql.types._

object CompactTable {
  private val fileLocAndSizeStructArrayType: ArrayType =
    DataTypes.createArrayType(DataTypes.createStructType(Array(
      DataTypes.createStructField("sub_group_id", IntegerType, false),
      DataTypes.createStructField("name", StringType, false),
      DataTypes.createStructField("length", LongType, false))))

  val smallFileCollectOutput: StructType = DataTypes.createStructType(Array(
    DataTypes.createStructField("group_id", IntegerType, false),
    DataTypes.createStructField("location", StringType, false),
    DataTypes.createStructField("data_source", StringType, false),
    DataTypes.createStructField("codec", StringType, true),
    DataTypes.createStructField("smallFiles", fileLocAndSizeStructArrayType, false)))

  val smallFileCollectOutputAttribute: Seq[AttributeReference] = smallFileCollectOutput
    .map(field => AttributeReference(field.name, field.dataType, field.nullable)())

  val mergedFilesCachedTableName = "v_merged_files"
  val mergeMetadataKey = "spark.sql.compact.parquet.metadata.merge"
}

trait CompactTableOption

object CompactTableOptions {
  def apply(options: Option[String]): CompactTableOption = options.map(_.toLowerCase) match {
    case Some("retain") => RetainStagingFolder
    case Some("list") => DryRun
    case _ => CleanupStagingFolder
  }

  case object CleanupStagingFolder extends CompactTableOption

  case object RetainStagingFolder extends CompactTableOption

  case object DryRun extends CompactTableOption
}

case class CompactTable(
    child: LogicalPlan,
    targetSizeInBytes: Option[Long],
    options: CompactTableOption) extends UnresolvedUnaryNode {
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
    CompactTable(newChild, targetSizeInBytes, options)
  }
}

case class CompactTableStatement(
    tableParts: Seq[String],
    targetSizeInMB: Option[Long],
    options: CompactTableOption) extends LeafParsedStatement

case class RecoverCompactTableStatement(tableParts: Seq[String])
  extends LeafParsedStatement

case class RecoverCompactTable(child: LogicalPlan) extends UnresolvedUnaryNode {
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
    RecoverCompactTable(newChild)
  }
}
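To make the option parsing concrete, a few illustrative mappings implied by `CompactTableOptions.apply` above:

```scala
import org.apache.kyuubi.sql.compact.CompactTableOptions

// Keywords are lower-cased before matching; anything else falls back to cleanup.
assert(CompactTableOptions(Some("retain")) == CompactTableOptions.RetainStagingFolder)
assert(CompactTableOptions(Some("LIST")) == CompactTableOptions.DryRun)
assert(CompactTableOptions(None) == CompactTableOptions.CleanupStagingFolder)
```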