chore: Comet CI with Iceberg 1.9.1 #1950

Open
wants to merge 4 commits into main
2 changes: 1 addition & 1 deletion .github/actions/setup-spark-local-jar/action.yaml
@@ -44,5 +44,5 @@ runs:
 cd apache-spark
 git apply ../dev/diffs/${{inputs.spark-version}}.diff
 ./dev/change-scala-version.sh ${{inputs.scala-version}}
-./build/mvn versions:set -DnewVersion=${{inputs.spark-version}}-SNAPSHOT
+./build/mvn versions:set -DnewVersion=${{inputs.spark-version}}-SNAPSHOT -DgenerateBackupPoms=false
 ./build/mvn -Pscala-${{inputs.scala-version}} -Phive -Phive-thriftserver -DskipTests -Denforcer.skip=true clean install
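The added -DgenerateBackupPoms=false flag stops the versions plugin from writing pom.xml.versionsBackup files when it rewrites the version, presumably to keep the patched Spark checkout clean for the build that follows. As a rough expansion of this step, assuming the Spark 3.5.6 / Scala 2.13 matrix entry used below (the real values come from the action inputs), the commands resolve to roughly:

cd apache-spark
git apply ../dev/diffs/3.5.6.diff
./dev/change-scala-version.sh 2.13
./build/mvn versions:set -DnewVersion=3.5.6-SNAPSHOT -DgenerateBackupPoms=false
./build/mvn -Pscala-2.13 -Phive -Phive-thriftserver -DskipTests -Denforcer.skip=true clean install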
2 changes: 1 addition & 1 deletion .github/workflows/iceberg_spark_test.yml
@@ -45,7 +45,7 @@ jobs:
 matrix:
 os: [ubuntu-24.04]
 java-version: [11, 17]
-iceberg-version: [{short: '1.8', full: '1.8.1'}]
+iceberg-version: [{short: '1.8', full: '1.8.1'}, {short: '1.9', full: '1.9.1'}]
 spark-version: [{short: '3.5', full: '3.5.6'}]
 scala-version: ['2.13']
 fail-fast: false
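With the 1.9 entry added, the Iceberg dimension now has two values, so the matrix expands to 2 Java versions (11, 17) × 2 Iceberg versions (1.8.1, 1.9.1) × 1 Spark version (3.5.6) × 1 Scala version (2.13) = 4 jobs per run; fail-fast: false lets the remaining combinations keep running if one of them fails.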
296 changes: 296 additions & 0 deletions dev/diffs/iceberg/1.9.1.diff
@@ -0,0 +1,296 @@
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index c50991c..1d6afac 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -80,7 +80,7 @@ scala-collection-compat = "2.13.0"
slf4j = "2.0.17"
snowflake-jdbc = "3.23.2"
spark34 = "3.4.4"
-spark35 = "3.5.5"
+spark35 = "3.5.6-SNAPSHOT"
sqlite-jdbc = "3.49.1.0"
testcontainers = "1.20.6"
tez08 = { strictly = "0.8.4"} # see rich version usage explanation above
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 2cd8666..c1893b1 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}

- compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"

implementation libs.parquet.column
implementation libs.parquet.hadoop
@@ -186,7 +186,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
testImplementation libs.junit.vintage.engine
- testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
@@ -267,6 +267,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+ integrationImplementation project(path: ':iceberg-parquet')
+ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"

// runtime dependencies for running Hive Catalog based integration test
integrationRuntimeOnly project(':iceberg-hive-metastore')
@@ -304,8 +306,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
- relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
- relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 0ca1236..87daef4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -29,7 +29,7 @@ public class SparkSQLProperties {

// Controls which Parquet reader implementation to use
public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
- public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG;
+ public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;

// Controls whether reading/writing timestamps without timezones is allowed
@Deprecated
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 16159dc..f6c6c95 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -19,11 +19,11 @@
package org.apache.iceberg.spark.data.vectorized;

import java.io.IOException;
+import org.apache.comet.CometSchemaImporter;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.TypeUtil;
import org.apache.comet.parquet.Utils;
-import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -92,7 +92,7 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
}

this.importer = new CometSchemaImporter(new RootAllocator());
- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+ this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
this.initialized = true;
}

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index a361a7f..b21859b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
@@ -63,7 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
- implements SupportsRuntimeV2Filtering {
+ implements SupportsRuntimeV2Filtering, SupportsComet {

private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);

@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
runtimeFilterExpressions,
caseSensitive());
}
+
+ @Override
+ public boolean isCometEnabled() {
+ return true;
+ }
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
index 47a0e87..531b7ce 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;

public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
@@ -214,7 +215,7 @@ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
Assert.assertEquals(4, fields.size());
}

- @Test
+ @Ignore
public void testMergeSchemaIgnoreCastingLongToInt() throws Exception {
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')",
@@ -254,7 +255,7 @@ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog {
assertThat(idField.type().typeId()).isEqualTo(Type.TypeID.LONG);
}

- @Test
+ @Ignore
public void testMergeSchemaIgnoreCastingDoubleToFloat() throws Exception {
removeTables();
sql("CREATE TABLE %s (id double, data string) USING iceberg", tableName);
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 572c32f..88ef87d 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}

- compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"

implementation libs.parquet.column
implementation libs.parquet.hadoop
@@ -184,7 +184,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
- testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"
+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
@@ -265,6 +265,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.9.0-SNAPSHOT"

// runtime dependencies for running Hive Catalog based integration test
integrationRuntimeOnly project(':iceberg-hive-metastore')
@@ -302,8 +303,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
- relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
- relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index d6c16bb..123a300 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -29,7 +29,7 @@ public class SparkSQLProperties {

// Controls which Parquet reader implementation to use
public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
- public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG;
+ public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;
// Controls whether to perform the nullability check during writes
public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
public static final boolean CHECK_NULLABILITY_DEFAULT = true;
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
index 16159dc..f6c6c95 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
@@ -19,11 +19,11 @@
package org.apache.iceberg.spark.data.vectorized;

import java.io.IOException;
+import org.apache.comet.CometSchemaImporter;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.TypeUtil;
import org.apache.comet.parquet.Utils;
-import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -92,7 +92,7 @@ class CometColumnReader implements VectorizedReader<ColumnVector> {
}

this.importer = new CometSchemaImporter(new RootAllocator());
- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
+ this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false);
this.initialized = true;
}

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index a361a7f..9021cd5 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
@@ -63,7 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
- implements SupportsRuntimeV2Filtering {
+ implements SupportsRuntimeV2Filtering, SupportsComet {

private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);

@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
runtimeFilterExpressions,
caseSensitive());
}
+
+ @Override
+ public boolean isCometEnabled() {
+ return true;
+ }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
index 7404b18..6ce9485 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;

public class TestDataFrameWriterV2 extends TestBaseWithCatalog {
@@ -248,7 +249,7 @@ public class TestDataFrameWriterV2 extends TestBaseWithCatalog {
sql("select * from %s order by id", tableName));
}

- @TestTemplate
+ @Disabled
public void testMergeSchemaIgnoreCastingLongToInt() throws Exception {
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')",
@@ -288,7 +289,7 @@ public class TestDataFrameWriterV2 extends TestBaseWithCatalog {
assertThat(idField.type().typeId()).isEqualTo(Type.TypeID.LONG);
}

- @TestTemplate
+ @Disabled
public void testMergeSchemaIgnoreCastingDoubleToFloat() throws Exception {
removeTables();
sql("CREATE TABLE %s (id double, data string) USING iceberg", tableName);