diff --git a/build.gradle b/build.gradle index fd636b5..2e2300b 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ plugins { id "scala" id "maven-publish" id "com.diffplug.gradle.spotless" version "3.27.1" - id "org.embulk.embulk-plugins" version "0.4.1" + id "org.embulk.embulk-plugins" version "0.7.0" } repositories { @@ -10,48 +10,47 @@ repositories { jcenter() } -group = "pro.civitaspo" -version = "0.5.3" -description = "Dumps records to S3 Parquet." +version = "0.6.0" -sourceCompatibility = 1.8 -targetCompatibility = 1.8 +sourceCompatibility = 17 +targetCompatibility = 17 dependencies { - compileOnly "org.embulk:embulk-core:0.9.23" + compileOnly "org.embulk:embulk-core:0.11.5" + compileOnly 'org.msgpack:msgpack-core:0.8.16' + compileOnly "org.embulk:embulk-spi:0.11" + compileOnly "org.embulk:embulk-api:0.10.43" // NOTE: Is shadow plugin required in the future? - compile "org.scala-lang:scala-library:2.13.1" - + implementation "org.scala-lang:scala-library:2.13.1" + implementation 'com.google.guava:guava:30.1.1-jre' ['glue', 's3', 'sts'].each { v -> - compile("com.amazonaws:aws-java-sdk-${v}:1.11.769") { + implementation("com.amazonaws:aws-java-sdk-${v}:1.11.769") { exclude group: 'joda-time', module: 'joda-time' - exclude group: 'com.fasterxml.jackson.core', module: 'jackson-annotations' - exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' - exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' } } + // Add Jackson dependencies explicitly + implementation 'com.fasterxml.jackson.core:jackson-core:2.14.0' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.0' + implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.0' ['column', 'common', 'encoding', 'hadoop', 'jackson'].each { v -> - compile("org.apache.parquet:parquet-${v}:1.11.0") { + implementation("org.apache.parquet:parquet-${v}:1.11.0") { exclude group: 'org.slf4j', module: 'slf4j-api' } } // ref. https://github.com/apache/parquet-mr/blob/apache-parquet-1.11.0/pom.xml#L85 - compile('org.apache.parquet:parquet-format:2.7.0') { + implementation('org.apache.parquet:parquet-format:2.7.0') { exclude group: 'org.slf4j', module: 'slf4j-api' } - compile('org.apache.hadoop:hadoop-common:2.10.2') { + implementation('org.apache.hadoop:hadoop-common:2.10.2') { exclude group: 'org.slf4j', module: 'slf4j-api' exclude group: 'com.google.guava', module: 'guava' exclude group: 'commons-beanutils', module: 'commons-beanutils-core' exclude group: 'org.apache.commons', module: 'commons-lang3' exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' } - compile 'org.xerial.snappy:snappy-java:1.1.7.3' + implementation 'org.xerial.snappy:snappy-java:1.1.7.3' - ['core', 'standards', 'deps-buffer', 'deps-config'].each { v -> - testImplementation "org.embulk:embulk-${v}:0.9.23" - } - testImplementation "org.embulk:embulk-core:0.9.23:tests" + testImplementation "org.embulk:embulk-core:0.11.5" testImplementation "org.scalatest:scalatest_2.13:3.1.1" testImplementation 'org.apache.parquet:parquet-avro:1.11.0' testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-avro:2.14.0' @@ -101,3 +100,25 @@ task scalatest(dependsOn: ['testClasses'], type: JavaExec) { args = ['-R', 'build/classes/scala/test', '-o'] classpath = sourceSets.test.runtimeClasspath } + +tasks.withType(JavaExec) { + jvmArgs += [ + '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens', 'java.base/java.io=ALL-UNNAMED', + '--add-opens', 'java.base/java.nio=ALL-UNNAMED', + '--add-opens', 'java.base/java.util=ALL-UNNAMED', + '--add-opens', 'java.base/java.lang=ALL-UNNAMED', + '--add-opens', 'java.base/java.lang.reflect=ALL-UNNAMED' + ] +} + +tasks.withType(Test) { + jvmArgs += [ + '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens', 'java.base/java.io=ALL-UNNAMED', + '--add-opens', 'java.base/java.nio=ALL-UNNAMED', + '--add-opens', 'java.base/java.util=ALL-UNNAMED', + '--add-opens', 'java.base/java.lang=ALL-UNNAMED', + '--add-opens', 'java.base/java.lang.reflect=ALL-UNNAMED' + ] +} diff --git a/gradle/dependency-locks/embulkPluginRuntime.lockfile b/gradle/dependency-locks/embulkPluginRuntime.lockfile deleted file mode 100644 index ca7a748..0000000 --- a/gradle/dependency-locks/embulkPluginRuntime.lockfile +++ /dev/null @@ -1,91 +0,0 @@ -# This is a Gradle generated file for dependency locking. -# Manual edits can break the build and are not advised. -# This file is expected to be part of source control. -asm:asm:3.1 -ch.qos.reload4j:reload4j:1.2.19 -com.amazonaws:aws-java-sdk-core:1.11.769 -com.amazonaws:aws-java-sdk-glue:1.11.769 -com.amazonaws:aws-java-sdk-kms:1.11.769 -com.amazonaws:aws-java-sdk-s3:1.11.769 -com.amazonaws:aws-java-sdk-sts:1.11.769 -com.amazonaws:jmespath-java:1.11.769 -com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.6.7 -com.fasterxml.woodstox:woodstox-core:5.3.0 -com.github.spotbugs:spotbugs-annotations:3.1.9 -com.github.stephenc.jcip:jcip-annotations:1.0-1 -com.google.code.findbugs:jsr305:3.0.2 -com.google.code.gson:gson:2.2.4 -com.google.protobuf:protobuf-java:2.5.0 -com.jamesmurty.utils:java-xmlbuilder:0.4 -com.jcraft:jsch:0.1.55 -com.nimbusds:nimbus-jose-jwt:7.9 -com.sun.jersey:jersey-core:1.9 -com.sun.jersey:jersey-json:1.9 -com.sun.jersey:jersey-server:1.9 -com.sun.xml.bind:jaxb-impl:2.2.3-1 -com.thoughtworks.paranamer:paranamer:2.3 -commons-beanutils:commons-beanutils:1.9.4 -commons-cli:commons-cli:1.2 -commons-codec:commons-codec:1.11 -commons-collections:commons-collections:3.2.2 -commons-configuration:commons-configuration:1.6 -commons-digester:commons-digester:1.8 -commons-io:commons-io:2.5 -commons-lang:commons-lang:2.6 -commons-logging:commons-logging:1.2 -commons-net:commons-net:3.1 -commons-pool:commons-pool:1.6 -io.netty:netty:3.10.6.Final -javax.activation:activation:1.1 -javax.annotation:javax.annotation-api:1.3.2 -javax.servlet.jsp:jsp-api:2.1 -javax.servlet:servlet-api:2.5 -javax.xml.bind:jaxb-api:2.2.2 -javax.xml.stream:stax-api:1.0-2 -jline:jline:0.9.94 -log4j:log4j:1.2.17 -net.java.dev.jets3t:jets3t:0.9.0 -net.minidev:accessors-smart:1.2 -net.minidev:json-smart:2.3 -org.apache.avro:avro:1.7.7 -org.apache.commons:commons-compress:1.21 -org.apache.commons:commons-math3:3.1.1 -org.apache.curator:curator-client:2.13.0 -org.apache.curator:curator-framework:2.13.0 -org.apache.curator:curator-recipes:2.13.0 -org.apache.directory.api:api-asn1-api:1.0.0-M20 -org.apache.directory.api:api-util:1.0.0-M20 -org.apache.directory.server:apacheds-i18n:2.0.0-M15 -org.apache.directory.server:apacheds-kerberos-codec:2.0.0-M15 -org.apache.hadoop:hadoop-annotations:2.10.2 -org.apache.hadoop:hadoop-auth:2.10.2 -org.apache.hadoop:hadoop-common:2.10.2 -org.apache.htrace:htrace-core4:4.1.0-incubating -org.apache.httpcomponents:httpclient:4.5.13 -org.apache.httpcomponents:httpcore:4.4.13 -org.apache.parquet:parquet-column:1.11.0 -org.apache.parquet:parquet-common:1.11.0 -org.apache.parquet:parquet-encoding:1.11.0 -org.apache.parquet:parquet-format-structures:1.11.0 -org.apache.parquet:parquet-format:2.7.0 -org.apache.parquet:parquet-hadoop:1.11.0 -org.apache.parquet:parquet-jackson:1.11.0 -org.apache.yetus:audience-annotations:0.11.0 -org.apache.zookeeper:zookeeper:3.4.14 -org.codehaus.jackson:jackson-core-asl:1.9.13 -org.codehaus.jackson:jackson-jaxrs:1.8.3 -org.codehaus.jackson:jackson-mapper-asl:1.9.13 -org.codehaus.jackson:jackson-xc:1.8.3 -org.codehaus.jettison:jettison:1.1 -org.codehaus.woodstox:stax2-api:4.2.1 -org.mortbay.jetty:jetty-sslengine:6.1.26 -org.mortbay.jetty:jetty-util:6.1.26 -org.mortbay.jetty:jetty:6.1.26 -org.mortbay.jetty:servlet-api:2.5-20081211 -org.ow2.asm:asm:5.0.4 -org.scala-lang:scala-library:2.13.1 -org.slf4j:slf4j-log4j12:1.7.25 -org.slf4j:slf4j-reload4j:1.7.36 -org.xerial.snappy:snappy-java:1.1.7.3 -software.amazon.ion:ion-java:1.0.2 -xmlenc:xmlenc:0.52 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a4b4429..070cb70 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetColumnType.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetColumnType.scala index e89c06b..5bacdaa 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetColumnType.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetColumnType.scala @@ -23,7 +23,6 @@ import org.embulk.output.s3_parquet.catalog.GlueDataType import org.embulk.output.s3_parquet.implicits import org.embulk.spi.{Column, DataException, Exec} import org.embulk.spi.time.{Timestamp, TimestampFormatter} -import org.embulk.spi.time.TimestampFormatter.TimestampColumnOption import org.msgpack.value.Value import org.slf4j.{Logger, LoggerFactory} @@ -37,7 +36,7 @@ object ParquetColumnType { private val logger: Logger = LoggerFactory.getLogger(classOf[ParquetColumnType]) - trait Task extends EmbulkTask with TimestampColumnOption { + trait Task extends EmbulkTask { @Config("logical_type") @ConfigDefault("null") def getLogicalType: Optional[LogicalTypeOption] @@ -242,7 +241,7 @@ object ParquetColumnType { isSigned = o.getIsSigned, isAdjustedToUtc = o.getIsAdjustedToUtc, timeUnit = o.getTimeUnit, - timeZone = task.getTimeZoneId.map(ZoneId.of) + timeZone = Some(ZoneId.of("UTC")) ) } } diff --git a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala index a666320..b48cc20 100644 --- a/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala +++ b/src/main/scala/org/embulk/output/s3_parquet/parquet/ParquetFileWriteSupport.scala @@ -37,7 +37,7 @@ object ParquetFileWriteSupport { def getColumnOptions: JMap[String, ParquetColumnType.Task] def setColumnOptions( columnOptions: JMap[String, ParquetColumnType.Task] - ): Unit + ): Unit @Config("type_options") @ConfigDefault("{}") @@ -52,7 +52,7 @@ object ParquetFileWriteSupport { override def self(): WriterBuilder = this override def getWriteSupport( conf: Configuration - ): WriteSupport[PageReader] = writeSupport + ): WriteSupport[PageReader] = writeSupport } def configure(task: Task): Unit = { @@ -72,22 +72,7 @@ object ParquetFileWriteSupport { task.getTypeOptions.keys.foreach( embulkType - ) // throw ConfigException if unknown type name is found. - - task.getColumnOptions.foreach { - case (c: String, t: ParquetColumnType.Task) => - val column: Column = schema.lookupColumn(c) // throw ConfigException if columnName does not exist. - - if (t.getFormat.isDefined || t.getTimeZoneId.isDefined) { - if (!column.getType.isInstanceOf[TimestampType]) { - // NOTE: Warning is better instead of throwing. - throw new ConfigException( - s"The type of column{name:${column.getName},type:${column.getType.getName}} is not 'timestamp'," + - " but timestamp options (\"format\" or \"timezone\") are set." - ) - } - } - } + ) // throw ConfigException if unknown type name is found. } private def embulkType(typeName: String): Type = { @@ -104,19 +89,28 @@ object ParquetFileWriteSupport { throw new ConfigException(s"Unknown embulk type: $typeName.") } + private def createTimestampFormatters(task: Task, schema: Schema): Seq[TimestampFormatter] = { + (0 until schema.getColumnCount).map { i => + val column = schema.getColumn(i) + if (column.getType.isInstanceOf[TimestampType]) { + TimestampFormatter.of(task.getDefaultTimestampFormat(), "UTC") + } else + null + } + } + def apply(task: Task, schema: Schema): ParquetFileWriteSupport = { validateTask(task, schema) val parquetSchema: Map[Column, ParquetColumnType] = schema.getColumns.map { c: Column => - c -> task.getColumnOptions.toMap - .get(c.getName) - .orElse(task.getTypeOptions.toMap.get(c.getType.getName)) - .flatMap(ParquetColumnType.fromTask) - .getOrElse(DefaultColumnType) + c -> task.getColumnOptions.toMap + .get(c.getName) + .orElse(task.getTypeOptions.toMap.get(c.getType.getName)) + .flatMap(ParquetColumnType.fromTask) + .getOrElse(DefaultColumnType) }.toMap - val timestampFormatters: Seq[TimestampFormatter] = Timestamps - .newTimestampColumnFormatters(task, schema, task.getColumnOptions) + val timestampFormatters: Seq[TimestampFormatter] = createTimestampFormatters(task, schema) new ParquetFileWriteSupport(schema, parquetSchema, timestampFormatters) } } @@ -157,7 +151,7 @@ case class ParquetFileWriteSupport private ( schema.visitColumns(new ColumnVisitor { override def booleanColumn(column: Column): Unit = nullOr(column) { parquetSchema(column) - .consumeBoolean(current, record.getBoolean(column)) + .consumeBoolean(current, record.getBoolean(column)) } override def longColumn(column: Column): Unit = nullOr(column) { parquetSchema(column).consumeLong(current, record.getLong(column)) @@ -196,6 +190,6 @@ case class ParquetFileWriteSupport private ( current.endField(column.getName, column.getIndex) } - def newWriterBuilder(pathString: String): WriterBuilder = - WriterBuilder(new Path(pathString), this) + def newWriterBuilder(pathString: String): ParquetFileWriteSupport.WriterBuilder = + ParquetFileWriteSupport.WriterBuilder(new Path(pathString), this) }