From 353e6cb1b6f53c9dc692807238b4199cdd2f1822 Mon Sep 17 00:00:00 2001 From: Volodymyr Perebykivskyi Date: Sat, 28 Jun 2025 15:02:28 +0300 Subject: [PATCH] [bq] introduce BigQuery Storage Write API (JSON) Signed-off-by: Dgray16 --- spring-batch-bigquery/README.adoc | 27 ++- .../BigQueryQueryItemReaderBuilder.java | 3 +- .../BigQueryLoadJobBaseItemWriter.java} | 21 +- .../csv/BigQueryLoadJobCsvItemWriter.java} | 5 +- .../builder/BigQueryCsvItemWriterBuilder.java | 28 +-- .../json/BigQueryLoadJobJsonItemWriter.java} | 11 +- ...BigQueryLoadJobJsonItemWriterBuilder.java} | 55 +++--- .../bigquery/writer/loadjob/package-info.java | 30 +++ .../bigquery/writer/package-info.java | 11 +- .../json/BigQueryWriteApiJsonItemWriter.java | 182 ++++++++++++++++++ ...BigQueryWriteApiJsonItemWriterBuilder.java | 130 +++++++++++++ .../writer/writeapi/package-info.java | 30 +++ .../{TableUtils.java => NameUtils.java} | 8 +- .../extensions/bigquery/common/PersonDto.java | 16 ++ .../bigquery/common/TestConstants.java | 2 + ...mulatorTest.java => EmulatorBaseTest.java} | 9 +- ...latorBigQueryBaseDockerConfiguration.java} | 9 +- ...va => EmulatorBigQueryItemReaderTest.java} | 10 +- ...t.java => EmulatorBaseItemReaderTest.java} | 11 +- ...t.java => EmulatorBaseItemWriterTest.java} | 31 ++- .../writer/base/SpyResponseExtension.java | 4 +- ...atorBigQueryLoadJobCsvItemWriterTest.java} | 16 +- ...torBigQueryLoadJobJsonItemWriterTest.java} | 18 +- ...torBigQueryWriteApiJsonItemWriterTest.java | 56 ++++++ ...=> GcloudBaseBigQueryIntegrationTest.java} | 2 +- ...java => GcloudBigQueryItemReaderTest.java} | 20 +- ... => GcloudBaseBigQueryItemWriterTest.java} | 4 +- ...loudBigQueryLoadJobCsvItemWriterTest.java} | 23 ++- ...oudBigQueryLoadJobJsonItemWriterTest.java} | 25 +-- ...oudBigQueryWriteApiJsonItemWriterTest.java | 12 ++ .../BigQueryItemReaderBuilderTest.java | 21 +- .../BigQueryLoadJobBaseItemWriterTest.java} | 42 ++-- .../BigQueryLoadJobCsvItemWriterTest.java} | 29 +-- ...ueryLoadJobCsvItemWriterBuilderTests.java} | 28 +-- .../BigQueryLoadJobJsonItemWriterTest.java} | 18 +- ...eryLoadJobJsonItemWriterBuilderTests.java} | 28 +-- .../BigQueryWriteApiJsonItemWriterTest.java | 181 +++++++++++++++++ ...ueryWriteApiJsonItemWriterBuilderTest.java | 172 +++++++++++++++++ .../src/test/resources/logback.xml | 1 + 39 files changed, 1101 insertions(+), 228 deletions(-) rename spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/{BigQueryBaseItemWriter.java => loadjob/BigQueryLoadJobBaseItemWriter.java} (93%) rename spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/{BigQueryCsvItemWriter.java => loadjob/csv/BigQueryLoadJobCsvItemWriter.java} (93%) rename spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/{ => loadjob/csv}/builder/BigQueryCsvItemWriterBuilder.java (76%) rename spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/{BigQueryJsonItemWriter.java => loadjob/json/BigQueryLoadJobJsonItemWriter.java} (87%) rename spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/{builder/BigQueryJsonItemWriterBuilder.java => loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java} (53%) create mode 100644 spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java create mode 100644 
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java create mode 100644 spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java create mode 100644 spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/{TableUtils.java => NameUtils.java} (81%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/{BaseEmulatorTest.java => EmulatorBaseTest.java} (83%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/{BigQueryBaseDockerConfiguration.java => EmulatorBigQueryBaseDockerConfiguration.java} (76%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/{BigQueryEmulatorItemReaderTest.java => EmulatorBigQueryItemReaderTest.java} (90%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/{BaseEmulatorItemReaderTest.java => EmulatorBaseItemReaderTest.java} (76%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/{BaseEmulatorItemWriterTest.java => EmulatorBaseItemWriterTest.java} (63%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/{BigQueryEmulatorCsvItemWriterTest.java => loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java} (77%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/{BigQueryEmulatorJsonItemWriterTest.java => loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java} (81%) create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/{BaseBigQueryGcloudIntegrationTest.java => GcloudBaseBigQueryIntegrationTest.java} (93%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/{BigQueryGcloudItemReaderTest.java => GcloudBigQueryItemReaderTest.java} (87%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/{BaseBigQueryGcloudItemWriterTest.java => GcloudBaseBigQueryItemWriterTest.java} (92%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/{BigQueryGcloudCsvItemWriterTest.java => loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java} (75%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/{BigQueryGcloudJsonItemWriterTest.java => loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java} (73%) create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/{BigQueryBaseItemWriterTest.java => loadjob/BigQueryLoadJobBaseItemWriterTest.java} (87%) rename 
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/{BigQueryCsvItemWriterTest.java => loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java} (87%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/{builder/BigQueryCsvItemWriterBuilderTests.java => loadjob/csv/builder/BigQueryLoadJobCsvItemWriterBuilderTests.java} (73%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/{BigQueryJsonItemWriterTest.java => loadjob/json/BigQueryLoadJobJsonItemWriterTest.java} (90%) rename spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/{builder/BigQueryJsonItemWriterBuilderTests.java => loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilderTests.java} (73%) create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java create mode 100644 spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java diff --git a/spring-batch-bigquery/README.adoc b/spring-batch-bigquery/README.adoc index a930b198..bf8db525 100644 --- a/spring-batch-bigquery/README.adoc +++ b/spring-batch-bigquery/README.adoc @@ -2,26 +2,41 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implementations for https://cloud.google.com/bigquery[BigQuery]. -`ItemWriter` supports next formats (https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]): +`ItemWriter` support: + +[cols="h,1,1"] +|=== +| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Commited)] + +|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported +|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | +|=== + +`ItemReader` support: + +[cols="h,1"] +|=== + +|https://en.wikipedia.org/wiki/JSON[JSON] |Supported +|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported +|=== -* https://en.wikipedia.org/wiki/Comma-separated_values[CSV] -* https://en.wikipedia.org/wiki/JSON[JSON] Based on https://github.com/googleapis/java-bigquery[Java BigQuery]. 
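The Storage Write API (committed) JSON path introduced by this patch can be wired up in a similar way. The following is a minimal sketch against the builder added in this change; `PersonDto` and the project/dataset/table names are placeholders only. When no client or marshaller is supplied, the builder itself falls back to `BigQueryWriteClient.create()` and a `JacksonJsonObjectMarshaller`.

[source,java]
----
@Bean
BigQueryWriteApiJsonItemWriter<PersonDto> bigQueryJsonWriter(BigQueryWriteClient writeClient) throws IOException {
    // The table is addressed by its full path: project, dataset, table (placeholder names here).
    return new BigQueryWriteApiJsonItemWriterBuilder<PersonDto>()
            .bigQueryWriteClient(writeClient)
            .tableName(TableName.of("my-project", "json_dataset", "json_table"))
            .marshaller(new JacksonJsonObjectMarshaller<>())
            .build();
}
----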
-== Example of `BigQueryCsvItemWriter` +== Example of `BigQueryLoadJobCsvItemWriter` [source,java] ---- @Bean -BigQueryCsvItemWriter bigQueryCsvWriter() { +BigQueryLoadJobCsvItemWriter bigQueryCsvWriter() { WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration .newBuilder(TableId.of("csv_dataset", "csv_table")) .setAutodetect(true) .setFormatOptions(FormatOptions.csv()) .build(); - return new BigQueryCsvItemWriterBuilder() + return new BigQueryLoadJobCsvItemWriterBuilder() .bigQuery(bigQueryService) .writeChannelConfig(writeConfiguration) .build(); diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java index 8f24b124..76c7deb1 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java @@ -17,6 +17,7 @@ package org.springframework.batch.extensions.bigquery.reader.builder; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.QueryJobConfiguration; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; @@ -113,7 +114,7 @@ public BigQueryQueryItemReaderBuilder targetType(final Class targetType) { public BigQueryQueryItemReader build() { final BigQueryQueryItemReader reader = new BigQueryQueryItemReader<>(); - reader.setBigQuery(this.bigQuery); + reader.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery); if (this.rowMapper == null) { Assert.notNull(this.targetType, "No target type provided"); diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java similarity index 93% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java index f47984c8..583ca3b0 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java @@ -14,11 +14,12 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.writer; +package org.springframework.batch.extensions.bigquery.writer.loadjob; import com.google.cloud.bigquery.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.InitializingBean; @@ -34,13 +35,13 @@ import java.util.function.Consumer; /** - * Base class that holds shared code for JSON and CSV writers. 
+ * Base class that holds shared code for load job JSON and CSV writers. * * @param your DTO type * @author Volodymyr Perebykivskyi * @since 0.1.0 */ -public abstract class BigQueryBaseItemWriter implements ItemWriter, InitializingBean { +public abstract class BigQueryLoadJobBaseItemWriter implements ItemWriter, InitializingBean { /** Logger that can be reused */ protected final Log logger = LogFactory.getLog(getClass()); @@ -71,7 +72,7 @@ public abstract class BigQueryBaseItemWriter implements ItemWriter, Initia /** * Fetches table from the provided configuration. * - * @return {@link Table} that is described in {@link BigQueryBaseItemWriter#writeChannelConfig} + * @return {@link Table} that is described in {@link BigQueryLoadJobBaseItemWriter#writeChannelConfig} */ protected Table getTable() { return this.bigQuery.getTable(this.writeChannelConfig.getDestinationTable()); @@ -119,8 +120,8 @@ public void write(final Chunk chunk) throws Exception { final List items = chunk.getItems(); doInitializeProperties(items); - if (this.logger.isDebugEnabled()) { - this.logger.debug(String.format("Mapping %d elements", items.size())); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Mapping %d elements", items.size())); } doWriteDataToBigQuery(mapDataToBigQueryFormat(items)); @@ -147,8 +148,8 @@ private ByteBuffer mapDataToBigQueryFormat(final List items) throws } private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Writing data to BigQuery"); + if (logger.isDebugEnabled()) { + logger.debug("Writing data to BigQuery"); } TableDataWriteChannel writeChannel = null; @@ -174,8 +175,8 @@ private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) { } } - if (this.logger.isDebugEnabled()) { - this.logger.debug(logMessage); + if (logger.isDebugEnabled()) { + logger.debug(logMessage); } } } diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java similarity index 93% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java index f4b2e7c9..aa5eb2de 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java @@ -14,13 +14,14 @@ * limitations under the License. 
*/ -package org.springframework.batch.extensions.bigquery.writer; +package org.springframework.batch.extensions.bigquery.writer.loadjob.csv; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Table; +import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter; import org.springframework.core.convert.converter.Converter; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -37,7 +38,7 @@ * @since 0.2.0 * @see CSV */ -public class BigQueryCsvItemWriter extends BigQueryBaseItemWriter { +public class BigQueryLoadJobCsvItemWriter extends BigQueryLoadJobBaseItemWriter { private Converter rowMapper; private ObjectWriter objectWriter; diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryCsvItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java similarity index 76% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryCsvItemWriterBuilder.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java index 73dbae3a..fa13ed8a 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryCsvItemWriterBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java @@ -14,19 +14,20 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.writer.builder; +package org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter; import org.springframework.core.convert.converter.Converter; import java.util.function.Consumer; /** - * A builder for {@link BigQueryCsvItemWriter}. + * A builder for {@link BigQueryLoadJobCsvItemWriter}. 
* * @param your DTO type * @author Volodymyr Perebykivskyi @@ -47,7 +48,7 @@ public class BigQueryCsvItemWriterBuilder { * * @param rowMapper your row mapper * @return {@link BigQueryCsvItemWriterBuilder} - * @see BigQueryCsvItemWriter#setRowMapper(Converter) + * @see BigQueryLoadJobCsvItemWriter#setRowMapper(Converter) */ public BigQueryCsvItemWriterBuilder rowMapper(Converter rowMapper) { this.rowMapper = rowMapper; @@ -59,7 +60,7 @@ public BigQueryCsvItemWriterBuilder rowMapper(Converter rowMapper) * * @param datasetInfo BigQuery dataset info * @return {@link BigQueryCsvItemWriterBuilder} - * @see BigQueryCsvItemWriter#setDatasetInfo(DatasetInfo) + * @see BigQueryLoadJobCsvItemWriter#setDatasetInfo(DatasetInfo) */ public BigQueryCsvItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) { this.datasetInfo = datasetInfo; @@ -71,7 +72,7 @@ public BigQueryCsvItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) { * * @param consumer your consumer * @return {@link BigQueryCsvItemWriterBuilder} - * @see BigQueryCsvItemWriter#setJobConsumer(Consumer) + * @see BigQueryLoadJobCsvItemWriter#setJobConsumer(Consumer) */ public BigQueryCsvItemWriterBuilder jobConsumer(Consumer consumer) { this.jobConsumer = consumer; @@ -83,7 +84,7 @@ public BigQueryCsvItemWriterBuilder jobConsumer(Consumer consumer) { * * @param configuration BigQuery channel configuration * @return {@link BigQueryCsvItemWriterBuilder} - * @see BigQueryCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration) + * @see BigQueryLoadJobCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration) */ public BigQueryCsvItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) { this.writeChannelConfig = configuration; @@ -95,7 +96,7 @@ public BigQueryCsvItemWriterBuilder writeChannelConfig(WriteChannelConfigurat * * @param bigQuery BigQuery service * @return {@link BigQueryCsvItemWriterBuilder} - * @see BigQueryCsvItemWriter#setBigQuery(BigQuery) + * @see BigQueryLoadJobCsvItemWriter#setBigQuery(BigQuery) */ public BigQueryCsvItemWriterBuilder bigQuery(BigQuery bigQuery) { this.bigQuery = bigQuery; @@ -103,17 +104,18 @@ public BigQueryCsvItemWriterBuilder bigQuery(BigQuery bigQuery) { } /** - * Please remember about {@link BigQueryCsvItemWriter#afterPropertiesSet()}. + * Please remember about {@link BigQueryLoadJobCsvItemWriter#afterPropertiesSet()}. * - * @return {@link BigQueryCsvItemWriter} + * @return {@link BigQueryLoadJobCsvItemWriter} */ - public BigQueryCsvItemWriter build() { - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriter<>(); + public BigQueryLoadJobCsvItemWriter build() { + BigQueryLoadJobCsvItemWriter writer = new BigQueryLoadJobCsvItemWriter<>(); + + writer.setBigQuery(this.bigQuery == null ? 
BigQueryOptions.getDefaultInstance().getService() : this.bigQuery); writer.setRowMapper(this.rowMapper); writer.setWriteChannelConfig(this.writeChannelConfig); writer.setJobConsumer(this.jobConsumer); - writer.setBigQuery(this.bigQuery); writer.setDatasetInfo(this.datasetInfo); return writer; diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java similarity index 87% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java index 9200f7b2..a557110c 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java @@ -14,10 +14,11 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.writer; +package org.springframework.batch.extensions.bigquery.writer.loadjob.json; import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Table; +import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter; import org.springframework.batch.item.json.JsonObjectMarshaller; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -28,14 +29,14 @@ import java.util.function.Predicate; /** - * JSON writer for BigQuery. + * JSON writer for BigQuery using Load Job. 
* * @param your DTO type * @author Volodymyr Perebykivskyi * @since 0.2.0 * @see JSON */ -public class BigQueryJsonItemWriter extends BigQueryBaseItemWriter { +public class BigQueryLoadJobJsonItemWriter extends BigQueryLoadJobBaseItemWriter { private static final String LF = "\n"; @@ -59,12 +60,12 @@ protected List convertObjectsToByteArrays(List items) { @Override protected void performFormatSpecificChecks() { - Assert.notNull(this.marshaller, "Marshaller is mandatory"); + Assert.notNull(this.marshaller, "Marshaller must be provided"); Table table = getTable(); if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) { - if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) { + if (tableHasDefinedSchema(table) && logger.isWarnEnabled()) { logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); } } else { diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java similarity index 53% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java index 9814a00f..7f8c612a 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilder.java @@ -14,26 +14,28 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.writer.builder; +package org.springframework.batch.extensions.bigquery.writer.loadjob.json.builder; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.BigQueryLoadJobJsonItemWriter; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; import java.util.function.Consumer; /** - * A builder for {@link BigQueryJsonItemWriter}. + * A builder for {@link BigQueryLoadJobJsonItemWriter}. * * @param your DTO type * @author Volodymyr Perebykivskyi * @since 0.2.0 - * @see Examples + * @see Examples */ -public class BigQueryJsonItemWriterBuilder { +public class BigQueryLoadJobJsonItemWriterBuilder { private JsonObjectMarshaller marshaller; private Consumer jobConsumer; @@ -45,10 +47,10 @@ public class BigQueryJsonItemWriterBuilder { * Converts your DTO into a {@link String}. 
* * @param marshaller your mapper - * @return {@link BigQueryJsonItemWriter} - * @see BigQueryJsonItemWriter#setMarshaller(JsonObjectMarshaller) + * @return {@link BigQueryLoadJobJsonItemWriterBuilder} + * @see BigQueryLoadJobJsonItemWriter#setMarshaller(JsonObjectMarshaller) */ - public BigQueryJsonItemWriterBuilder marshaller(JsonObjectMarshaller marshaller) { + public BigQueryLoadJobJsonItemWriterBuilder marshaller(JsonObjectMarshaller marshaller) { this.marshaller = marshaller; return this; } @@ -57,10 +59,10 @@ public BigQueryJsonItemWriterBuilder marshaller(JsonObjectMarshaller marsh * Provides additional information about the {@link com.google.cloud.bigquery.Dataset}. * * @param datasetInfo BigQuery dataset info - * @return {@link BigQueryJsonItemWriter} - * @see BigQueryJsonItemWriter#setDatasetInfo(DatasetInfo) + * @return {@link BigQueryLoadJobJsonItemWriterBuilder} + * @see BigQueryLoadJobJsonItemWriter#setDatasetInfo(DatasetInfo) */ - public BigQueryJsonItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) { + public BigQueryLoadJobJsonItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) { this.datasetInfo = datasetInfo; return this; } @@ -69,10 +71,10 @@ public BigQueryJsonItemWriterBuilder datasetInfo(DatasetInfo datasetInfo) { * Callback when {@link Job} will be finished. * * @param consumer your consumer - * @return {@link BigQueryJsonItemWriter} - * @see BigQueryJsonItemWriter#setJobConsumer(Consumer) + * @return {@link BigQueryLoadJobJsonItemWriterBuilder} + * @see BigQueryLoadJobJsonItemWriter#setJobConsumer(Consumer) */ - public BigQueryJsonItemWriterBuilder jobConsumer(Consumer consumer) { + public BigQueryLoadJobJsonItemWriterBuilder jobConsumer(Consumer consumer) { this.jobConsumer = consumer; return this; } @@ -81,10 +83,10 @@ public BigQueryJsonItemWriterBuilder jobConsumer(Consumer consumer) { * Describes what should be written (format) and its destination (table). * * @param configuration BigQuery channel configuration - * @return {@link BigQueryJsonItemWriter} - * @see BigQueryJsonItemWriter#setWriteChannelConfig(WriteChannelConfiguration) + * @return {@link BigQueryLoadJobJsonItemWriterBuilder} + * @see BigQueryLoadJobJsonItemWriter#setWriteChannelConfig(WriteChannelConfiguration) */ - public BigQueryJsonItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) { + public BigQueryLoadJobJsonItemWriterBuilder writeChannelConfig(WriteChannelConfiguration configuration) { this.writeChannelConfig = configuration; return this; } @@ -93,26 +95,27 @@ public BigQueryJsonItemWriterBuilder writeChannelConfig(WriteChannelConfigura * BigQuery service, responsible for API calls. * * @param bigQuery BigQuery service - * @return {@link BigQueryJsonItemWriter} - * @see BigQueryJsonItemWriter#setBigQuery(BigQuery) + * @return {@link BigQueryLoadJobJsonItemWriter} + * @see BigQueryLoadJobJsonItemWriter#setBigQuery(BigQuery) */ - public BigQueryJsonItemWriterBuilder bigQuery(BigQuery bigQuery) { + public BigQueryLoadJobJsonItemWriterBuilder bigQuery(BigQuery bigQuery) { this.bigQuery = bigQuery; return this; } /** - * Please remember about {@link BigQueryJsonItemWriter#afterPropertiesSet()}. + * Please remember about {@link BigQueryLoadJobJsonItemWriter#afterPropertiesSet()}. 
* - * @return {@link BigQueryJsonItemWriter} + * @return {@link BigQueryLoadJobJsonItemWriter} */ - public BigQueryJsonItemWriter build() { - BigQueryJsonItemWriter writer = new BigQueryJsonItemWriter<>(); + public BigQueryLoadJobJsonItemWriter build() { + BigQueryLoadJobJsonItemWriter writer = new BigQueryLoadJobJsonItemWriter<>(); + + writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller); + writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery); - writer.setMarshaller(this.marshaller); writer.setWriteChannelConfig(this.writeChannelConfig); writer.setJobConsumer(this.jobConsumer); - writer.setBigQuery(this.bigQuery); writer.setDatasetInfo(this.datasetInfo); return writer; diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java new file mode 100644 index 00000000..77e882a8 --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/loadjob/package-info.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link com.google.cloud.bigquery.JobConfiguration.Type#LOAD} {@link com.google.cloud.bigquery.Job} + * + *

Supported formats: + * <ul> + *     <li>JSON</li> + *     <li>CSV</li> + * </ul> + * + * <p>
If you generate {@link com.google.cloud.bigquery.TableDataWriteChannel} + * and you {@link com.google.cloud.bigquery.TableDataWriteChannel#close()} it, + * there is no guarantee that single {@link com.google.cloud.bigquery.Job} will be created. + */ +package org.springframework.batch.extensions.bigquery.writer.loadjob; \ No newline at end of file diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java index 1f25f8be..48c3746a 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java @@ -18,17 +18,8 @@ * Google BigQuery related functionality. *

* These writers use a Java client from Google, so we cannot control this flow fully. - * Take into account that this writer produces {@link com.google.cloud.bigquery.JobConfiguration.Type#LOAD} {@link com.google.cloud.bigquery.Job}. - * - *

Supported formats: - * <ul> - *     <li>JSON</li> - *     <li>CSV</li> - * </ul> - * - * <p>
For example if you generate {@link com.google.cloud.bigquery.TableDataWriteChannel} and you {@link com.google.cloud.bigquery.TableDataWriteChannel#close()} it, - * there is no guarantee that single {@link com.google.cloud.bigquery.Job} will be created. *

+ * * Take into account that BigQuery has rate limits, and it is very easy to exceed those in concurrent environment. *

* Also, worth mentioning that you should ensure ordering of the fields in DTO that you are going to send to the BigQuery. diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java new file mode 100644 index 00000000..7311cbd1 --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java @@ -0,0 +1,182 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.writer.writeapi.json; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONArray; +import org.json.JSONObject; +import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.json.JsonObjectMarshaller; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * JSON writer for BigQuery using Storage Write API. 
+ * + * @param your DTO type + * @author Volodymyr Perebykivskyi + * @see JSON + * @see Commited type storage write API + * @since 0.2.0 + */ +public class BigQueryWriteApiJsonItemWriter implements ItemWriter, InitializingBean { + + /** + * Logger that can be reused + */ + private final Log logger = LogFactory.getLog(getClass()); + + private final AtomicLong bigQueryWriteCounter = new AtomicLong(); + + private BigQueryWriteClient bigQueryWriteClient; + private TableName tableName; + private JsonObjectMarshaller marshaller; + private ApiFutureCallback apiFutureCallback; + private Executor executor; + + private boolean writeFailed; + + @Override + public void write(final Chunk chunk) throws Exception { + if (!chunk.isEmpty()) { + final List items = chunk.getItems(); + String streamName = null; + + try { + WriteStream writeStreamToCreate = WriteStream.newBuilder() + .setType(WriteStream.Type.COMMITTED) + .build(); + + CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder() + .setParent(tableName.toString()) + .setWriteStream(writeStreamToCreate) + .build(); + + WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest); + streamName = writeStream.getName(); + + if (logger.isDebugEnabled()) { + logger.debug("Created a stream=" + streamName); + } + + try (final JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), bigQueryWriteClient).build()) { + if (logger.isDebugEnabled()) { + logger.debug(String.format("Mapping %d elements", items.size())); + } + final JSONArray array = new JSONArray(); + items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put); + + if (logger.isDebugEnabled()) { + logger.debug("Writing data to BigQuery"); + } + final ApiFuture future = writer.append(array); + + if (apiFutureCallback != null) { + ApiFutures.addCallback(future, apiFutureCallback, executor); + } + } + } catch (Exception e) { + writeFailed = true; + logger.error("BigQuery error", e); + throw new BigQueryItemWriterException("Error on write happened", e); + } finally { + if (StringUtils.hasText(streamName)) { + bigQueryWriteClient.finalizeWriteStream(streamName); + } + + if (!writeFailed && logger.isDebugEnabled()) { + logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet()); + } + } + } + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided"); + Assert.notNull(this.tableName, "Table name must be provided"); + Assert.notNull(this.marshaller, "Marshaller must be provided"); + + if (this.apiFutureCallback != null) { + Assert.notNull(this.executor, "Executor must be provided"); + } + } + + /** + * GRPC client that wraps communication with BigQuery. + * + * @param bigQueryWriteClient a client + */ + public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) { + this.bigQueryWriteClient = bigQueryWriteClient; + } + + /** + * A full path to the BigQuery table. + * + * @param tableName a name + */ + public void setTableName(final TableName tableName) { + this.tableName = tableName; + } + + /** + * Converter that transforms a single row into a {@link String}. + * + * @param marshaller your JSON mapper + */ + public void setMarshaller(final JsonObjectMarshaller marshaller) { + this.marshaller = marshaller; + } + + /** + * {@link ApiFutureCallback} that will be called in case of successful of failed response. 
+ * + * @param apiFutureCallback a callback + * @see BigQueryWriteApiJsonItemWriter#setExecutor(Executor) + */ + public void setApiFutureCallback(final ApiFutureCallback apiFutureCallback) { + this.apiFutureCallback = apiFutureCallback; + } + + /** + * An {@link Executor} that will be calling a {@link ApiFutureCallback}. + * + * @param executor an executor + * @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + */ + public void setExecutor(final Executor executor) { + this.executor = executor; + } +} diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java new file mode 100644 index 00000000..92caf181 --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java @@ -0,0 +1,130 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.writer.writeapi.json.builder; + +import com.google.api.core.ApiFutureCallback; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.util.concurrent.MoreExecutors; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; +import org.springframework.batch.item.json.JsonObjectMarshaller; + +import java.io.IOException; +import java.util.concurrent.Executor; + +/** + * A builder for {@link BigQueryWriteApiJsonItemWriter}. + * + * @param your DTO type + * @author Volodymyr Perebykivskyi + * @since 0.2.0 + * @see Examples + */ +public class BigQueryWriteApiJsonItemWriterBuilder { + + private BigQueryWriteClient bigQueryWriteClient; + private TableName tableName; + private JsonObjectMarshaller marshaller; + private ApiFutureCallback apiFutureCallback; + private Executor executor; + + /** + * GRPC client that will be responsible for communication with BigQuery. + * + * @param bigQueryWriteClient a client + * @return {@link BigQueryWriteApiJsonItemWriterBuilder} + * @see BigQueryWriteApiJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient) + */ + public BigQueryWriteApiJsonItemWriterBuilder bigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) { + this.bigQueryWriteClient = bigQueryWriteClient; + return this; + } + + /** + * A table name along with a full path. 
+ * + * @param tableName a name + * @return {@link BigQueryWriteApiJsonItemWriterBuilder} + * @see BigQueryWriteApiJsonItemWriter#setTableName(TableName) + */ + public BigQueryWriteApiJsonItemWriterBuilder tableName(final TableName tableName) { + this.tableName = tableName; + return this; + } + + /** + * Converts your DTO into a {@link String}. + * + * @param marshaller your mapper + * @return {@link BigQueryWriteApiJsonItemWriterBuilder} + * @see BigQueryWriteApiJsonItemWriter#setMarshaller(JsonObjectMarshaller) + */ + public BigQueryWriteApiJsonItemWriterBuilder marshaller(final JsonObjectMarshaller marshaller) { + this.marshaller = marshaller; + return this; + } + + /** + * A {@link ApiFutureCallback} that will be called on successful or failed event. + * + * @param apiFutureCallback a callback + * @return {@link BigQueryWriteApiJsonItemWriterBuilder} + * @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + */ + public BigQueryWriteApiJsonItemWriterBuilder apiFutureCallback(final ApiFutureCallback apiFutureCallback) { + this.apiFutureCallback = apiFutureCallback; + return this; + } + + /** + * {@link Executor} that will be used for {@link ApiFutureCallback}. + * + * @param executor an executor + * @return {@link BigQueryWriteApiJsonItemWriterBuilder} + * @see BigQueryWriteApiJsonItemWriter#setExecutor(Executor) + * @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + */ + public BigQueryWriteApiJsonItemWriterBuilder executor(final Executor executor) { + this.executor = executor; + return this; + } + + /** + * Please remember about {@link BigQueryWriteApiJsonItemWriter#afterPropertiesSet()}. + * + * @return {@link BigQueryWriteApiJsonItemWriter} + * @throws IOException in case when {@link BigQueryWriteClient} failed to be created automatically + */ + public BigQueryWriteApiJsonItemWriter build() throws IOException { + BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + + writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller); + writer.setBigQueryWriteClient(this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient); + + if (apiFutureCallback != null) { + writer.setApiFutureCallback(apiFutureCallback); + writer.setExecutor(this.executor == null ? MoreExecutors.directExecutor() : this.executor); + } + + writer.setTableName(tableName); + + return writer; + } + +} diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java new file mode 100644 index 00000000..e48e6ade --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * {@link com.google.cloud.bigquery.storage.v1.WriteStream} + * + * Supported types: + *

<ul> + *     <li>{@link com.google.cloud.bigquery.storage.v1.WriteStream.Type#COMMITTED}</li> + * </ul> + * + * Supported formats: + * <ul> + *     <li>JSON</li> + * </ul>
+ */ +package org.springframework.batch.extensions.bigquery.writer.writeapi; \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TableUtils.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/NameUtils.java similarity index 81% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TableUtils.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/NameUtils.java index a90c16e3..73fb713a 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TableUtils.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/NameUtils.java @@ -18,10 +18,12 @@ import java.util.concurrent.ThreadLocalRandom; -public final class TableUtils { - private TableUtils() {} +public final class NameUtils { + private static final String DASH = "-"; + private NameUtils() {} public static String generateTableName(String testType) { - return testType + "-" + ThreadLocalRandom.current().nextInt(100); + return testType + DASH + ThreadLocalRandom.current().nextInt(100); } + } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java index 15069e12..fec598ac 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java @@ -20,6 +20,8 @@ import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableSchema; @JsonPropertyOrder(value = {TestConstants.NAME, TestConstants.AGE}) public record PersonDto(String name, Integer age) { @@ -30,4 +32,18 @@ public static Schema getBigQuerySchema() { return Schema.of(nameField, ageField); } + public static TableSchema getWriteApiSchema() { + TableFieldSchema name = TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.STRING) + .setName(TestConstants.NAME) + .setMode(TableFieldSchema.Mode.REQUIRED) + .build(); + TableFieldSchema age = TableFieldSchema.newBuilder() + .setType(TableFieldSchema.Type.INT64) + .setName(TestConstants.AGE) + .setMode(TableFieldSchema.Mode.REQUIRED) + .build(); + return TableSchema.newBuilder().addFields(name).addFields(age).build(); + } + } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java index 0e37bd37..c8809fa3 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java @@ -29,6 +29,8 @@ public final class TestConstants { private TestConstants() {} public static final String DATASET = "spring_batch_extensions"; + public static final String PROJECT = "batch-test"; + public static final String NAME = "name"; public static final String AGE = "age"; public static final String 
CSV = "csv"; diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BaseEmulatorTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/EmulatorBaseTest.java similarity index 83% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BaseEmulatorTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/EmulatorBaseTest.java index 07cd75a6..de7b76eb 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BaseEmulatorTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/EmulatorBaseTest.java @@ -18,20 +18,19 @@ import com.google.cloud.NoCredentials; import com.google.cloud.bigquery.BigQueryOptions; +import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.testcontainers.containers.GenericContainer; -public abstract class BaseEmulatorTest { - - protected static final String PROJECT = "batch-test"; +public abstract class EmulatorBaseTest { protected static BigQueryOptions.Builder prepareBigQueryBuilder() { return BigQueryOptions .newBuilder() - .setProjectId(PROJECT) + .setProjectId(TestConstants.PROJECT) .setCredentials(NoCredentials.getInstance()); } protected static String getBigQueryUrl(GenericContainer container) { - return "http://%s:%d".formatted(container.getHost(), container.getMappedPort(BigQueryBaseDockerConfiguration.PORT)); + return "http://%s:%d".formatted(container.getHost(), container.getMappedPort(EmulatorBigQueryBaseDockerConfiguration.REST_PORT)); } } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BigQueryBaseDockerConfiguration.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/EmulatorBigQueryBaseDockerConfiguration.java similarity index 76% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BigQueryBaseDockerConfiguration.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/EmulatorBigQueryBaseDockerConfiguration.java index d5705c7b..b09e5f08 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BigQueryBaseDockerConfiguration.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/EmulatorBigQueryBaseDockerConfiguration.java @@ -18,12 +18,13 @@ import org.testcontainers.containers.GenericContainer; -public final class BigQueryBaseDockerConfiguration { +public final class EmulatorBigQueryBaseDockerConfiguration { - public static final int PORT = 9050; + public static final int REST_PORT = 9050; + public static final int GRPC_PORT = 9060; public static final GenericContainer CONTAINER = new GenericContainer<>("ghcr.io/goccy/bigquery-emulator:0.6.6") - .withExposedPorts(PORT); + .withExposedPorts(REST_PORT, GRPC_PORT); - private BigQueryBaseDockerConfiguration() {} + private EmulatorBigQueryBaseDockerConfiguration() {} } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java 
b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/EmulatorBigQueryItemReaderTest.java similarity index 90% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/EmulatorBigQueryItemReaderTest.java index f85b717a..cefe4bd3 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/EmulatorBigQueryItemReaderTest.java @@ -21,17 +21,19 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.emulator.reader.base.BaseEmulatorItemReaderTest; +import org.springframework.batch.extensions.bigquery.emulator.reader.base.EmulatorBaseItemReaderTest; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; -class BigQueryEmulatorItemReaderTest extends BaseEmulatorItemReaderTest { +class EmulatorBigQueryItemReaderTest extends EmulatorBaseItemReaderTest { + + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.CSV); @Test void testBatchReader() throws Exception { QueryJobConfiguration jobConfiguration = QueryJobConfiguration .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.csv p ORDER BY p.name LIMIT 2") - .setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV)) + .setDestinationTable(TABLE_ID) .setPriority(QueryJobConfiguration.Priority.BATCH) .build(); @@ -50,7 +52,7 @@ void testBatchReader() throws Exception { void testInteractiveReader() throws Exception { QueryJobConfiguration jobConfiguration = QueryJobConfiguration .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.csv p ORDER BY p.name LIMIT 2") - .setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV)) + .setDestinationTable(TABLE_ID) .build(); BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/BaseEmulatorItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/EmulatorBaseItemReaderTest.java similarity index 76% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/BaseEmulatorItemReaderTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/EmulatorBaseItemReaderTest.java index cdbca8fd..3da22ba2 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/BaseEmulatorItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/EmulatorBaseItemReaderTest.java @@ -18,19 +18,20 @@ import com.google.cloud.bigquery.BigQuery; import org.junit.jupiter.api.BeforeAll; -import org.springframework.batch.extensions.bigquery.emulator.base.BaseEmulatorTest; -import 
org.springframework.batch.extensions.bigquery.emulator.base.BigQueryBaseDockerConfiguration; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.base.EmulatorBaseTest; +import org.springframework.batch.extensions.bigquery.emulator.base.EmulatorBigQueryBaseDockerConfiguration; import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.MountableFile; @Testcontainers -public abstract class BaseEmulatorItemReaderTest extends BaseEmulatorTest { +public abstract class EmulatorBaseItemReaderTest extends EmulatorBaseTest { @Container - private static final GenericContainer CONTAINER = BigQueryBaseDockerConfiguration.CONTAINER - .withCommand("--project=" + PROJECT, "--log-level=debug", "--data-from-yaml=/reader-test.yaml") + private static final GenericContainer CONTAINER = EmulatorBigQueryBaseDockerConfiguration.CONTAINER + .withCommand("--project=" + TestConstants.PROJECT, "--log-level=debug", "--data-from-yaml=/reader-test.yaml") .withCopyFileToContainer(MountableFile.forClasspathResource("reader-test.yaml"), "/reader-test.yaml"); protected static BigQuery bigQuery; diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/BaseEmulatorItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/EmulatorBaseItemWriterTest.java similarity index 63% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/BaseEmulatorItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/EmulatorBaseItemWriterTest.java index 2b4fb92d..fbc9ea04 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/BaseEmulatorItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/EmulatorBaseItemWriterTest.java @@ -19,11 +19,17 @@ import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import io.grpc.ManagedChannelBuilder; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.springframework.batch.extensions.bigquery.emulator.base.BaseEmulatorTest; -import org.springframework.batch.extensions.bigquery.emulator.base.BigQueryBaseDockerConfiguration; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.base.EmulatorBaseTest; +import org.springframework.batch.extensions.bigquery.emulator.base.EmulatorBigQueryBaseDockerConfiguration; import org.springframework.core.io.ClassPathResource; import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; @@ -35,14 +41,16 @@ import java.util.logging.LogManager; @Testcontainers -public abstract class 
BaseEmulatorItemWriterTest extends BaseEmulatorTest { +public abstract class EmulatorBaseItemWriterTest extends EmulatorBaseTest { @Container - private static final GenericContainer BIG_QUERY_CONTAINER = BigQueryBaseDockerConfiguration.CONTAINER - .withCommand("--project=" + PROJECT, "--log-level=debug", "--data-from-yaml=/writer-test.yaml", "--database=/test-db") + protected static final GenericContainer BIG_QUERY_CONTAINER = EmulatorBigQueryBaseDockerConfiguration.CONTAINER + .withCommand("--project=" + TestConstants.PROJECT, "--log-level=debug", "--data-from-yaml=/writer-test.yaml", "--database=/test-db") .withCopyFileToContainer(MountableFile.forClasspathResource("writer-test.yaml"), "/writer-test.yaml"); protected static BigQuery bigQuery; + protected static BigQueryWriteClient bigQueryWriteClient; + private static WireMockServer wireMockServer; static { @@ -54,7 +62,7 @@ public abstract class BaseEmulatorItemWriterTest extends BaseEmulatorTest { } @BeforeAll - static void setupAll() { + static void setupAll() throws IOException { SpyResponseExtension extension = new SpyResponseExtension(); wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().extensions(extension)); @@ -66,6 +74,17 @@ static void setupAll() { ); bigQuery = prepareBigQueryBuilder().setHost(wireMockServer.baseUrl()).build().getService(); + + InstantiatingGrpcChannelProvider grpc = BigQueryWriteSettings + .defaultGrpcTransportProviderBuilder() + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); + BigQueryWriteSettings settings = BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setEndpoint("127.0.0.1:" + BIG_QUERY_CONTAINER.getMappedPort(EmulatorBigQueryBaseDockerConfiguration.GRPC_PORT)) + .setTransportChannelProvider(grpc) + .build(); + bigQueryWriteClient = BigQueryWriteClient.create(settings); } @AfterAll diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java index da1c9efd..554c8c7a 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java @@ -20,7 +20,7 @@ import com.github.tomakehurst.wiremock.http.HttpHeader; import com.github.tomakehurst.wiremock.http.Response; import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import org.springframework.batch.extensions.bigquery.emulator.base.BigQueryBaseDockerConfiguration; +import org.springframework.batch.extensions.bigquery.emulator.base.EmulatorBigQueryBaseDockerConfiguration; import wiremock.com.google.common.net.HttpHeaders; import java.util.List; @@ -29,7 +29,7 @@ public final class SpyResponseExtension implements ResponseTransformerV2 { private static final String BQ_DOCKER_URL_PREFIX = "http://0.0.0.0:"; - private static final String BQ_DOCKER_URL = BQ_DOCKER_URL_PREFIX + BigQueryBaseDockerConfiguration.PORT; + private static final String BQ_DOCKER_URL = BQ_DOCKER_URL_PREFIX + EmulatorBigQueryBaseDockerConfiguration.REST_PORT; private int wireMockPort; diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorCsvItemWriterTest.java 
b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java similarity index 77% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorCsvItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java index 009078af..127d741e 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/csv/EmulatorBigQueryLoadJobCsvItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.emulator.writer; +package org.springframework.batch.extensions.bigquery.emulator.writer.loadjob.csv; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.FormatOptions; @@ -24,14 +24,14 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.ResultVerifier; -import org.springframework.batch.extensions.bigquery.common.TableUtils; +import org.springframework.batch.extensions.bigquery.common.NameUtils; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.emulator.writer.base.BaseEmulatorItemWriterTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.emulator.writer.base.EmulatorBaseItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder.BigQueryCsvItemWriterBuilder; import org.springframework.batch.item.Chunk; -class BigQueryEmulatorCsvItemWriterTest extends BaseEmulatorItemWriterTest { +class EmulatorBigQueryLoadJobCsvItemWriterTest extends EmulatorBaseItemWriterTest { // TODO find out why data is not persisted into the sqlite database // at the same time it works fine with json/insertAll job/yaml file @@ -39,7 +39,7 @@ class BigQueryEmulatorCsvItemWriterTest extends BaseEmulatorItemWriterTest { @Test @Disabled("Not working at the moment") void testWrite() throws Exception { - TableId tableId = TableId.of(TestConstants.DATASET, TableUtils.generateTableName(TestConstants.CSV)); + TableId tableId = TableId.of(TestConstants.DATASET, NameUtils.generateTableName(TestConstants.CSV)); Chunk expectedChunk = Chunk.of(new PersonDto("Ivan", 30)); WriteChannelConfiguration channelConfig = WriteChannelConfiguration @@ -48,7 +48,7 @@ void testWrite() throws Exception { .setSchema(PersonDto.getBigQuerySchema()) .build(); - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() + BigQueryLoadJobCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() .bigQuery(bigQuery) .writeChannelConfig(channelConfig) .build(); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java 
b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java similarity index 81% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java index 0e6d7bad..038d94b2 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/loadjob/json/EmulatorBigQueryLoadJobJsonItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.emulator.writer; +package org.springframework.batch.extensions.bigquery.emulator.writer.loadjob.json; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.FormatOptions; @@ -25,17 +25,17 @@ import org.junit.jupiter.params.provider.MethodSource; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.ResultVerifier; -import org.springframework.batch.extensions.bigquery.common.TableUtils; +import org.springframework.batch.extensions.bigquery.common.NameUtils; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.emulator.writer.base.BaseEmulatorItemWriterTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.emulator.writer.base.EmulatorBaseItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.BigQueryLoadJobJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.builder.BigQueryLoadJobJsonItemWriterBuilder; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import java.util.stream.Stream; -class BigQueryEmulatorJsonItemWriterTest extends BaseEmulatorItemWriterTest { +class EmulatorBigQueryLoadJobJsonItemWriterTest extends EmulatorBaseItemWriterTest { @ParameterizedTest @MethodSource("tables") @@ -50,7 +50,7 @@ void testWrite(String table, boolean autodetect) throws Exception { .setAutodetect(autodetect) .build(); - BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder() + BigQueryLoadJobJsonItemWriter writer = new BigQueryLoadJobJsonItemWriterBuilder() .bigQuery(bigQuery) .writeChannelConfig(channelConfig) .marshaller(new JacksonJsonObjectMarshaller<>()) @@ -64,9 +64,9 @@ void testWrite(String table, boolean autodetect) throws Exception { private static Stream tables() { return Stream.of( - Arguments.of(TableUtils.generateTableName(TestConstants.JSON), false), + Arguments.of(NameUtils.generateTableName(TestConstants.JSON), false), - // TODO auto detect is broken on big query contained side? + // TODO auto detect is broken on big query container side? 
                // Arguments.of(TableUtils.generateTableName(TestConstants.JSON), true),
                Arguments.of(TestConstants.JSON, false)
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java
new file mode 100644
index 00000000..c3f42c2a
--- /dev/null
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java
@@ -0,0 +1,56 @@
+package org.springframework.batch.extensions.bigquery.emulator.writer.writeapi.json;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.batch.extensions.bigquery.common.NameUtils;
+import org.springframework.batch.extensions.bigquery.common.PersonDto;
+import org.springframework.batch.extensions.bigquery.common.ResultVerifier;
+import org.springframework.batch.extensions.bigquery.common.TestConstants;
+import org.springframework.batch.extensions.bigquery.emulator.writer.base.EmulatorBaseItemWriterTest;
+import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class EmulatorBigQueryWriteApiJsonItemWriterTest extends EmulatorBaseItemWriterTest {
+
+    @Test
+    void testWrite() throws Exception {
+        AtomicBoolean consumerCalled = new AtomicBoolean();
+        TableId tableId = TableId.of(TestConstants.PROJECT, TestConstants.DATASET, NameUtils.generateTableName(TestConstants.JSON));
+        TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
+        bigQuery.create(TableInfo.of(tableId, tableDefinition));
+
+        Chunk<PersonDto> expected = TestConstants.CHUNK;
+
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        writer.setBigQueryWriteClient(bigQueryWriteClient);
+        writer.setTableName(TableName.of(tableId.getProject(), tableId.getDataset(), tableId.getTable()));
+        writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
+        writer.setApiFutureCallback(new ApiFutureCallback<>() {
+            @Override
+            public void onFailure(Throwable t) {}
+
+            @Override
+            public void onSuccess(AppendRowsResponse result) {
+                consumerCalled.set(true);
+            }
+        });
+        writer.setExecutor(Executors.newSingleThreadExecutor());
+
+        writer.write(expected);
+
+        ResultVerifier.verifyTableResult(expected, bigQuery.listTableData(tableId));
+        Assertions.assertTrue(consumerCalled.get());
+    }
+
+}
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/GcloudBaseBigQueryIntegrationTest.java
similarity index 93%
rename from
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/GcloudBaseBigQueryIntegrationTest.java index ee2432ec..4fc0ca44 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/GcloudBaseBigQueryIntegrationTest.java @@ -19,6 +19,6 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; -public abstract class BaseBigQueryGcloudIntegrationTest { +public abstract class GcloudBaseBigQueryIntegrationTest { protected static final BigQuery BIG_QUERY = BigQueryOptions.getDefaultInstance().getService(); } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java similarity index 87% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java index f2ff4f8c..63ce46fc 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/GcloudBigQueryItemReaderTest.java @@ -31,15 +31,17 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest; +import org.springframework.batch.extensions.bigquery.gcloud.base.GcloudBaseBigQueryIntegrationTest; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder.BigQueryCsvItemWriterBuilder; import java.util.concurrent.atomic.AtomicReference; -class BigQueryGcloudItemReaderTest extends BaseBigQueryGcloudIntegrationTest { +class GcloudBigQueryItemReaderTest extends GcloudBaseBigQueryIntegrationTest { + + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.CSV); @BeforeAll static void init() throws Exception { @@ -49,7 +51,7 @@ static void init() throws Exception { if (BIG_QUERY.getTable(TestConstants.DATASET, TestConstants.CSV) == null) { TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema()); - BIG_QUERY.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.CSV), tableDefinition)); + BIG_QUERY.create(TableInfo.of(TABLE_ID, 
tableDefinition)); } loadCsvSample(); @@ -57,14 +59,14 @@ static void init() throws Exception { @AfterAll static void cleanupTest() { - BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.CSV)); + BIG_QUERY.delete(TABLE_ID); } @Test void testBatchQuery() throws Exception { QueryJobConfiguration jobConfiguration = QueryJobConfiguration .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV)) - .setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV)) + .setDestinationTable(TABLE_ID) .setPriority(QueryJobConfiguration.Priority.BATCH) .build(); @@ -116,13 +118,13 @@ private static void loadCsvSample() throws Exception { AtomicReference job = new AtomicReference<>(); WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, TestConstants.CSV)) + .newBuilder(TABLE_ID) .setSchema(PersonDto.getBigQuerySchema()) .setAutodetect(false) .setFormatOptions(FormatOptions.csv()) .build(); - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() + BigQueryLoadJobCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() .bigQuery(BIG_QUERY) .writeChannelConfig(channelConfiguration) .jobConsumer(job::set) diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java similarity index 92% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java index ce4f63ac..52f471fd 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/GcloudBaseBigQueryItemWriterTest.java @@ -24,9 +24,9 @@ import org.junit.jupiter.api.Assertions; import org.springframework.batch.extensions.bigquery.common.ResultVerifier; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest; +import org.springframework.batch.extensions.bigquery.gcloud.base.GcloudBaseBigQueryIntegrationTest; -abstract class BaseBigQueryGcloudItemWriterTest extends BaseBigQueryGcloudIntegrationTest { +public abstract class GcloudBaseBigQueryItemWriterTest extends GcloudBaseBigQueryIntegrationTest { protected void verifyResults(String tableName) { Dataset dataset = BIG_QUERY.getDataset(TestConstants.DATASET); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java similarity index 75% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java index 5ff3638e..1c0e6007 
100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/csv/GcloudBigQueryLoadJobCsvItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.gcloud.writer; +package org.springframework.batch.extensions.bigquery.gcloud.writer.loadjob.csv; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.FormatOptions; @@ -30,15 +30,18 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.batch.extensions.bigquery.common.PersonDto; -import org.springframework.batch.extensions.bigquery.common.TableUtils; +import org.springframework.batch.extensions.bigquery.common.NameUtils; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.gcloud.writer.GcloudBaseBigQueryItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder.BigQueryCsvItemWriterBuilder; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; -class BigQueryGcloudCsvItemWriterTest extends BaseBigQueryGcloudItemWriterTest { +class GcloudBigQueryLoadJobCsvItemWriterTest extends GcloudBaseBigQueryItemWriterTest { + + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.CSV); @BeforeAll static void prepareTest() { @@ -48,13 +51,13 @@ static void prepareTest() { if (BIG_QUERY.getTable(TestConstants.DATASET, TestConstants.CSV) == null) { TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema()); - BIG_QUERY.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.CSV), tableDefinition)); + BIG_QUERY.create(TableInfo.of(TABLE_ID, tableDefinition)); } } @AfterAll static void cleanup() { - BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.CSV)); + BIG_QUERY.delete(TABLE_ID); } @ParameterizedTest @@ -69,7 +72,7 @@ void testWriteCsv(String tableName, boolean autodetect) throws Exception { .setFormatOptions(FormatOptions.csv()) .build(); - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() + BigQueryLoadJobCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() .bigQuery(BIG_QUERY) .writeChannelConfig(channelConfiguration) .jobConsumer(job::set) @@ -84,8 +87,8 @@ void testWriteCsv(String tableName, boolean autodetect) throws Exception { private static Stream tables() { return Stream.of( - Arguments.of(TableUtils.generateTableName(TestConstants.CSV), false), - Arguments.of(TableUtils.generateTableName(TestConstants.CSV), true), + Arguments.of(NameUtils.generateTableName(TestConstants.CSV), false), + Arguments.of(NameUtils.generateTableName(TestConstants.CSV), true), Arguments.of(TestConstants.CSV, false) ); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java 
b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java similarity index 73% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java index 23285e84..38b10195 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/loadjob/json/GcloudBigQueryLoadJobJsonItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.gcloud.writer; +package org.springframework.batch.extensions.bigquery.gcloud.writer.loadjob.json; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.FormatOptions; @@ -30,15 +30,18 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.batch.extensions.bigquery.common.PersonDto; -import org.springframework.batch.extensions.bigquery.common.TableUtils; +import org.springframework.batch.extensions.bigquery.common.NameUtils; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.gcloud.writer.GcloudBaseBigQueryItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.BigQueryLoadJobJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.builder.BigQueryLoadJobJsonItemWriterBuilder; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; -class BigQueryGcloudJsonItemWriterTest extends BaseBigQueryGcloudItemWriterTest { +class GcloudBigQueryLoadJobJsonItemWriterTest extends GcloudBaseBigQueryItemWriterTest { + + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.JSON); @BeforeAll static void prepareTest() { @@ -48,18 +51,18 @@ static void prepareTest() { if (BIG_QUERY.getTable(TestConstants.DATASET, TestConstants.JSON) == null) { TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema()); - BIG_QUERY.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.JSON), tableDefinition)); + BIG_QUERY.create(TableInfo.of(TABLE_ID, tableDefinition)); } } @AfterAll static void cleanup() { - BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.JSON)); + BIG_QUERY.delete(TABLE_ID); } @ParameterizedTest @MethodSource("tables") - void testWriteJson(String tableName, boolean autodetect) throws Exception { + void testWrite(String tableName, boolean autodetect) throws Exception { AtomicReference job = new AtomicReference<>(); WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration @@ -69,7 +72,7 @@ void testWriteJson(String tableName, boolean autodetect) throws Exception { .setFormatOptions(FormatOptions.json()) .build(); - BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder() + BigQueryLoadJobJsonItemWriter writer = new 
BigQueryLoadJobJsonItemWriterBuilder() .bigQuery(BIG_QUERY) .writeChannelConfig(channelConfiguration) .jobConsumer(job::set) @@ -84,8 +87,8 @@ void testWriteJson(String tableName, boolean autodetect) throws Exception { private static Stream tables() { return Stream.of( - Arguments.of(TableUtils.generateTableName(TestConstants.JSON), false), - Arguments.of(TableUtils.generateTableName(TestConstants.JSON), true), + Arguments.of(NameUtils.generateTableName(TestConstants.JSON), false), + Arguments.of(NameUtils.generateTableName(TestConstants.JSON), true), Arguments.of(TestConstants.JSON, false) ); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java new file mode 100644 index 00000000..509de2ff --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java @@ -0,0 +1,12 @@ +package org.springframework.batch.extensions.bigquery.gcloud.writer.writeapi.json; + +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.gcloud.writer.GcloudBaseBigQueryItemWriterTest; + +public class GcloudBigQueryWriteApiJsonItemWriterTest extends GcloudBaseBigQueryItemWriterTest { + + @Test + void testWrite() { + // TODO + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java index 44cb5a7f..4c8f7ae2 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; @@ -108,8 +109,8 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException { MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup()); QueryJobConfiguration jobConfiguration = QueryJobConfiguration - .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 2") - .setDestinationTable(TableId.of(TestConstants.DATASET, "persons_duplicate")) + .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.csv p LIMIT 2") + .setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV)) .build(); BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() @@ -146,10 +147,20 @@ void testBuild_Exception(String expectedMessage, BigQueryQueryItemReaderBuilder< private static Stream brokenBuilders() { final class HumanDto {} + BigQuery bigQuery = Mockito.mock(BigQuery.class); return Stream.of( - Arguments.of("No target type provided", new 
BigQueryQueryItemReaderBuilder()), - Arguments.of("Only Java record supported", new BigQueryQueryItemReaderBuilder().targetType(HumanDto.class)), - Arguments.of("No query provided", new BigQueryQueryItemReaderBuilder().rowMapper(source -> null)) + Arguments.of( + "No target type provided", + new BigQueryQueryItemReaderBuilder().bigQuery(bigQuery) + ), + Arguments.of( + "Only Java record supported", + new BigQueryQueryItemReaderBuilder().bigQuery(bigQuery).targetType(HumanDto.class) + ), + Arguments.of( + "No query provided", + new BigQueryQueryItemReaderBuilder().bigQuery(bigQuery).rowMapper(source -> null) + ) ); } } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java similarity index 87% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java index 9c9a75a0..361c2456 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/BigQueryLoadJobBaseItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.unit.writer; +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob; import com.google.cloud.bigquery.*; import org.junit.jupiter.api.Assertions; @@ -23,7 +23,7 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter; import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; import java.lang.invoke.MethodHandles; @@ -34,9 +34,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -class BigQueryBaseItemWriterTest extends AbstractBigQueryTest { +class BigQueryLoadJobBaseItemWriterTest extends AbstractBigQueryTest { - private static final TableId TABLE_ID = TableId.of("dataset", "table"); + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.CSV); @Test void testGetTable() { @@ -54,13 +54,13 @@ void testGetTable() { @Test void testSetDatasetInfo() throws IllegalAccessException, NoSuchFieldException { TestWriter writer = new TestWriter(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); - DatasetInfo expected = DatasetInfo.of("test"); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); + DatasetInfo expected = DatasetInfo.of(TABLE_ID.getDataset()); writer.setDatasetInfo(expected); DatasetInfo actual = (DatasetInfo) handle - .findVarHandle(BigQueryBaseItemWriter.class, "datasetInfo", DatasetInfo.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "datasetInfo", 
DatasetInfo.class) .get(writer); Assertions.assertEquals(expected, actual); @@ -69,13 +69,13 @@ void testSetDatasetInfo() throws IllegalAccessException, NoSuchFieldException { @Test void testSetJobConsumer() throws IllegalAccessException, NoSuchFieldException { TestWriter writer = new TestWriter(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); Consumer expected = job -> {}; writer.setJobConsumer(expected); Consumer actual = (Consumer) handle - .findVarHandle(BigQueryBaseItemWriter.class, "jobConsumer", Consumer.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "jobConsumer", Consumer.class) .get(writer); Assertions.assertEquals(expected, actual); @@ -84,13 +84,13 @@ void testSetJobConsumer() throws IllegalAccessException, NoSuchFieldException { @Test void testSetWriteChannelConfig() throws IllegalAccessException, NoSuchFieldException { TestWriter writer = new TestWriter(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); WriteChannelConfiguration expected = WriteChannelConfiguration.newBuilder(TABLE_ID).build(); writer.setWriteChannelConfig(expected); WriteChannelConfiguration actual = (WriteChannelConfiguration) handle - .findVarHandle(BigQueryBaseItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) .get(writer); Assertions.assertEquals(expected, actual); @@ -99,13 +99,13 @@ void testSetWriteChannelConfig() throws IllegalAccessException, NoSuchFieldExcep @Test void testSetBigQuery() throws IllegalAccessException, NoSuchFieldException { TestWriter writer = new TestWriter(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); BigQuery expected = prepareMockedBigQuery(); writer.setBigQuery(expected); BigQuery actual = (BigQuery) handle - .findVarHandle(BigQueryBaseItemWriter.class, "bigQuery", BigQuery.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQuery", BigQuery.class) .get(writer); Assertions.assertEquals(expected, actual); @@ -113,7 +113,7 @@ void testSetBigQuery() throws IllegalAccessException, NoSuchFieldException { @Test void testWrite() throws Exception { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); AtomicBoolean consumerCalled = new AtomicBoolean(); Job job = Mockito.mock(Job.class); @@ -133,7 +133,7 @@ void testWrite() throws Exception { writer.write(TestConstants.CHUNK); AtomicLong actual = (AtomicLong) handle - .findVarHandle(BigQueryBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) .get(writer); Assertions.assertEquals(1L, actual.get()); @@ -147,7 +147,7 @@ void testWrite() throws Exception { @Test void testWrite_Exception() throws Exception { - 
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); AtomicBoolean consumerCalled = new AtomicBoolean(); Job job = Mockito.mock(Job.class); @@ -169,11 +169,11 @@ void testWrite_Exception() throws Exception { Assertions.assertEquals("Error on write happened", actual.getMessage()); AtomicLong actualCounter = (AtomicLong) handle - .findVarHandle(BigQueryBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) .get(writer); boolean writeFailed = (Boolean) handle - .findVarHandle(BigQueryBaseItemWriter.class, "writeFailed", boolean.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "writeFailed", boolean.class) .get(writer); Assertions.assertEquals(0L, actualCounter.get()); @@ -248,7 +248,7 @@ void testBaseAfterPropertiesSet_Exception() { @Test void testBaseAfterPropertiesSet_Dataset() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); DatasetInfo datasetInfo = DatasetInfo.of(TABLE_ID.getDataset()); BigQuery bigQuery = prepareMockedBigQuery(); @@ -259,7 +259,7 @@ void testBaseAfterPropertiesSet_Dataset() throws IllegalAccessException, NoSuchF writer.afterPropertiesSet(); - DatasetInfo actual = (DatasetInfo) handle.findVarHandle(BigQueryBaseItemWriter.class, "datasetInfo", DatasetInfo.class).get(writer); + DatasetInfo actual = (DatasetInfo) handle.findVarHandle(BigQueryLoadJobBaseItemWriter.class, "datasetInfo", DatasetInfo.class).get(writer); Assertions.assertEquals(datasetInfo, actual); Mockito.verify(bigQuery).create(datasetInfo); @@ -288,7 +288,7 @@ void testTableHasDefinedSchema() { Assertions.assertTrue(writer.testTableHasDefinedSchema(table)); } - private static final class TestWriter extends BigQueryBaseItemWriter { + private static final class TestWriter extends BigQueryLoadJobBaseItemWriter { @Override protected void doInitializeProperties(List items) {} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java similarity index 87% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java index 4f801a7f..50de5d64 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/BigQueryLoadJobCsvItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.springframework.batch.extensions.bigquery.unit.writer; +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob.csv; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; @@ -37,22 +37,23 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter; import org.springframework.core.convert.converter.Converter; import java.lang.invoke.MethodHandles; import java.util.List; import java.util.stream.Stream; -class BigQueryCsvItemWriterTest extends AbstractBigQueryTest { +class BigQueryLoadJobCsvItemWriterTest extends AbstractBigQueryTest { - private static final TableId TABLE_ID = TableId.of("1", "2"); + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.CSV); + private static final Schema SCHEMA = Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING)); @Test void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldException { TestWriter writer = new TestWriter(); List items = TestConstants.CHUNK.getItems(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryCsvItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobCsvItemWriter.class, MethodHandles.lookup()); // Exception Assertions.assertThrows(IllegalStateException.class, () -> writer.testInitializeProperties(List.of())); @@ -61,22 +62,24 @@ void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldExce writer.testInitializeProperties(items); Assertions.assertEquals( PersonDto.class.getSimpleName(), - ((Class) handle.findVarHandle(BigQueryCsvItemWriter.class, "itemClass", Class.class).get(writer)).getSimpleName() + ((Class) handle.findVarHandle(BigQueryLoadJobCsvItemWriter.class, "itemClass", Class.class).get(writer)).getSimpleName() ); - ObjectWriter objectWriter = (ObjectWriter) handle.findVarHandle(BigQueryCsvItemWriter.class, "objectWriter", ObjectWriter.class).get(writer); + ObjectWriter objectWriter = (ObjectWriter) handle + .findVarHandle(BigQueryLoadJobCsvItemWriter.class, "objectWriter", ObjectWriter.class) + .get(writer); Assertions.assertInstanceOf(CsvFactory.class, objectWriter.getFactory()); } @Test void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException { - BigQueryCsvItemWriter reader = new BigQueryCsvItemWriter<>(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryCsvItemWriter.class, MethodHandles.lookup()); + BigQueryLoadJobCsvItemWriter reader = new BigQueryLoadJobCsvItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobCsvItemWriter.class, MethodHandles.lookup()); Converter expected = source -> null; reader.setRowMapper(expected); Converter actual = (Converter) handle - .findVarHandle(BigQueryCsvItemWriter.class, "rowMapper", Converter.class) + .findVarHandle(BigQueryLoadJobCsvItemWriter.class, "rowMapper", Converter.class) .get(reader); Assertions.assertEquals(expected, actual); @@ -131,7 +134,7 @@ void testPerformFormatSpecificChecks() { Table table = Mockito.mock(Table.class); StandardTableDefinition tableDefinition = 
StandardTableDefinition .newBuilder() - .setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))) + .setSchema(SCHEMA) .build(); Mockito.when(table.getDefinition()).thenReturn(tableDefinition); @@ -161,7 +164,7 @@ void testPerformFormatSpecificChecks_Format(FormatOptions formatOptions) { Table table = Mockito.mock(Table.class); StandardTableDefinition tableDefinition = StandardTableDefinition .newBuilder() - .setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))) + .setSchema(SCHEMA) .build(); Mockito.when(table.getDefinition()).thenReturn(tableDefinition); @@ -189,7 +192,7 @@ static Stream invalidFormats() { ); } - private static final class TestWriter extends BigQueryCsvItemWriter { + private static final class TestWriter extends BigQueryLoadJobCsvItemWriter { public void testInitializeProperties(List items) { doInitializeProperties(items); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/builder/BigQueryLoadJobCsvItemWriterBuilderTests.java similarity index 73% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/builder/BigQueryLoadJobCsvItemWriterBuilderTests.java index 73f7563a..f47cedbc 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/csv/builder/BigQueryLoadJobCsvItemWriterBuilderTests.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.springframework.batch.extensions.bigquery.unit.writer.builder; +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob.csv.builder; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; @@ -27,20 +27,20 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder.BigQueryCsvItemWriterBuilder; import org.springframework.core.convert.converter.Converter; import java.lang.invoke.MethodHandles; import java.util.function.Consumer; -class BigQueryCsvItemWriterBuilderTests extends AbstractBigQueryTest { +class BigQueryLoadJobCsvItemWriterBuilderTests extends AbstractBigQueryTest { @Test void testBuild() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup csvWriterHandle = MethodHandles.privateLookupIn(BigQueryCsvItemWriter.class, MethodHandles.lookup()); - MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup csvWriterHandle = MethodHandles.privateLookupIn(BigQueryLoadJobCsvItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); Converter rowMapper = source -> new byte[0]; DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build(); @@ -48,11 +48,11 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException { BigQuery mockedBigQuery = prepareMockedBigQuery(); WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "persons_csv")) + .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), TestConstants.CSV)) .setFormatOptions(FormatOptions.csv()) .build(); - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() + BigQueryLoadJobCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() .rowMapper(rowMapper) .writeChannelConfig(writeConfiguration) .jobConsumer(jobConsumer) @@ -63,23 +63,23 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException { Assertions.assertNotNull(writer); Converter actualRowMapper = (Converter) csvWriterHandle - .findVarHandle(BigQueryCsvItemWriter.class, "rowMapper", Converter.class) + .findVarHandle(BigQueryLoadJobCsvItemWriter.class, "rowMapper", Converter.class) .get(writer); WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) csvWriterHandle - .findVarHandle(BigQueryCsvItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) + .findVarHandle(BigQueryLoadJobCsvItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) .get(writer); Consumer actualJobConsumer = (Consumer) baseWriterHandle - 
.findVarHandle(BigQueryBaseItemWriter.class, "jobConsumer", Consumer.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "jobConsumer", Consumer.class) .get(writer); BigQuery actualBigQuery = (BigQuery) baseWriterHandle - .findVarHandle(BigQueryBaseItemWriter.class, "bigQuery", BigQuery.class) + .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQuery", BigQuery.class) .get(writer); DatasetInfo actualDatasetInfo = (DatasetInfo) baseWriterHandle - .findVarHandle(BigQueryCsvItemWriter.class, "datasetInfo", DatasetInfo.class) + .findVarHandle(BigQueryLoadJobCsvItemWriter.class, "datasetInfo", DatasetInfo.class) .get(writer); Assertions.assertEquals(rowMapper, actualRowMapper); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java similarity index 90% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java index c19f9fe4..20a99e5c 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/BigQueryLoadJobJsonItemWriterTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.extensions.bigquery.unit.writer; +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob.json; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Field; @@ -33,7 +33,7 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.BigQueryLoadJobJsonItemWriter; import org.springframework.batch.item.json.GsonJsonObjectMarshaller; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; @@ -42,20 +42,20 @@ import java.util.List; import java.util.stream.Stream; -class BigQueryJsonItemWriterTest extends AbstractBigQueryTest { +class BigQueryLoadJobJsonItemWriterTest extends AbstractBigQueryTest { - private static final TableId TABLE_ID = TableId.of("1", "2"); + private static final TableId TABLE_ID = TableId.of(TestConstants.DATASET, TestConstants.JSON); @Test void testSetMarshaller() throws IllegalAccessException, NoSuchFieldException { - BigQueryJsonItemWriter reader = new BigQueryJsonItemWriter<>(); - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup()); + BigQueryLoadJobJsonItemWriter reader = new BigQueryLoadJobJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryLoadJobJsonItemWriter.class, MethodHandles.lookup()); JsonObjectMarshaller expected = new JacksonJsonObjectMarshaller<>(); reader.setMarshaller(expected); JsonObjectMarshaller actual = (JsonObjectMarshaller) handle - 
.findVarHandle(BigQueryJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) + .findVarHandle(BigQueryLoadJobJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) .get(reader); Assertions.assertEquals(expected, actual); @@ -104,7 +104,7 @@ void testPerformFormatSpecificChecks() { // marshaller IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); - Assertions.assertEquals("Marshaller is mandatory", actual.getMessage()); + Assertions.assertEquals("Marshaller must be provided", actual.getMessage()); // schema writer.setMarshaller(new JacksonJsonObjectMarshaller<>()); @@ -159,7 +159,7 @@ static Stream invalidFormats() { ); } - private static final class TestWriter extends BigQueryJsonItemWriter { + private static final class TestWriter extends BigQueryLoadJobJsonItemWriter { public List testConvert(List items) { return convertObjectsToByteArrays(items); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilderTests.java similarity index 73% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilderTests.java index b0364ebd..0a2f317e 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/loadjob/json/builder/BigQueryLoadJobJsonItemWriterBuilderTests.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package org.springframework.batch.extensions.bigquery.unit.writer.builder; +package org.springframework.batch.extensions.bigquery.unit.writer.loadjob.json.builder; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; @@ -27,21 +27,21 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; -import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; -import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.BigQueryLoadJobJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.loadjob.json.builder.BigQueryLoadJobJsonItemWriterBuilder; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; import java.lang.invoke.MethodHandles; import java.util.function.Consumer; -class BigQueryJsonItemWriterBuilderTests extends AbstractBigQueryTest { +class BigQueryLoadJobJsonItemWriterBuilderTests extends AbstractBigQueryTest { @Test void testBuild() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup jsonWriterHandle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup()); - MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup jsonWriterHandle = MethodHandles.privateLookupIn(BigQueryLoadJobJsonItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryLoadJobBaseItemWriter.class, MethodHandles.lookup()); JsonObjectMarshaller marshaller = new JacksonJsonObjectMarshaller<>(); DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build(); @@ -49,11 +49,11 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException { BigQuery mockedBigQuery = prepareMockedBigQuery(); WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "persons_json")) + .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), TestConstants.JSON)) .setFormatOptions(FormatOptions.json()) .build(); - BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder() + BigQueryLoadJobJsonItemWriter writer = new BigQueryLoadJobJsonItemWriterBuilder() .marshaller(marshaller) .writeChannelConfig(writeConfiguration) .jobConsumer(jobConsumer) @@ -64,23 +64,23 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException { Assertions.assertNotNull(writer); JsonObjectMarshaller actualMarshaller = (JsonObjectMarshaller) jsonWriterHandle - .findVarHandle(BigQueryJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) + .findVarHandle(BigQueryLoadJobJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) .get(writer); WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) jsonWriterHandle - .findVarHandle(BigQueryJsonItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) + 
+                .findVarHandle(BigQueryLoadJobJsonItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class)
                 .get(writer);
 
         Consumer actualJobConsumer = (Consumer) baseWriterHandle
-                .findVarHandle(BigQueryBaseItemWriter.class, "jobConsumer", Consumer.class)
+                .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "jobConsumer", Consumer.class)
                 .get(writer);
 
         BigQuery actualBigQuery = (BigQuery) baseWriterHandle
-                .findVarHandle(BigQueryBaseItemWriter.class, "bigQuery", BigQuery.class)
+                .findVarHandle(BigQueryLoadJobBaseItemWriter.class, "bigQuery", BigQuery.class)
                 .get(writer);
 
         DatasetInfo actualDatasetInfo = (DatasetInfo) baseWriterHandle
-                .findVarHandle(BigQueryJsonItemWriter.class, "datasetInfo", DatasetInfo.class)
+                .findVarHandle(BigQueryLoadJobJsonItemWriter.class, "datasetInfo", DatasetInfo.class)
                 .get(writer);
 
         Assertions.assertEquals(marshaller, actualMarshaller);
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java
new file mode 100644
index 00000000..659316f5
--- /dev/null
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java
@@ -0,0 +1,181 @@
+package org.springframework.batch.extensions.bigquery.unit.writer.writeapi.json;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.WriteStreamName;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.batch.extensions.bigquery.common.PersonDto;
+import org.springframework.batch.extensions.bigquery.common.TestConstants;
+import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;
+import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.json.GsonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JsonObjectMarshaller;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+class BigQueryWriteApiJsonItemWriterTest {
+
+    private static final TableName TABLE_NAME = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON);
+
+    @Test
+    void testWrite_Empty() throws Exception {
+        BigQueryWriteClient writeClient = Mockito.mock(BigQueryWriteClient.class);
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        writer.setBigQueryWriteClient(writeClient);
+
+        writer.write(Chunk.of());
+
+        Mockito.verifyNoInteractions(writeClient);
+    }
+
+    @Test
+    void testWrite_Exception() {
+        BigQueryItemWriterException ex = Assertions.assertThrows(
+                BigQueryItemWriterException.class, () -> new BigQueryWriteApiJsonItemWriter<>().write(TestConstants.CHUNK)
+        );
+        Assertions.assertEquals("Error on write happened", ex.getMessage());
+    }
+
+    @Test
+    void testWrite() throws Exception {
+        WriteStreamName streamName = WriteStreamName.of(TABLE_NAME.getProject(), TABLE_NAME.getDataset(), TABLE_NAME.getTable(), "test-stream-1");
+
+        WriteStream writeStream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
+        CreateWriteStreamRequest streamRequest = CreateWriteStreamRequest.newBuilder().setParent(TABLE_NAME.toString()).setWriteStream(writeStream).build();
+
+        BigQueryWriteClient writeClient = Mockito.mock(BigQueryWriteClient.class);
+        WriteStream generatedWriteStream = WriteStream.newBuilder().setName(streamName.toString()).setTableSchema(PersonDto.getWriteApiSchema()).build();
+        Mockito.when(writeClient.createWriteStream(streamRequest)).thenReturn(generatedWriteStream);
+        Mockito.when(writeClient.getWriteStream(Mockito.any(GetWriteStreamRequest.class))).thenReturn(generatedWriteStream);
+        Mockito.when(writeClient.getSettings()).thenReturn(BigQueryWriteSettings.newBuilder().setCredentialsProvider(NoCredentialsProvider.create()).build());
+
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        writer.setTableName(TABLE_NAME);
+        writer.setBigQueryWriteClient(writeClient);
+        writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
+
+        writer.write(TestConstants.CHUNK);
+
+        Mockito.verify(writeClient).createWriteStream(streamRequest);
+        Mockito.verify(writeClient).finalizeWriteStream(streamName.toString());
+    }
+
+    @Test
+    void testAfterPropertiesSet() {
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+
+        // bigQueryWriteClient
+        IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet);
+        Assertions.assertEquals("BigQuery write client must be provided", ex.getMessage());
+
+        // tableName
+        writer.setBigQueryWriteClient(Mockito.mock(BigQueryWriteClient.class));
+        ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet);
+        Assertions.assertEquals("Table name must be provided", ex.getMessage());
+
+        // marshaller
+        writer.setTableName(TABLE_NAME);
+        ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet);
+        Assertions.assertEquals("Marshaller must be provided", ex.getMessage());
+
+        // executor
+        writer.setApiFutureCallback(new TestCallback());
+        writer.setMarshaller(new GsonJsonObjectMarshaller<>());
+        ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet);
+        Assertions.assertEquals("Executor must be provided", ex.getMessage());
+
+        // All good
+        writer.setExecutor(Executors.newSingleThreadExecutor());
+        Assertions.assertDoesNotThrow(writer::afterPropertiesSet);
+    }
+
+    @Test
+    void testSetBigQueryWriteClient() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        BigQueryWriteClient expected = Mockito.mock(BigQueryWriteClient.class);
+
+        writer.setBigQueryWriteClient(expected);
+
+        BigQueryWriteClient actual = (BigQueryWriteClient) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class)
+                .get(writer);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testSetTableName() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+
+        writer.setTableName(TABLE_NAME);
+
+        TableName actual = (TableName) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "tableName", TableName.class)
+                .get(writer);
+        Assertions.assertEquals(TABLE_NAME, actual);
+    }
+
+    @Test
+    void testSetMarshaller() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        JsonObjectMarshaller<PersonDto> expected = new JacksonJsonObjectMarshaller<>();
+
+        writer.setMarshaller(expected);
+
+        JsonObjectMarshaller actual = (JsonObjectMarshaller) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class)
+                .get(writer);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testSetApiFutureCallback() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        ApiFutureCallback<AppendRowsResponse> expected = new TestCallback();
+
+        writer.setApiFutureCallback(expected);
+
+        ApiFutureCallback actual = (ApiFutureCallback) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback.class)
+                .get(writer);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testSetExecutor() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriter<>();
+        Executor expected = Executors.newSingleThreadExecutor();
+
+        writer.setExecutor(expected);
+
+        Executor actual = (Executor) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "executor", Executor.class)
+                .get(writer);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    private static final class TestCallback implements ApiFutureCallback<AppendRowsResponse> {
+        @Override
+        public void onFailure(Throwable t) {}
+
+        @Override
+        public void onSuccess(AppendRowsResponse result) {}
+    }
+}
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java
new file mode 100644
index 00000000..f3d04ee4
--- /dev/null
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2002-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.extensions.bigquery.unit.writer.writeapi.json.builder;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.batch.extensions.bigquery.common.PersonDto;
+import org.springframework.batch.extensions.bigquery.common.TestConstants;
+import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter;
+import org.springframework.batch.extensions.bigquery.writer.writeapi.json.builder.BigQueryWriteApiJsonItemWriterBuilder;
+import org.springframework.batch.item.json.GsonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JsonObjectMarshaller;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+class BigQueryWriteApiJsonItemWriterBuilderTest {
+
+    @Test
+    void testBigQueryWriteClient() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriterBuilder<PersonDto> builder = new BigQueryWriteApiJsonItemWriterBuilder<>();
+        BigQueryWriteClient expected = Mockito.mock(BigQueryWriteClient.class);
+
+        builder.bigQueryWriteClient(expected);
+
+        BigQueryWriteClient actual = (BigQueryWriteClient) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "bigQueryWriteClient", BigQueryWriteClient.class)
+                .get(builder);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testTableName() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriterBuilder<PersonDto> builder = new BigQueryWriteApiJsonItemWriterBuilder<>();
+        TableName expected = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON);
+
+        builder.tableName(expected);
+
+        TableName actual = (TableName) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "tableName", TableName.class)
+                .get(builder);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testMarshaller() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriterBuilder<PersonDto> builder = new BigQueryWriteApiJsonItemWriterBuilder<>();
+        JsonObjectMarshaller<PersonDto> expected = new GsonJsonObjectMarshaller<>();
+
+        builder.marshaller(expected);
+
+        JsonObjectMarshaller actual = (JsonObjectMarshaller) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "marshaller", JsonObjectMarshaller.class)
+                .get(builder);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testApiFutureCallback() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriterBuilder<PersonDto> builder = new BigQueryWriteApiJsonItemWriterBuilder<>();
+
+        ApiFutureCallback<AppendRowsResponse> expected = new ApiFutureCallback<>() {
+            @Override
+            public void onFailure(Throwable t) {}
+
+            @Override
+            public void onSuccess(AppendRowsResponse result) {}
+        };
+
+        builder.apiFutureCallback(expected);
+
+        ApiFutureCallback actual = (ApiFutureCallback) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "apiFutureCallback", ApiFutureCallback.class)
+                .get(builder);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testExecutor() throws IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup());
+        BigQueryWriteApiJsonItemWriterBuilder<PersonDto> builder = new BigQueryWriteApiJsonItemWriterBuilder<>();
+        Executor expected = Executors.newSingleThreadExecutor();
+
+        builder.executor(expected);
+
+        Executor actual = (Executor) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "executor", Executor.class)
+                .get(builder);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    @Test
+    void testBuild() throws IOException, IllegalAccessException, NoSuchFieldException {
+        MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup());
+        JsonObjectMarshaller<PersonDto> expectedMarshaller = new JacksonJsonObjectMarshaller<>();
+        BigQueryWriteClient expectedWriteClient = Mockito.mock(BigQueryWriteClient.class);
+        Executor expectedExecutor = Executors.newCachedThreadPool();
+        TableName expectedTableName = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON);
+
+        ApiFutureCallback<AppendRowsResponse> expectedCallback = new ApiFutureCallback<>() {
+            @Override
+            public void onFailure(Throwable t) {
+            }
+
+            @Override
+            public void onSuccess(AppendRowsResponse result) {
+            }
+        };
+
+        BigQueryWriteApiJsonItemWriter<PersonDto> writer = new BigQueryWriteApiJsonItemWriterBuilder<PersonDto>()
+                .marshaller(expectedMarshaller)
+                .bigQueryWriteClient(expectedWriteClient)
+                .apiFutureCallback(expectedCallback)
+                .executor(expectedExecutor)
+                .tableName(expectedTableName)
+                .build();
+
+        Assertions.assertNotNull(writer);
+
+        JsonObjectMarshaller actualMarshaller = (JsonObjectMarshaller) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class)
+                .get(writer);
+
+        BigQueryWriteClient actualWriteClient = (BigQueryWriteClient) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class)
+                .get(writer);
+
+        ApiFutureCallback actualCallback = (ApiFutureCallback) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback.class)
+                .get(writer);
+
+        Executor actualExecutor = (Executor) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "executor", Executor.class)
+                .get(writer);
+
+        TableName actualTableName = (TableName) handle
+                .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "tableName", TableName.class)
+                .get(writer);
+
+        Assertions.assertEquals(expectedMarshaller, actualMarshaller);
+        Assertions.assertEquals(expectedWriteClient, actualWriteClient);
+        Assertions.assertEquals(expectedCallback, actualCallback);
+        Assertions.assertEquals(expectedExecutor, actualExecutor);
+        Assertions.assertEquals(expectedTableName, actualTableName);
+    }
+}
diff --git a/spring-batch-bigquery/src/test/resources/logback.xml b/spring-batch-bigquery/src/test/resources/logback.xml
index feb55f5f..d90c4b25 100644
--- a/spring-batch-bigquery/src/test/resources/logback.xml
+++ b/spring-batch-bigquery/src/test/resources/logback.xml
@@ -13,6 +13,7 @@
+