diff --git a/spring-batch-bigquery/README.adoc b/spring-batch-bigquery/README.adoc index 04a597da..a930b198 100644 --- a/spring-batch-bigquery/README.adoc +++ b/spring-batch-bigquery/README.adoc @@ -1,11 +1,15 @@ = spring-batch-bigquery -Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery]. -It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]. +Spring Batch extension which contains an `ItemWriter` and `ItemReader` implementations for https://cloud.google.com/bigquery[BigQuery]. -== Configuration of `BigQueryCsvItemWriter` +`ItemWriter` supports next formats (https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]): -Next to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch] one needs to configure the `BigQueryCsvItemWriter`. +* https://en.wikipedia.org/wiki/Comma-separated_values[CSV] +* https://en.wikipedia.org/wiki/JSON[JSON] + +Based on https://github.com/googleapis/java-bigquery[Java BigQuery]. + +== Example of `BigQueryCsvItemWriter` [source,java] ---- @@ -17,24 +21,25 @@ BigQueryCsvItemWriter bigQueryCsvWriter() { .setFormatOptions(FormatOptions.csv()) .build(); - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() - .bigQuery(mockedBigQuery) + return new BigQueryCsvItemWriterBuilder() + .bigQuery(bigQueryService) .writeChannelConfig(writeConfiguration) .build(); } ---- -Additional examples could be found in https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/writer/builder/[here]. +== Example of `BigQueryItemReader` -== Configuration properties -[cols="1,1,4"] -.Properties for an item writer -|=== -| Property | Required | Description +[source,java] +---- +@Bean +BigQueryItemReader bigQueryReader() { + return new BigQueryQueryItemReaderBuilder() + .bigQuery(bigQueryService) + .rowMapper(res -> new PersonDto(res.get("name").getStringValue())) + .query("SELECT p.name FROM persons p") + .build(); +} +---- -| `bigQuery` | yes | BigQuery object that provided by BigQuery Java Library. Responsible for connection with BigQuery. -| `writeChannelConfig` | yes | BigQuery write channel config provided by BigQuery Java Library. Responsible for configuring data type, data channel, jobs that will be sent to BigQuery. -| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to a byte array. -| `datasetInfo` | no | Your way to customize how to create BigQuery dataset. -| `jobConsumer` | no | Your custom handler for BigQuery Job provided by BigQuery Java Library. -|=== \ No newline at end of file +Additional examples could be found in the https://github.com/spring-projects/spring-batch-extensions/tree/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery[test folder]. \ No newline at end of file diff --git a/spring-batch-bigquery/pom.xml b/spring-batch-bigquery/pom.xml index 6e9cf4cb..fe71c08e 100644 --- a/spring-batch-bigquery/pom.xml +++ b/spring-batch-bigquery/pom.xml @@ -14,13 +14,15 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - + 4.0.0 org.springframework.boot spring-boot-starter-parent - 3.4.5 + 3.5.0 @@ -62,11 +64,7 @@ com.google.cloud google-cloud-bigquery - 2.45.0 - - - org.apache.commons - commons-lang3 + 2.51.0 org.springframework.batch @@ -84,6 +82,11 @@ junit-jupiter-api test + + org.junit.jupiter + junit-jupiter-params + test + org.mockito mockito-core @@ -94,6 +97,18 @@ junit-jupiter test + + org.wiremock + wiremock-standalone + 3.13.0 + test + + + org.slf4j + jul-to-slf4j + test + + @@ -123,15 +138,15 @@ - + org.apache.maven.plugins maven-surefire-plugin - + + **/unit/** - **/emulator/** @@ -139,7 +154,7 @@ org.codehaus.mojo flatten-maven-plugin - 1.6.0 + 1.7.0 flatten diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/BigQueryQueryItemReader.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/BigQueryQueryItemReader.java index 84cb9928..26ae7bdb 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/BigQueryQueryItemReader.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/BigQueryQueryItemReader.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,7 +64,7 @@ public void setBigQuery(BigQuery bigQuery) { } /** - * Row mapper which transforms single BigQuery row into desired type. + * Row mapper which transforms single BigQuery row into a desired type. * * @param rowMapper your row mapper */ diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java index a882ffa3..63f417d1 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,10 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.QueryJobConfiguration; -import org.apache.commons.lang3.StringUtils; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; import org.springframework.core.convert.converter.Converter; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * A builder for {@link BigQueryQueryItemReader}. @@ -103,7 +103,7 @@ public BigQueryQueryItemReader build() { reader.setRowMapper(this.rowMapper); if (this.jobConfiguration == null) { - Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided"); + Assert.isTrue(StringUtils.hasText(this.query), "No query provided"); reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build()); } else { reader.setJobConfiguration(this.jobConfiguration); diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java index 07bd6087..aaea894e 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,12 @@ package org.springframework.batch.extensions.bigquery.writer; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetInfo; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.cloud.bigquery.TableDefinition; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.apache.commons.lang3.BooleanUtils; +import com.google.cloud.bigquery.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; import java.io.ByteArrayOutputStream; @@ -41,7 +32,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Supplier; /** * Base class that holds shared code for JSON and CSV writers. @@ -50,7 +40,7 @@ * @author Volodymyr Perebykivskyi * @since 0.1.0 */ -public abstract class BigQueryBaseItemWriter implements ItemWriter { +public abstract class BigQueryBaseItemWriter implements ItemWriter, InitializingBean { /** Logger that can be reused */ protected final Log logger = LogFactory.getLog(getClass()); @@ -76,6 +66,7 @@ public abstract class BigQueryBaseItemWriter implements ItemWriter { private BigQuery bigQuery; + private boolean writeFailed; /** * Fetches table from the provided configuration. @@ -168,18 +159,25 @@ private void doWriteDataToBigQuery(ByteBuffer byteBuffer) throws IOException { writer.write(byteBuffer); writeChannel = writer; } + catch (Exception e) { + writeFailed = true; + logger.error("BigQuery error", e); + throw new BigQueryItemWriterException("Error on write happened", e); + } finally { - String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet(); - - if (writeChannel != null) { - logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob(); - if (this.jobConsumer != null) { - this.jobConsumer.accept(writeChannel.getJob()); + if (!writeFailed) { + String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet(); + + if (writeChannel != null) { + logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob(); + if (this.jobConsumer != null) { + this.jobConsumer.accept(writeChannel.getJob()); + } } - } - if (this.logger.isDebugEnabled()) { - this.logger.debug(logMessage); + if (this.logger.isDebugEnabled()) { + this.logger.debug(logMessage); + } } } } @@ -194,23 +192,22 @@ private TableDataWriteChannel getWriteChannel() { /** * Performs common validation for CSV and JSON types. - * - * @param formatSpecificChecks supplies type-specific validation */ - protected void baseAfterPropertiesSet(Supplier formatSpecificChecks) { + @Override + public void afterPropertiesSet() { Assert.notNull(this.bigQuery, "BigQuery service must be provided"); Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided"); + Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided"); - Assert.isTrue(BooleanUtils.isFalse(isBigtable()), "Google BigTable is not supported"); - Assert.isTrue(BooleanUtils.isFalse(isGoogleSheets()), "Google Sheets is not supported"); - Assert.isTrue(BooleanUtils.isFalse(isDatastore()), "Google Datastore is not supported"); - Assert.isTrue(BooleanUtils.isFalse(isParquet()), "Parquet is not supported"); - Assert.isTrue(BooleanUtils.isFalse(isOrc()), "Orc is not supported"); - Assert.isTrue(BooleanUtils.isFalse(isAvro()), "Avro is not supported"); - - formatSpecificChecks.get(); + Assert.isTrue(!isBigtable(), "Google BigTable is not supported"); + Assert.isTrue(!isGoogleSheets(), "Google Sheets is not supported"); + Assert.isTrue(!isDatastore(), "Google Datastore is not supported"); + Assert.isTrue(!isParquet(), "Parquet is not supported"); + Assert.isTrue(!isOrc(), "Orc is not supported"); + Assert.isTrue(!isAvro(), "Avro is not supported"); + Assert.isTrue(!isIceberg(), "Iceberg is not supported"); - Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided"); + performFormatSpecificChecks(); String dataset = this.writeChannelConfig.getDestinationTable().getDataset(); if (this.datasetInfo == null) { @@ -262,6 +259,10 @@ private boolean isDatastore() { return FormatOptions.datastoreBackup().getType().equals(this.writeChannelConfig.getFormat()); } + private boolean isIceberg() { + return FormatOptions.iceberg().getType().equals(this.writeChannelConfig.getFormat()); + } + /** * Schema can be computed on the BigQuery side during upload, * so it is good to know when schema is supplied by user manually. @@ -294,4 +295,9 @@ protected boolean tableHasDefinedSchema(Table table) { */ protected abstract List convertObjectsToByteArrays(List items); + /** + * Performs specific checks that are unique to the format. + */ + protected abstract void performFormatSpecificChecks(); + } \ No newline at end of file diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java index 98798d70..904bf0be 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,14 +19,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Table; -import org.apache.commons.lang3.ArrayUtils; -import org.springframework.beans.factory.InitializingBean; import org.springframework.core.convert.converter.Converter; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; import java.util.function.Predicate; @@ -39,7 +37,7 @@ * @since 0.2.0 * @see CSV */ -public class BigQueryCsvItemWriter extends BigQueryBaseItemWriter implements InitializingBean { +public class BigQueryCsvItemWriter extends BigQueryBaseItemWriter { private Converter rowMapper; private ObjectWriter objectWriter; @@ -74,41 +72,37 @@ public void setRowMapper(Converter rowMapper) { this.rowMapper = rowMapper; } - @Override protected List convertObjectsToByteArrays(List items) { return items .stream() .map(this::mapItemToCsv) - .filter(ArrayUtils::isNotEmpty) - .map(String::new) .filter(Predicate.not(ObjectUtils::isEmpty)) - .map(row -> row.getBytes(StandardCharsets.UTF_8)) .toList(); } @Override - public void afterPropertiesSet() { - super.baseAfterPropertiesSet(() -> { - Table table = getTable(); - - if (Boolean.TRUE.equals(super.writeChannelConfig.getAutodetect())) { - if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) { - logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); - } - } else { - Assert.notNull(super.writeChannelConfig.getSchema(), "Schema must be provided"); - - if (tableHasDefinedSchema(table)) { - Assert.isTrue( - Objects.equals(table.getDefinition().getSchema(), super.writeChannelConfig.getSchema()), - "Schema should be the same" - ); - } + protected void performFormatSpecificChecks() { + Table table = getTable(); + + if (Boolean.TRUE.equals(super.writeChannelConfig.getAutodetect())) { + if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) { + logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); + } + } else { + Assert.notNull(super.writeChannelConfig.getSchema(), "Schema must be provided"); + + if (tableHasDefinedSchema(table)) { + Assert.isTrue( + Objects.equals(table.getDefinition().getSchema(), super.writeChannelConfig.getSchema()), + "Schema must be the same" + ); } + } + + String format = FormatOptions.csv().getType(); + Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format)); - return null; - }); } private byte[] mapItemToCsv(T t) { @@ -117,7 +111,7 @@ private byte[] mapItemToCsv(T t) { } catch (JsonProcessingException e) { logger.error("Error during processing of the line: ", e); - return null; + return new byte[]{}; } } diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java new file mode 100644 index 00000000..47a6fd6f --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java @@ -0,0 +1,36 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.writer; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.ItemWriterException; + +/** + * Unchecked {@link Exception} indicating that an error has occurred on during {@link ItemWriter#write(Chunk)}. + */ +public class BigQueryItemWriterException extends ItemWriterException { + + /** + * Create a new {@link BigQueryItemWriterException} based on a message and another {@link Exception}. + * @param message the message for this {@link Exception} + * @param cause the other {@link Exception} + */ + public BigQueryItemWriterException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java index c159cf16..44bc9148 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Table; -import org.apache.commons.lang3.ArrayUtils; -import org.springframework.beans.factory.InitializingBean; import org.springframework.core.convert.converter.Converter; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -39,9 +38,11 @@ * @since 0.2.0 * @see JSON */ -public class BigQueryJsonItemWriter extends BigQueryBaseItemWriter implements InitializingBean { +public class BigQueryJsonItemWriter extends BigQueryBaseItemWriter { - private Converter rowMapper; + private static final String LF = "\n"; + + private Converter rowMapper; private ObjectWriter objectWriter; private Class itemClass; @@ -63,11 +64,11 @@ protected void doInitializeProperties(List items) { } /** - * Converter that transforms a single row into a byte array. + * Converter that transforms a single row into a {@link String}. * * @param rowMapper your JSON row mapper */ - public void setRowMapper(Converter rowMapper) { + public void setRowMapper(Converter rowMapper) { this.rowMapper = rowMapper; } @@ -76,41 +77,38 @@ protected List convertObjectsToByteArrays(List items) { return items .stream() .map(this::mapItemToJson) - .filter(ArrayUtils::isNotEmpty) - .map(String::new) - .map(this::convertToNdJson) .filter(Predicate.not(ObjectUtils::isEmpty)) + .map(this::convertToNdJson) .map(row -> row.getBytes(StandardCharsets.UTF_8)) .toList(); } @Override - public void afterPropertiesSet() { - super.baseAfterPropertiesSet(() -> { - Table table = getTable(); - - if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) { - if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) { - logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); - } - } else { - Assert.notNull(writeChannelConfig.getSchema(), "Schema must be provided"); - - if (tableHasDefinedSchema(table)) { - Assert.isTrue( - Objects.equals(table.getDefinition().getSchema(), writeChannelConfig.getSchema()), - "Schema should be the same" - ); - } + protected void performFormatSpecificChecks() { + Table table = getTable(); + + if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) { + if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) { + logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); } + } else { + Assert.notNull(writeChannelConfig.getSchema(), "Schema must be provided"); + + if (tableHasDefinedSchema(table)) { + Assert.isTrue( + Objects.equals(table.getDefinition().getSchema(), writeChannelConfig.getSchema()), + "Schema must be the same" + ); + } + } - return null; - }); + String format = FormatOptions.json().getType(); + Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format)); } - private byte[] mapItemToJson(T t) { + private String mapItemToJson(T t) { try { - return Objects.isNull(rowMapper) ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t); + return rowMapper == null ? objectWriter.writeValueAsString(t) : rowMapper.convert(t); } catch (JsonProcessingException e) { logger.error("Error during processing of the line: ", e); @@ -124,7 +122,7 @@ private byte[] mapItemToJson(T t) { * {@link com.fasterxml.jackson.databind.ObjectMapper} or any other JSON parser. */ private String convertToNdJson(String json) { - return json.concat(org.apache.commons.lang3.StringUtils.LF); + return json.concat(LF); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java index 09e4578f..911f0c8e 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java @@ -35,7 +35,7 @@ */ public class BigQueryJsonItemWriterBuilder { - private Converter rowMapper; + private Converter rowMapper; private Consumer jobConsumer; private DatasetInfo datasetInfo; @@ -43,13 +43,13 @@ public class BigQueryJsonItemWriterBuilder { private BigQuery bigQuery; /** - * Converts your DTO into a byte array. + * Converts your DTO into a {@link String}. * * @param rowMapper your mapping * @return {@link BigQueryJsonItemWriter} * @see BigQueryJsonItemWriter#setRowMapper(Converter) */ - public BigQueryJsonItemWriterBuilder rowMapper(Converter rowMapper) { + public BigQueryJsonItemWriterBuilder rowMapper(Converter rowMapper) { this.rowMapper = rowMapper; return this; } diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java index 69aa0eeb..1f25f8be 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/BigQueryDataLoader.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/BigQueryDataLoader.java deleted file mode 100644 index f8e280da..00000000 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/BigQueryDataLoader.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2002-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.batch.extensions.bigquery.common; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; -import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; -import org.springframework.batch.item.Chunk; - -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; - -public class BigQueryDataLoader { - - /** Order must be defined so later executed queries results could be predictable */ - private static final List PERSONS = Stream - .of(new PersonDto("Volodymyr", 27), new PersonDto("Oleksandra", 26)) - .sorted(Comparator.comparing(PersonDto::name)) - .toList(); - - public static final Chunk CHUNK = new Chunk<>(PERSONS); - - private final BigQuery bigQuery; - - public BigQueryDataLoader(BigQuery bigQuery) { - this.bigQuery = bigQuery; - } - - public void loadCsvSample(String tableName) throws Exception { - AtomicReference job = new AtomicReference<>(); - - WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, tableName)) - .setSchema(PersonDto.getBigQuerySchema()) - .setAutodetect(false) - .setFormatOptions(FormatOptions.csv()) - .build(); - - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() - .bigQuery(bigQuery) - .writeChannelConfig(channelConfiguration) - .jobConsumer(job::set) - .build(); - - writer.afterPropertiesSet(); - writer.write(CHUNK); - job.get().waitFor(); - } - -} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java index fbc3484b..15069e12 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/PersonDto.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +21,12 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; -@JsonPropertyOrder(value = {"name", "age"}) +@JsonPropertyOrder(value = {TestConstants.NAME, TestConstants.AGE}) public record PersonDto(String name, Integer age) { public static Schema getBigQuerySchema() { - Field nameField = Field.newBuilder("name", StandardSQLTypeName.STRING).build(); - Field ageField = Field.newBuilder("age", StandardSQLTypeName.INT64).build(); + Field nameField = Field.newBuilder(TestConstants.NAME, StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build(); + Field ageField = Field.newBuilder(TestConstants.AGE, StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build(); return Schema.of(nameField, ageField); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java new file mode 100644 index 00000000..d5e62c41 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/ResultVerifier.java @@ -0,0 +1,54 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.common; + +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableResult; +import org.junit.jupiter.api.Assertions; +import org.springframework.batch.item.Chunk; + +import java.util.List; + +public final class ResultVerifier { + private ResultVerifier() { + } + + public static void verifyTableResult(Chunk expected, TableResult actual) { + List actualList = actual.streamValues().toList(); + + Assertions.assertEquals(expected.size(), actual.getTotalRows()); + Assertions.assertEquals(expected.size(), actualList.size()); + + actualList.forEach(field -> { + boolean containsName = expected + .getItems() + .stream() + .map(PersonDto::name) + .anyMatch(name -> field.get(0).getStringValue().equals(name)); + + boolean containsAge = expected + .getItems() + .stream() + .map(PersonDto::age) + .map(Long::valueOf) + .anyMatch(age -> age.compareTo(field.get(1).getLongValue()) == 0); + + Assertions.assertTrue(containsName); + Assertions.assertTrue(containsAge); + }); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TableUtils.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TableUtils.java new file mode 100644 index 00000000..a90c16e3 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TableUtils.java @@ -0,0 +1,27 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.common; + +import java.util.concurrent.ThreadLocalRandom; + +public final class TableUtils { + private TableUtils() {} + + public static String generateTableName(String testType) { + return testType + "-" + ThreadLocalRandom.current().nextInt(100); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java index a845e7dc..0e37bd37 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,13 @@ package org.springframework.batch.extensions.bigquery.common; import com.google.cloud.bigquery.FieldValueList; +import org.springframework.batch.item.Chunk; import org.springframework.core.convert.converter.Converter; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Stream; + public final class TestConstants { private TestConstants() {} @@ -30,7 +35,15 @@ private TestConstants() {} public static final String JSON = "json"; public static final Converter PERSON_MAPPER = res -> new PersonDto( - res.get(NAME).getStringValue(), Long.valueOf(res.get(AGE).getLongValue()).intValue() + res.get(NAME).getStringValue(), res.get(AGE).getNumericValue().intValue() ); + /** Order must be defined so later executed queries results could be predictable */ + private static final List PERSONS = Stream + .of(new PersonDto("Volodymyr", 27), new PersonDto("Oleksandra", 26)) + .sorted(Comparator.comparing(PersonDto::name)) + .toList(); + + public static final Chunk CHUNK = new Chunk<>(PERSONS); + } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BaseEmulatorTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BaseEmulatorTest.java new file mode 100644 index 00000000..07cd75a6 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BaseEmulatorTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.base; + +import com.google.cloud.NoCredentials; +import com.google.cloud.bigquery.BigQueryOptions; +import org.testcontainers.containers.GenericContainer; + +public abstract class BaseEmulatorTest { + + protected static final String PROJECT = "batch-test"; + + protected static BigQueryOptions.Builder prepareBigQueryBuilder() { + return BigQueryOptions + .newBuilder() + .setProjectId(PROJECT) + .setCredentials(NoCredentials.getInstance()); + } + + protected static String getBigQueryUrl(GenericContainer container) { + return "http://%s:%d".formatted(container.getHost(), container.getMappedPort(BigQueryBaseDockerConfiguration.PORT)); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BigQueryBaseDockerConfiguration.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BigQueryBaseDockerConfiguration.java new file mode 100644 index 00000000..d5705c7b --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/base/BigQueryBaseDockerConfiguration.java @@ -0,0 +1,29 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.base; + +import org.testcontainers.containers.GenericContainer; + +public final class BigQueryBaseDockerConfiguration { + + public static final int PORT = 9050; + + public static final GenericContainer CONTAINER = new GenericContainer<>("ghcr.io/goccy/bigquery-emulator:0.6.6") + .withExposedPorts(PORT); + + private BigQueryBaseDockerConfiguration() {} +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/package-info.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/package-info.java new file mode 100644 index 00000000..740c2f80 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These tests are run against BigQuery emulator. It is the next test level after unit tests. + * An attempt to cover integration with fake BigQuery. + * + * @see GitHub + */ +package org.springframework.batch.extensions.bigquery.emulator; \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BaseEmulatorItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BaseEmulatorItemReaderTest.java deleted file mode 100644 index c6ae4d98..00000000 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BaseEmulatorItemReaderTest.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.springframework.batch.extensions.bigquery.emulator.reader; - -import com.google.cloud.NoCredentials; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryOptions; -import org.junit.jupiter.api.BeforeAll; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.MountableFile; - -@Testcontainers -abstract class BaseEmulatorItemReaderTest { - private static final int PORT = 9050; - - private static final String PROJECT = "batch-test"; - - @Container - private static final GenericContainer CONTAINER = new GenericContainer<>("ghcr.io/goccy/bigquery-emulator") - .withExposedPorts(PORT) - .withCommand("--project=" + PROJECT, "--data-from-yaml=/test-data.yaml") - .withCopyFileToContainer(MountableFile.forClasspathResource("test-data.yaml"), "/test-data.yaml"); - - protected static BigQuery bigQuery; - - @BeforeAll - static void init() { - bigQuery = BigQueryOptions - .newBuilder() - .setHost("http://%s:%d".formatted(CONTAINER.getHost(), CONTAINER.getMappedPort(PORT))) - .setProjectId(PROJECT) - .setCredentials(NoCredentials.getInstance()) - .build() - .getService(); - } -} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java index 68fb5d47..f85b717a 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java @@ -1,3 +1,19 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.batch.extensions.bigquery.emulator.reader; import com.google.cloud.bigquery.*; @@ -5,6 +21,7 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.reader.base.BaseEmulatorItemReaderTest; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/BaseEmulatorItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/BaseEmulatorItemReaderTest.java new file mode 100644 index 00000000..cdbca8fd --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/base/BaseEmulatorItemReaderTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.reader.base; + +import com.google.cloud.bigquery.BigQuery; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.batch.extensions.bigquery.emulator.base.BaseEmulatorTest; +import org.springframework.batch.extensions.bigquery.emulator.base.BigQueryBaseDockerConfiguration; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.MountableFile; + +@Testcontainers +public abstract class BaseEmulatorItemReaderTest extends BaseEmulatorTest { + + @Container + private static final GenericContainer CONTAINER = BigQueryBaseDockerConfiguration.CONTAINER + .withCommand("--project=" + PROJECT, "--log-level=debug", "--data-from-yaml=/reader-test.yaml") + .withCopyFileToContainer(MountableFile.forClasspathResource("reader-test.yaml"), "/reader-test.yaml"); + + protected static BigQuery bigQuery; + + @BeforeAll + static void init() { + bigQuery = prepareBigQueryBuilder().setHost(getBigQueryUrl(CONTAINER)).build().getService(); + } +} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorCsvItemWriterTest.java new file mode 100644 index 00000000..009078af --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorCsvItemWriterTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.writer; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.ResultVerifier; +import org.springframework.batch.extensions.bigquery.common.TableUtils; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.writer.base.BaseEmulatorItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; +import org.springframework.batch.item.Chunk; + +class BigQueryEmulatorCsvItemWriterTest extends BaseEmulatorItemWriterTest { + + // TODO find out why data is not persisted into the sqlite database + // at the same time it works fine with json/insertAll job/yaml file + // cover 2 scenarios (predefined schema + generate on the fly) + @Test + @Disabled("Not working at the moment") + void testWrite() throws Exception { + TableId tableId = TableId.of(TestConstants.DATASET, TableUtils.generateTableName(TestConstants.CSV)); + Chunk expectedChunk = Chunk.of(new PersonDto("Ivan", 30)); + + WriteChannelConfiguration channelConfig = WriteChannelConfiguration + .newBuilder(tableId) + .setFormatOptions(FormatOptions.csv()) + .setSchema(PersonDto.getBigQuerySchema()) + .build(); + + BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() + .bigQuery(bigQuery) + .writeChannelConfig(channelConfig) + .build(); + writer.afterPropertiesSet(); + + writer.write(expectedChunk); + + ResultVerifier.verifyTableResult(expectedChunk, bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(5L))); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java new file mode 100644 index 00000000..3c48ff4a --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.writer; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.ResultVerifier; +import org.springframework.batch.extensions.bigquery.common.TableUtils; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.writer.base.BaseEmulatorItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder; +import org.springframework.batch.item.Chunk; + +import java.util.stream.Stream; + +class BigQueryEmulatorJsonItemWriterTest extends BaseEmulatorItemWriterTest { + + @ParameterizedTest + @MethodSource("tables") + void testWrite(String table, boolean autodetect) throws Exception { + TableId tableId = TableId.of(TestConstants.DATASET, table); + Chunk expectedChunk = Chunk.of(new PersonDto("Ivan", 30)); + + WriteChannelConfiguration channelConfig = WriteChannelConfiguration + .newBuilder(tableId) + .setFormatOptions(FormatOptions.json()) + .setSchema(autodetect ? null : PersonDto.getBigQuerySchema()) + .setAutodetect(autodetect) + .build(); + + BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder() + .bigQuery(bigQuery) + .writeChannelConfig(channelConfig) + .build(); + writer.afterPropertiesSet(); + + writer.write(expectedChunk); + + ResultVerifier.verifyTableResult(expectedChunk, bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(5L))); + } + + private static Stream tables() { + return Stream.of( + Arguments.of(TableUtils.generateTableName(TestConstants.JSON), false), + + // TODO auto detect is broken on big query contained side? + // Arguments.of(TableUtils.generateTableName(TestConstants.JSON), true), + + Arguments.of(TestConstants.JSON, false) + ); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/JsonWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/JsonWriterTest.java deleted file mode 100644 index 7d404e41..00000000 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/JsonWriterTest.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.springframework.batch.extensions.bigquery.emulator.writer; - -// TODO -public class JsonWriterTest { -} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/BaseEmulatorItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/BaseEmulatorItemWriterTest.java new file mode 100644 index 00000000..2b4fb92d --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/BaseEmulatorItemWriterTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.writer.base; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.google.cloud.bigquery.BigQuery; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.batch.extensions.bigquery.emulator.base.BaseEmulatorTest; +import org.springframework.batch.extensions.bigquery.emulator.base.BigQueryBaseDockerConfiguration; +import org.springframework.core.io.ClassPathResource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.util.Optional; +import java.util.logging.LogManager; + +@Testcontainers +public abstract class BaseEmulatorItemWriterTest extends BaseEmulatorTest { + + @Container + private static final GenericContainer BIG_QUERY_CONTAINER = BigQueryBaseDockerConfiguration.CONTAINER + .withCommand("--project=" + PROJECT, "--log-level=debug", "--data-from-yaml=/writer-test.yaml", "--database=/test-db") + .withCopyFileToContainer(MountableFile.forClasspathResource("writer-test.yaml"), "/writer-test.yaml"); + + protected static BigQuery bigQuery; + private static WireMockServer wireMockServer; + + static { + try { + LogManager.getLogManager().readConfiguration(new ClassPathResource("java-util-logging.properties").getInputStream()); + } catch (IOException e) { + throw new IllegalStateException(); + } + } + + @BeforeAll + static void setupAll() { + SpyResponseExtension extension = new SpyResponseExtension(); + + wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().extensions(extension)); + wireMockServer.start(); + extension.setWireMockPort(wireMockServer.port()); + + wireMockServer.stubFor( + WireMock.any(WireMock.urlMatching(".*")).willReturn(WireMock.aResponse().proxiedFrom(getBigQueryUrl(BIG_QUERY_CONTAINER))) + ); + + bigQuery = prepareBigQueryBuilder().setHost(wireMockServer.baseUrl()).build().getService(); + } + + @AfterAll + static void shutdownAll() { + Optional.ofNullable(wireMockServer).ifPresent(WireMockServer::stop); + } + +} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java new file mode 100644 index 00000000..da1c9efd --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/base/SpyResponseExtension.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.emulator.writer.base; + +import com.github.tomakehurst.wiremock.extension.ResponseTransformerV2; +import com.github.tomakehurst.wiremock.http.HttpHeader; +import com.github.tomakehurst.wiremock.http.Response; +import com.github.tomakehurst.wiremock.stubbing.ServeEvent; +import org.springframework.batch.extensions.bigquery.emulator.base.BigQueryBaseDockerConfiguration; +import wiremock.com.google.common.net.HttpHeaders; + +import java.util.List; +import java.util.function.Predicate; + +public final class SpyResponseExtension implements ResponseTransformerV2 { + + private static final String BQ_DOCKER_URL_PREFIX = "http://0.0.0.0:"; + private static final String BQ_DOCKER_URL = BQ_DOCKER_URL_PREFIX + BigQueryBaseDockerConfiguration.PORT; + + private int wireMockPort; + + @Override + public Response transform(Response response, ServeEvent serveEvent) { + var originalHeaders = response.getHeaders(); + HttpHeader originalLocationHeader = originalHeaders.getHeader(HttpHeaders.LOCATION); + + List locationHeaderValues = originalLocationHeader.getValues(); + boolean containsLocationHeader = locationHeaderValues.stream().anyMatch(s -> s.startsWith(BQ_DOCKER_URL)); + + if (containsLocationHeader) { + if (locationHeaderValues.size() > 1) { + throw new IllegalStateException(); + } + + List headersWithoutLocation = originalHeaders + .all() + .stream() + .filter(Predicate.not(hh -> hh.keyEquals(HttpHeaders.LOCATION))) + .toList(); + + HttpHeader updatedHeader = HttpHeader.httpHeader( + HttpHeaders.LOCATION, locationHeaderValues.get(0).replace(BQ_DOCKER_URL, BQ_DOCKER_URL_PREFIX + wireMockPort) + ); + + return Response.Builder + .like(response) + .but() + .headers(new com.github.tomakehurst.wiremock.http.HttpHeaders(headersWithoutLocation).plus(updatedHeader)) + .build(); + } + return response; + } + + @Override + public String getName() { + return "spy-response-extension"; + } + + public void setWireMockPort(int wireMockPort) { + this.wireMockPort = wireMockPort; + } +} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java index d005d3c3..ee2432ec 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/package-info.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/package-info.java index 9df3f745..757ab805 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/package-info.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,9 @@ * Test names should follow this pattern: test1, test2, testN. * So later in BigQuery you will see generated table name: csv_test1, csv_test2, csv_testN. * This way, it will be easier to trace errors in BigQuery. - * + *

+ * It is the next test level after emulator tests. + * Real world integration testing. * @see Authentication */ package org.springframework.batch.extensions.bigquery.gcloud; \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java index 57b31c40..f2ff4f8c 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,28 @@ package org.springframework.batch.extensions.bigquery.gcloud.reader; -import com.google.cloud.bigquery.*; -import org.junit.jupiter.api.*; -import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest; import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; +import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; +import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; + +import java.util.concurrent.atomic.AtomicReference; class BigQueryGcloudItemReaderTest extends BaseBigQueryGcloudIntegrationTest { @@ -38,7 +52,7 @@ static void init() throws Exception { BIG_QUERY.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.CSV), tableDefinition)); } - new BigQueryDataLoader(BIG_QUERY).loadCsvSample(TestConstants.CSV); + loadCsvSample(); } @AfterAll @@ -69,8 +83,8 @@ void testBatchQuery() throws Exception { void testInteractiveQuery() throws Exception { BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() .bigQuery(BIG_QUERY) - .query("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV)) .rowMapper(TestConstants.PERSON_MAPPER) + .query("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV)) .build(); reader.afterPropertiesSet(); @@ -80,10 +94,10 @@ void testInteractiveQuery() throws Exception { private void verifyResult(BigQueryQueryItemReader reader) throws Exception { PersonDto actualFirstPerson = reader.read(); - PersonDto expectedFirstPerson = BigQueryDataLoader.CHUNK.getItems().get(0); + PersonDto expectedFirstPerson = TestConstants.CHUNK.getItems().get(0); PersonDto actualSecondPerson = reader.read(); - PersonDto expectedSecondPerson = BigQueryDataLoader.CHUNK.getItems().get(1); + PersonDto expectedSecondPerson = TestConstants.CHUNK.getItems().get(1); PersonDto actualThirdPerson = reader.read(); @@ -98,4 +112,25 @@ private void verifyResult(BigQueryQueryItemReader reader) throws Exce Assertions.assertNull(actualThirdPerson); } + private static void loadCsvSample() throws Exception { + AtomicReference job = new AtomicReference<>(); + + WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration + .newBuilder(TableId.of(TestConstants.DATASET, TestConstants.CSV)) + .setSchema(PersonDto.getBigQuerySchema()) + .setAutodetect(false) + .setFormatOptions(FormatOptions.csv()) + .build(); + + BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() + .bigQuery(BIG_QUERY) + .writeChannelConfig(channelConfiguration) + .jobConsumer(job::set) + .build(); + + writer.afterPropertiesSet(); + writer.write(TestConstants.CHUNK); + job.get().waitFor(); + } + } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java index e10523d0..ce4f63ac 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,13 @@ package org.springframework.batch.extensions.bigquery.gcloud.writer; -import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableResult; import org.junit.jupiter.api.Assertions; -import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader; -import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.ResultVerifier; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest; @@ -33,24 +36,7 @@ protected void verifyResults(String tableName) { Assertions.assertNotNull(dataset.getDatasetId()); Assertions.assertNotNull(tableId); - Assertions.assertEquals(BigQueryDataLoader.CHUNK.size(), tableResult.getTotalRows()); - - tableResult - .getValues() - .forEach(field -> { - Assertions.assertTrue( - BigQueryDataLoader.CHUNK.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name)) - ); - - boolean ageCondition = BigQueryDataLoader.CHUNK - .getItems() - .stream() - .map(PersonDto::age) - .map(Long::valueOf) - .anyMatch(age -> age.compareTo(field.get(1).getLongValue()) == 0); - - Assertions.assertTrue(ageCondition); - }); + ResultVerifier.verifyTableResult(TestConstants.CHUNK, tableResult); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java index dd986daf..5ff3638e 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudCsvItemWriterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,27 @@ package org.springframework.batch.extensions.bigquery.gcloud.writer; -import com.google.cloud.bigquery.*; -import org.junit.jupiter.api.*; -import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TableUtils; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; class BigQueryGcloudCsvItemWriterTest extends BaseBigQueryGcloudItemWriterTest { @@ -45,14 +57,15 @@ static void cleanup() { BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.CSV)); } - @Test - void testWriteCsv() throws Exception { + @ParameterizedTest + @MethodSource("tables") + void testWriteCsv(String tableName, boolean autodetect) throws Exception { AtomicReference job = new AtomicReference<>(); WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, TestConstants.CSV)) - .setSchema(PersonDto.getBigQuerySchema()) - .setAutodetect(false) + .newBuilder(TableId.of(TestConstants.DATASET, tableName)) + .setSchema(autodetect ? null : PersonDto.getBigQuerySchema()) + .setAutodetect(autodetect) .setFormatOptions(FormatOptions.csv()) .build(); @@ -63,10 +76,18 @@ void testWriteCsv() throws Exception { .build(); writer.afterPropertiesSet(); - writer.write(BigQueryDataLoader.CHUNK); + writer.write(TestConstants.CHUNK); job.get().waitFor(); - verifyResults(TestConstants.CSV); + verifyResults(tableName); + } + + private static Stream tables() { + return Stream.of( + Arguments.of(TableUtils.generateTableName(TestConstants.CSV), false), + Arguments.of(TableUtils.generateTableName(TestConstants.CSV), true), + Arguments.of(TestConstants.CSV, false) + ); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java index 02d44400..23285e84 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BigQueryGcloudJsonItemWriterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,17 +16,27 @@ package org.springframework.batch.extensions.bigquery.gcloud.writer; -import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.WriteChannelConfiguration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TableUtils; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; class BigQueryGcloudJsonItemWriterTest extends BaseBigQueryGcloudItemWriterTest { @@ -47,14 +57,15 @@ static void cleanup() { BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.JSON)); } - @Test - void testWriteJson() throws Exception { + @ParameterizedTest + @MethodSource("tables") + void testWriteJson(String tableName, boolean autodetect) throws Exception { AtomicReference job = new AtomicReference<>(); WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, TestConstants.JSON)) - .setSchema(PersonDto.getBigQuerySchema()) - .setAutodetect(false) + .newBuilder(TableId.of(TestConstants.DATASET, tableName)) + .setSchema(autodetect ? null : PersonDto.getBigQuerySchema()) + .setAutodetect(autodetect) .setFormatOptions(FormatOptions.json()) .build(); @@ -65,10 +76,18 @@ void testWriteJson() throws Exception { .build(); writer.afterPropertiesSet(); - writer.write(BigQueryDataLoader.CHUNK); + writer.write(TestConstants.CHUNK); job.get().waitFor(); - verifyResults(TestConstants.JSON); + verifyResults(tableName); + } + + private static Stream tables() { + return Stream.of( + Arguments.of(TableUtils.generateTableName(TestConstants.JSON), false), + Arguments.of(TableUtils.generateTableName(TestConstants.JSON), true), + Arguments.of(TestConstants.JSON, false) + ); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/base/AbstractBigQueryTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/base/AbstractBigQueryTest.java index e91fcdd5..f28a26fb 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/base/AbstractBigQueryTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/base/AbstractBigQueryTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java new file mode 100644 index 00000000..ebd40b49 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/BigQueryItemReaderTest.java @@ -0,0 +1,137 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.reader; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.TableResult; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; +import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.core.convert.converter.Converter; + +import java.lang.invoke.MethodHandles; +import java.util.List; + +class BigQueryItemReaderTest extends AbstractBigQueryTest { + + @Test + void testSetBigQuery() throws IllegalAccessException, NoSuchFieldException { + BigQueryQueryItemReader reader = new BigQueryQueryItemReader<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup()); + BigQuery bigQuery = prepareMockedBigQuery(); + + reader.setBigQuery(bigQuery); + + BigQuery actualBigQuery = (BigQuery) handle + .findVarHandle(BigQueryQueryItemReader.class, "bigQuery", BigQuery.class) + .get(reader); + + Assertions.assertEquals(bigQuery, actualBigQuery); + } + + @Test + void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException { + BigQueryQueryItemReader reader = new BigQueryQueryItemReader<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup()); + Converter rowMapper = source -> null; + + reader.setRowMapper(rowMapper); + + Converter actualRowMapper = (Converter) handle + .findVarHandle(BigQueryQueryItemReader.class, "rowMapper", Converter.class) + .get(reader); + + Assertions.assertEquals(rowMapper, actualRowMapper); + } + + @Test + void testSetJobConfiguration() throws IllegalAccessException, NoSuchFieldException { + BigQueryQueryItemReader reader = new BigQueryQueryItemReader<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup()); + QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder("select").build(); + + reader.setJobConfiguration(jobConfiguration); + + QueryJobConfiguration actualJobConfiguration = (QueryJobConfiguration) handle + .findVarHandle(BigQueryQueryItemReader.class, "jobConfiguration", QueryJobConfiguration.class) + .get(reader); + + Assertions.assertEquals(jobConfiguration, actualJobConfiguration); + } + + @Test + void testRead() throws Exception { + BigQuery bigQuery = prepareMockedBigQuery(); + List items = TestConstants.CHUNK.getItems(); + + Field name = Field.of(TestConstants.NAME, StandardSQLTypeName.STRING); + Field age = Field.of(TestConstants.AGE, StandardSQLTypeName.INT64); + + PersonDto person1 = items.get(0); + FieldValue value10 = FieldValue.of(FieldValue.Attribute.PRIMITIVE, person1.name()); + FieldValue value11 = FieldValue.of(FieldValue.Attribute.PRIMITIVE, person1.age().toString()); + + FieldValueList row1 = FieldValueList.of(List.of(value10, value11), name, age); + + TableResult tableResult = Mockito.mock(TableResult.class); + Mockito.when(tableResult.getValues()).thenReturn(List.of(row1)); + + Mockito.when(bigQuery.query(Mockito.any(QueryJobConfiguration.class))).thenReturn(tableResult); + + BigQueryQueryItemReader reader = new BigQueryQueryItemReader<>(); + reader.setRowMapper(TestConstants.PERSON_MAPPER); + reader.setBigQuery(bigQuery); + reader.setJobConfiguration(QueryJobConfiguration.of("select")); + + // First call + PersonDto actual = reader.read(); + Assertions.assertEquals(person1.name(), actual.name()); + Assertions.assertEquals(person1.age(), actual.age()); + + // Second call + Assertions.assertNull(reader.read()); + } + + @Test + void testAfterPropertiesSet() { + BigQueryQueryItemReader reader = new BigQueryQueryItemReader<>(); + + // bigQuery + Assertions.assertThrows(IllegalArgumentException.class, reader::afterPropertiesSet); + + // rowMapper + reader.setBigQuery(prepareMockedBigQuery()); + Assertions.assertThrows(IllegalArgumentException.class, reader::afterPropertiesSet); + + // jobConfiguration + reader.setRowMapper(TestConstants.PERSON_MAPPER); + Assertions.assertThrows(IllegalArgumentException.class, reader::afterPropertiesSet); + + // No exception + reader.setJobConfiguration(QueryJobConfiguration.of("select")); + Assertions.assertDoesNotThrow(reader::afterPropertiesSet); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryBatchQueryItemReaderBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryBatchQueryItemReaderBuilderTests.java deleted file mode 100644 index f798c6cc..00000000 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryBatchQueryItemReaderBuilderTests.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2002-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.batch.extensions.bigquery.unit.reader.builder; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.bigquery.TableId; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.springframework.batch.extensions.bigquery.common.PersonDto; -import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; -import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; -import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; - -// TODO verify all corner cases of afterPropertiesSet -class BigQueryBatchQueryItemReaderBuilderTests extends AbstractBigQueryTest { - - @Test - void testCustomQueryItemReader() { - BigQuery mockedBigQuery = prepareMockedBigQuery(); - - QueryJobConfiguration jobConfiguration = QueryJobConfiguration - .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 2") - .setDestinationTable(TableId.of(TestConstants.DATASET, "persons_duplicate")) - .setPriority(QueryJobConfiguration.Priority.BATCH) - .build(); - - BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() - .bigQuery(mockedBigQuery) - .jobConfiguration(jobConfiguration) - .rowMapper(TestConstants.PERSON_MAPPER) - .build(); - - reader.afterPropertiesSet(); - - Assertions.assertNotNull(reader); - } - -} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryInteractiveQueryItemReaderBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryInteractiveQueryItemReaderBuilderTests.java deleted file mode 100644 index 3ff4caf0..00000000 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryInteractiveQueryItemReaderBuilderTests.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2002-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.batch.extensions.bigquery.unit.reader.builder; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.bigquery.TableId; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.springframework.batch.extensions.bigquery.common.PersonDto; -import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; -import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; -import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; - -class BigQueryInteractiveQueryItemReaderBuilderTests extends AbstractBigQueryTest { - - @Test - void testSimpleQueryItemReader() { - BigQuery mockedBigQuery = prepareMockedBigQuery(); - - BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() - .bigQuery(mockedBigQuery) - .query("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 1") - .rowMapper(TestConstants.PERSON_MAPPER) - .build(); - - reader.afterPropertiesSet(); - - Assertions.assertNotNull(reader); - } - - @Test - void testCustomQueryItemReader() { - BigQuery mockedBigQuery = prepareMockedBigQuery(); - - QueryJobConfiguration jobConfiguration = QueryJobConfiguration - .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 2") - .setDestinationTable(TableId.of(TestConstants.DATASET, "persons_duplicate")) - .build(); - - BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() - .bigQuery(mockedBigQuery) - .jobConfiguration(jobConfiguration) - .rowMapper(TestConstants.PERSON_MAPPER) - .build(); - - reader.afterPropertiesSet(); - - Assertions.assertNotNull(reader); - } - -} \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java new file mode 100644 index 00000000..ffb03206 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryItemReaderBuilderTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.reader.builder; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableId; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader; +import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder; +import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.core.convert.converter.Converter; + +import java.lang.invoke.MethodHandles; + +class BigQueryItemReaderBuilderTest extends AbstractBigQueryTest { + + @Test + void testBuild_WithoutJobConfiguration() throws IllegalAccessException, NoSuchFieldException { + BigQuery mockedBigQuery = prepareMockedBigQuery(); + String query = "SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 1"; + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup()); + + BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() + .bigQuery(mockedBigQuery) + .query(query) + .rowMapper(TestConstants.PERSON_MAPPER) + .build(); + + Assertions.assertNotNull(reader); + + BigQuery actualBigQuery = (BigQuery) handle + .findVarHandle(BigQueryQueryItemReader.class, "bigQuery", BigQuery.class) + .get(reader); + + Converter actualRowMapper = (Converter) handle + .findVarHandle(BigQueryQueryItemReader.class, "rowMapper", Converter.class) + .get(reader); + + QueryJobConfiguration actualJobConfiguration = (QueryJobConfiguration) handle + .findVarHandle(BigQueryQueryItemReader.class, "jobConfiguration", QueryJobConfiguration.class) + .get(reader); + + Assertions.assertEquals(mockedBigQuery, actualBigQuery); + Assertions.assertEquals(TestConstants.PERSON_MAPPER, actualRowMapper); + Assertions.assertEquals(QueryJobConfiguration.newBuilder(query).build(), actualJobConfiguration); + } + + @Test + void testBuild_WithJobConfiguration() throws IllegalAccessException, NoSuchFieldException { + BigQuery mockedBigQuery = prepareMockedBigQuery(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryQueryItemReader.class, MethodHandles.lookup()); + + QueryJobConfiguration jobConfiguration = QueryJobConfiguration + .newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.persons p LIMIT 2") + .setDestinationTable(TableId.of(TestConstants.DATASET, "persons_duplicate")) + .build(); + + BigQueryQueryItemReader reader = new BigQueryQueryItemReaderBuilder() + .bigQuery(mockedBigQuery) + .jobConfiguration(jobConfiguration) + .rowMapper(TestConstants.PERSON_MAPPER) + .build(); + + Assertions.assertNotNull(reader); + + BigQuery actualBigQuery = (BigQuery) handle + .findVarHandle(BigQueryQueryItemReader.class, "bigQuery", BigQuery.class) + .get(reader); + + Converter actualRowMapper = (Converter) handle + .findVarHandle(BigQueryQueryItemReader.class, "rowMapper", Converter.class) + .get(reader); + + QueryJobConfiguration actualJobConfiguration = (QueryJobConfiguration) handle + .findVarHandle(BigQueryQueryItemReader.class, "jobConfiguration", QueryJobConfiguration.class) + .get(reader); + + Assertions.assertEquals(mockedBigQuery, actualBigQuery); + Assertions.assertEquals(TestConstants.PERSON_MAPPER, actualRowMapper); + Assertions.assertEquals(jobConfiguration, actualJobConfiguration); + } + + @Test + void testBuild_NoQueryProvided() { + Assertions.assertThrows(IllegalArgumentException.class, new BigQueryQueryItemReaderBuilder<>()::build); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java new file mode 100644 index 00000000..17af4851 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java @@ -0,0 +1,271 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.writer; + +import com.google.cloud.bigquery.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; + +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +class BigQueryBaseItemWriterTest extends AbstractBigQueryTest { + + private static final TableId TABLE_ID = TableId.of("dataset", "table"); + + @Test + void testGetTable() { + Table expected = Mockito.mock(Table.class); + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.getTable(TABLE_ID)).thenReturn(expected); + + TestWriter writer = new TestWriter(); + writer.setBigQuery(bigQuery); + writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID)); + + Assertions.assertEquals(expected, writer.testGetTable()); + } + + @Test + void testSetDatasetInfo() throws IllegalAccessException, NoSuchFieldException { + TestWriter writer = new TestWriter(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + DatasetInfo expected = DatasetInfo.of("test"); + + writer.setDatasetInfo(expected); + + DatasetInfo actual = (DatasetInfo) handle + .findVarHandle(BigQueryBaseItemWriter.class, "datasetInfo", DatasetInfo.class) + .get(writer); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testSetJobConsumer() throws IllegalAccessException, NoSuchFieldException { + TestWriter writer = new TestWriter(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + Consumer expected = job -> {}; + + writer.setJobConsumer(expected); + + Consumer actual = (Consumer) handle + .findVarHandle(BigQueryBaseItemWriter.class, "jobConsumer", Consumer.class) + .get(writer); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testSetWriteChannelConfig() throws IllegalAccessException, NoSuchFieldException { + TestWriter writer = new TestWriter(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + WriteChannelConfiguration expected = WriteChannelConfiguration.newBuilder(TABLE_ID).build(); + + writer.setWriteChannelConfig(expected); + + WriteChannelConfiguration actual = (WriteChannelConfiguration) handle + .findVarHandle(BigQueryBaseItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) + .get(writer); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testSetBigQuery() throws IllegalAccessException, NoSuchFieldException { + TestWriter writer = new TestWriter(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + BigQuery expected = prepareMockedBigQuery(); + + writer.setBigQuery(expected); + + BigQuery actual = (BigQuery) handle + .findVarHandle(BigQueryBaseItemWriter.class, "bigQuery", BigQuery.class) + .get(writer); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testWrite() throws Exception { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + AtomicBoolean consumerCalled = new AtomicBoolean(); + + Job job = Mockito.mock(Job.class); + Mockito.when(job.getJobId()).thenReturn(JobId.newBuilder().build()); + + TableDataWriteChannel channel = Mockito.mock(TableDataWriteChannel.class); + Mockito.when(channel.getJob()).thenReturn(job); + + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.writer(Mockito.any(WriteChannelConfiguration.class))).thenReturn(channel); + + TestWriter writer = new TestWriter(); + writer.setBigQuery(bigQuery); + writer.setJobConsumer(j -> consumerCalled.set(true)); + writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID)); + + writer.write(TestConstants.CHUNK); + + AtomicLong actual = (AtomicLong) handle + .findVarHandle(BigQueryBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) + .get(writer); + + Assertions.assertEquals(1L, actual.get()); + Assertions.assertTrue(consumerCalled.get()); + + Mockito.verify(channel).write(Mockito.any(ByteBuffer.class)); + Mockito.verify(channel).close(); + Mockito.verify(channel, Mockito.times(2)).getJob(); + Mockito.verifyNoMoreInteractions(channel); + } + + @Test + void testBaseAfterPropertiesSet_Exception() { + TestWriter writer = new TestWriter(); + WriteChannelConfiguration.Builder channelBuilder = WriteChannelConfiguration.newBuilder(TABLE_ID); + + // bigQuery + IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("BigQuery service must be provided", actual.getMessage()); + + // writeChannelConfig + writer.setBigQuery(prepareMockedBigQuery()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Write channel configuration must be provided", actual.getMessage()); + + // format + writer.setWriteChannelConfig(channelBuilder.build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Data format must be provided", actual.getMessage()); + + // bigtable + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.bigtable()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Google BigTable is not supported", actual.getMessage()); + + // googleSheets + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.googleSheets()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Google Sheets is not supported", actual.getMessage()); + + // datastore + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.datastoreBackup()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Google Datastore is not supported", actual.getMessage()); + + // parquet + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.parquet()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Parquet is not supported", actual.getMessage()); + + // orc + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.orc()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Orc is not supported", actual.getMessage()); + + // avro + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.avro()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Avro is not supported", actual.getMessage()); + + // iceberg + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.iceberg()).build()); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Iceberg is not supported", actual.getMessage()); + + // dataset + writer.setWriteChannelConfig(channelBuilder.setFormatOptions(FormatOptions.csv()).build()); + writer.setDatasetInfo(DatasetInfo.of("dataset-1")); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Dataset should be configured properly", actual.getMessage()); + } + + @Test + void testBaseAfterPropertiesSet_Dataset() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + + DatasetInfo datasetInfo = DatasetInfo.of(TABLE_ID.getDataset()); + BigQuery bigQuery = prepareMockedBigQuery(); + + TestWriter writer = new TestWriter(); + writer.setBigQuery(bigQuery); + writer.setWriteChannelConfig(WriteChannelConfiguration.newBuilder(TABLE_ID).setFormatOptions(FormatOptions.json()).build()); + + writer.afterPropertiesSet(); + + DatasetInfo actual = (DatasetInfo) handle.findVarHandle(BigQueryBaseItemWriter.class, "datasetInfo", DatasetInfo.class).get(writer); + Assertions.assertEquals(datasetInfo, actual); + + Mockito.verify(bigQuery).create(datasetInfo); + Mockito.verify(bigQuery).getDataset(TABLE_ID.getDataset()); + Mockito.verifyNoMoreInteractions(bigQuery); + } + + @Test + void testTableHasDefinedSchema() { + TestWriter writer = new TestWriter(); + Table table = Mockito.mock(Table.class); + + // Null + Assertions.assertFalse(writer.testTableHasDefinedSchema(null)); + + // Without definition + Assertions.assertFalse(writer.testTableHasDefinedSchema(table)); + + // Without schema + StandardTableDefinition.Builder definitionBuilder = StandardTableDefinition.newBuilder(); + Mockito.when(table.getDefinition()).thenReturn(definitionBuilder.build()); + Assertions.assertFalse(writer.testTableHasDefinedSchema(table)); + + // With schema + Mockito.when(table.getDefinition()).thenReturn(definitionBuilder.setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))).build()); + Assertions.assertTrue(writer.testTableHasDefinedSchema(table)); + } + + private static final class TestWriter extends BigQueryBaseItemWriter { + + @Override + protected void doInitializeProperties(List items) {} + + @Override + protected List convertObjectsToByteArrays(List items) { + return items.stream().map(Objects::toString).map(String::getBytes).toList(); + } + + @Override + protected void performFormatSpecificChecks() {} + + public Table testGetTable() { + return getTable(); + } + + public boolean testTableHasDefinedSchema(Table table) { + return tableHasDefinedSchema(table); + } + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java new file mode 100644 index 00000000..7c554088 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java @@ -0,0 +1,179 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.writer; + +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.dataformat.csv.CsvFactory; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; +import org.springframework.core.convert.converter.Converter; + +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.stream.Stream; + +class BigQueryCsvItemWriterTest extends AbstractBigQueryTest { + + private static final TableId TABLE_ID = TableId.of("1", "2"); + + @Test + void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldException { + TestWriter writer = new TestWriter(); + List items = TestConstants.CHUNK.getItems(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryCsvItemWriter.class, MethodHandles.lookup()); + + // Exception + Assertions.assertThrows(IllegalStateException.class, () -> writer.testInitializeProperties(List.of())); + + // No exception + writer.testInitializeProperties(items); + Assertions.assertEquals( + PersonDto.class.getSimpleName(), + ((Class) handle.findVarHandle(BigQueryCsvItemWriter.class, "itemClass", Class.class).get(writer)).getSimpleName() + ); + ObjectWriter objectWriter = (ObjectWriter) handle.findVarHandle(BigQueryCsvItemWriter.class, "objectWriter", ObjectWriter.class).get(writer); + Assertions.assertInstanceOf(CsvFactory.class, objectWriter.getFactory()); + } + + @Test + void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException { + BigQueryCsvItemWriter reader = new BigQueryCsvItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryCsvItemWriter.class, MethodHandles.lookup()); + Converter expected = source -> null; + + reader.setRowMapper(expected); + + Converter actual = (Converter) handle + .findVarHandle(BigQueryCsvItemWriter.class, "rowMapper", Converter.class) + .get(reader); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testConvertObjectsToByteArrays() { + TestWriter writer = new TestWriter(); + + // Empty + Assertions.assertTrue(writer.testConvert(List.of()).isEmpty()); + + // Not empty + writer.setRowMapper(source -> source.toString().getBytes()); + List actual = writer.testConvert(TestConstants.CHUNK.getItems()); + List expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(String::getBytes).toList(); + Assertions.assertEquals(expected.size(), actual.size()); + + for (int i = 0; i < actual.size(); i++) { + Assertions.assertArrayEquals(expected.get(i), actual.get(i)); + } + } + + @Test + void testPerformFormatSpecificChecks() { + TestWriter writer = new TestWriter(); + + Table table = Mockito.mock(Table.class); + StandardTableDefinition tableDefinition = StandardTableDefinition + .newBuilder() + .setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))) + .build(); + Mockito.when(table.getDefinition()).thenReturn(tableDefinition); + + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.getTable(Mockito.any(TableId.class))).thenReturn(table); + + // schema + writer.setBigQuery(bigQuery); + writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID, FormatOptions.json())); + IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); + Assertions.assertEquals("Schema must be provided", actual.getMessage()); + + // schema equality + WriteChannelConfiguration channelConfig = WriteChannelConfiguration + .newBuilder(TABLE_ID) + .setSchema(Schema.of(Field.of(TestConstants.NAME, StandardSQLTypeName.STRING))) + .setFormatOptions(FormatOptions.csv()) + .build(); + writer.setWriteChannelConfig(channelConfig); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); + Assertions.assertEquals("Schema must be the same", actual.getMessage()); + } + + @ParameterizedTest + @MethodSource("invalidFormats") + void testPerformFormatSpecificChecks_Format(FormatOptions formatOptions) { + Table table = Mockito.mock(Table.class); + StandardTableDefinition tableDefinition = StandardTableDefinition + .newBuilder() + .setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))) + .build(); + Mockito.when(table.getDefinition()).thenReturn(tableDefinition); + + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.getTable(Mockito.any(TableId.class))).thenReturn(table); + + TestWriter writer = new TestWriter(); + writer.setBigQuery(bigQuery); + + writer.setWriteChannelConfig(WriteChannelConfiguration.newBuilder(TABLE_ID).setAutodetect(true).setFormatOptions(formatOptions).build()); + IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); + Assertions.assertEquals("Only %s format is allowed".formatted(FormatOptions.csv().getType()), actual.getMessage()); + } + + static Stream invalidFormats() { + return Stream.of( + FormatOptions.parquet(), + FormatOptions.avro(), + FormatOptions.bigtable(), + FormatOptions.datastoreBackup(), + FormatOptions.googleSheets(), + FormatOptions.iceberg(), + FormatOptions.orc(), + FormatOptions.json() + ); + } + + private static final class TestWriter extends BigQueryCsvItemWriter { + public void testInitializeProperties(List items) { + doInitializeProperties(items); + } + + public List testConvert(List items) { + return convertObjectsToByteArrays(items); + } + + public void testPerformFormatSpecificChecks() { + performFormatSpecificChecks(); + } + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java new file mode 100644 index 00000000..a4c86de3 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java @@ -0,0 +1,179 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.writer; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; +import org.springframework.core.convert.converter.Converter; + +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.stream.Stream; + +class BigQueryJsonItemWriterTest extends AbstractBigQueryTest { + + private static final TableId TABLE_ID = TableId.of("1", "2"); + + @Test + void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldException { + TestWriter writer = new TestWriter(); + List items = TestConstants.CHUNK.getItems(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup()); + + // Exception + Assertions.assertThrows(IllegalStateException.class, () -> writer.testInitializeProperties(List.of())); + + // No exception + writer.testInitializeProperties(items); + Assertions.assertEquals( + PersonDto.class.getSimpleName(), + ((Class) handle.findVarHandle(BigQueryJsonItemWriter.class, "itemClass", Class.class).get(writer)).getSimpleName() + ); + ObjectWriter objectWriter = (ObjectWriter) handle.findVarHandle(BigQueryJsonItemWriter.class, "objectWriter", ObjectWriter.class).get(writer); + Assertions.assertInstanceOf(JsonFactory.class, objectWriter.getFactory()); + } + + @Test + void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException { + BigQueryJsonItemWriter reader = new BigQueryJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup()); + Converter expected = source -> null; + + reader.setRowMapper(expected); + + Converter actual = (Converter) handle + .findVarHandle(BigQueryJsonItemWriter.class, "rowMapper", Converter.class) + .get(reader); + + Assertions.assertEquals(expected, actual); + } + + @Test + void testConvertObjectsToByteArrays() { + TestWriter writer = new TestWriter(); + + // Empty + Assertions.assertTrue(writer.testConvert(List.of()).isEmpty()); + + // Not empty + writer.setRowMapper(Record::toString); + List actual = writer.testConvert(TestConstants.CHUNK.getItems()); + List expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(s -> s.concat("\n")).map(String::getBytes).toList(); + Assertions.assertEquals(expected.size(), actual.size()); + + for (int i = 0; i < actual.size(); i++) { + Assertions.assertArrayEquals(expected.get(i), actual.get(i)); + } + } + + @Test + void testPerformFormatSpecificChecks() { + TestWriter writer = new TestWriter(); + + Table table = Mockito.mock(Table.class); + StandardTableDefinition tableDefinition = StandardTableDefinition + .newBuilder() + .setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))) + .build(); + Mockito.when(table.getDefinition()).thenReturn(tableDefinition); + + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.getTable(Mockito.any(TableId.class))).thenReturn(table); + + // schema + writer.setBigQuery(bigQuery); + writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID, FormatOptions.csv())); + IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); + Assertions.assertEquals("Schema must be provided", actual.getMessage()); + + // schema equality + WriteChannelConfiguration channelConfig = WriteChannelConfiguration + .newBuilder(TABLE_ID) + .setSchema(Schema.of(Field.of(TestConstants.NAME, StandardSQLTypeName.STRING))) + .setFormatOptions(FormatOptions.json()) + .build(); + writer.setWriteChannelConfig(channelConfig); + actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); + Assertions.assertEquals("Schema must be the same", actual.getMessage()); + } + + @ParameterizedTest + @MethodSource("invalidFormats") + void testPerformFormatSpecificChecks_Format(FormatOptions formatOptions) { + Table table = Mockito.mock(Table.class); + StandardTableDefinition tableDefinition = StandardTableDefinition + .newBuilder() + .setSchema(Schema.of(Field.of(TestConstants.AGE, StandardSQLTypeName.STRING))) + .build(); + Mockito.when(table.getDefinition()).thenReturn(tableDefinition); + + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.getTable(Mockito.any(TableId.class))).thenReturn(table); + + TestWriter writer = new TestWriter(); + writer.setBigQuery(bigQuery); + + writer.setWriteChannelConfig(WriteChannelConfiguration.newBuilder(TABLE_ID).setAutodetect(true).setFormatOptions(formatOptions).build()); + IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks); + Assertions.assertEquals("Only %s format is allowed".formatted(FormatOptions.json().getType()), actual.getMessage()); + } + + static Stream invalidFormats() { + return Stream.of( + FormatOptions.parquet(), + FormatOptions.avro(), + FormatOptions.bigtable(), + FormatOptions.datastoreBackup(), + FormatOptions.googleSheets(), + FormatOptions.iceberg(), + FormatOptions.orc(), + FormatOptions.csv() + ); + } + + private static final class TestWriter extends BigQueryJsonItemWriter { + public void testInitializeProperties(List items) { + doInitializeProperties(items); + } + + public List testConvert(List items) { + return convertObjectsToByteArrays(items); + } + + public void testPerformFormatSpecificChecks() { + performFormatSpecificChecks(); + } + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java index 19b59511..73f7563a 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,84 +16,77 @@ package org.springframework.batch.extensions.bigquery.unit.writer.builder; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder; +import org.springframework.core.convert.converter.Converter; -class BigQueryCsvItemWriterBuilderTests extends AbstractBigQueryTest { - - private static final String TABLE = "persons_csv"; +import java.lang.invoke.MethodHandles; +import java.util.function.Consumer; - private final Log logger = LogFactory.getLog(getClass()); +class BigQueryCsvItemWriterBuilderTests extends AbstractBigQueryTest { - /** - * Example how CSV writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation. - */ @Test - void testCsvWriterWithRowMapper() { - BigQuery mockedBigQuery = prepareMockedBigQuery(); - CsvMapper csvMapper = new CsvMapper(); + void testBuild() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup csvWriterHandle = MethodHandles.privateLookupIn(BigQueryCsvItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + + Converter rowMapper = source -> new byte[0]; DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build(); + Consumer jobConsumer = job -> {}; + BigQuery mockedBigQuery = prepareMockedBigQuery(); WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), TABLE)) - .setAutodetect(true) + .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "persons_csv")) .setFormatOptions(FormatOptions.csv()) .build(); BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() - .bigQuery(mockedBigQuery) - .rowMapper(dto -> convertDtoToCsvByteArray(csvMapper, dto)) + .rowMapper(rowMapper) .writeChannelConfig(writeConfiguration) + .jobConsumer(jobConsumer) + .bigQuery(mockedBigQuery) .datasetInfo(datasetInfo) - .jobConsumer(job -> this.logger.debug("Job with id: " + job.getJobId() + " is created")) .build(); - writer.afterPropertiesSet(); - Assertions.assertNotNull(writer); - } - @Test - void testCsvWriterWithCsvMapper() { - BigQuery mockedBigQuery = prepareMockedBigQuery(); + Converter actualRowMapper = (Converter) csvWriterHandle + .findVarHandle(BigQueryCsvItemWriter.class, "rowMapper", Converter.class) + .get(writer); - WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, TABLE)) - .setAutodetect(true) - .setFormatOptions(FormatOptions.csv()) - .build(); + WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) csvWriterHandle + .findVarHandle(BigQueryCsvItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) + .get(writer); - BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder() - .bigQuery(mockedBigQuery) - .writeChannelConfig(writeConfiguration) - .build(); + Consumer actualJobConsumer = (Consumer) baseWriterHandle + .findVarHandle(BigQueryBaseItemWriter.class, "jobConsumer", Consumer.class) + .get(writer); - writer.afterPropertiesSet(); + BigQuery actualBigQuery = (BigQuery) baseWriterHandle + .findVarHandle(BigQueryBaseItemWriter.class, "bigQuery", BigQuery.class) + .get(writer); - Assertions.assertNotNull(writer); - } + DatasetInfo actualDatasetInfo = (DatasetInfo) baseWriterHandle + .findVarHandle(BigQueryCsvItemWriter.class, "datasetInfo", DatasetInfo.class) + .get(writer); - private byte[] convertDtoToCsvByteArray(CsvMapper csvMapper, PersonDto dto) { - try { - return csvMapper.writerWithSchemaFor(PersonDto.class).writeValueAsBytes(dto); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + Assertions.assertEquals(rowMapper, actualRowMapper); + Assertions.assertEquals(writeConfiguration, actualWriteChannelConfig); + Assertions.assertEquals(jobConsumer, actualJobConsumer); + Assertions.assertEquals(mockedBigQuery, actualBigQuery); + Assertions.assertEquals(datasetInfo, actualDatasetInfo); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java index 90973a96..e826f174 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,85 +16,77 @@ package org.springframework.batch.extensions.bigquery.unit.writer.builder; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; +import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder; +import org.springframework.core.convert.converter.Converter; -class BigQueryJsonItemWriterBuilderTests extends AbstractBigQueryTest { - - private static final String TABLE = "persons_json"; +import java.lang.invoke.MethodHandles; +import java.util.function.Consumer; - private final Log logger = LogFactory.getLog(getClass()); +class BigQueryJsonItemWriterBuilderTests extends AbstractBigQueryTest { - /** - * Example how JSON writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation. - */ @Test - void testJsonWriterWithRowMapper() { + void testBuild() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup jsonWriterHandle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + + Converter rowMapper = source -> ""; + DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build(); + Consumer jobConsumer = job -> {}; BigQuery mockedBigQuery = prepareMockedBigQuery(); - ObjectMapper objectMapper = new ObjectMapper(); WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, TABLE)) + .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "persons_json")) .setFormatOptions(FormatOptions.json()) - .setSchema(Schema.of( - Field.newBuilder("name", StandardSQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build() - )) .build(); BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder() - .bigQuery(mockedBigQuery) - .rowMapper(dto -> convertDtoToJsonByteArray(objectMapper, dto)) + .rowMapper(rowMapper) .writeChannelConfig(writeConfiguration) - .jobConsumer(job -> this.logger.debug("Job with id: {}" + job.getJobId() + " is created")) + .jobConsumer(jobConsumer) + .bigQuery(mockedBigQuery) + .datasetInfo(datasetInfo) .build(); - writer.afterPropertiesSet(); - Assertions.assertNotNull(writer); - } - @Test - void testCsvWriterWithJsonMapper() { - BigQuery mockedBigQuery = prepareMockedBigQuery(); + Converter actualRowMapper = (Converter) jsonWriterHandle + .findVarHandle(BigQueryJsonItemWriter.class, "rowMapper", Converter.class) + .get(writer); - WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(TestConstants.DATASET, TABLE)) - .setAutodetect(true) - .setFormatOptions(FormatOptions.json()) - .build(); + WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) jsonWriterHandle + .findVarHandle(BigQueryJsonItemWriter.class, "writeChannelConfig", WriteChannelConfiguration.class) + .get(writer); - BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder() - .bigQuery(mockedBigQuery) - .writeChannelConfig(writeConfiguration) - .build(); + Consumer actualJobConsumer = (Consumer) baseWriterHandle + .findVarHandle(BigQueryBaseItemWriter.class, "jobConsumer", Consumer.class) + .get(writer); - writer.afterPropertiesSet(); + BigQuery actualBigQuery = (BigQuery) baseWriterHandle + .findVarHandle(BigQueryBaseItemWriter.class, "bigQuery", BigQuery.class) + .get(writer); - Assertions.assertNotNull(writer); - } + DatasetInfo actualDatasetInfo = (DatasetInfo) baseWriterHandle + .findVarHandle(BigQueryJsonItemWriter.class, "datasetInfo", DatasetInfo.class) + .get(writer); - private byte[] convertDtoToJsonByteArray(ObjectMapper objectMapper, PersonDto dto) { - try { - return objectMapper.writeValueAsBytes(dto); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + Assertions.assertEquals(rowMapper, actualRowMapper); + Assertions.assertEquals(writeConfiguration, actualWriteChannelConfig); + Assertions.assertEquals(jobConsumer, actualJobConsumer); + Assertions.assertEquals(mockedBigQuery, actualBigQuery); + Assertions.assertEquals(datasetInfo, actualDatasetInfo); } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/resources/java-util-logging.properties b/spring-batch-bigquery/src/test/resources/java-util-logging.properties new file mode 100644 index 00000000..e351c373 --- /dev/null +++ b/spring-batch-bigquery/src/test/resources/java-util-logging.properties @@ -0,0 +1,2 @@ +handlers = org.slf4j.bridge.SLF4JBridgeHandler +com.google.level = FINEST \ No newline at end of file diff --git a/spring-batch-bigquery/src/test/resources/logback.xml b/spring-batch-bigquery/src/test/resources/logback.xml index d227b170..feb55f5f 100644 --- a/spring-batch-bigquery/src/test/resources/logback.xml +++ b/spring-batch-bigquery/src/test/resources/logback.xml @@ -12,6 +12,8 @@ + + diff --git a/spring-batch-bigquery/src/test/resources/test-data.yaml b/spring-batch-bigquery/src/test/resources/reader-test.yaml similarity index 93% rename from spring-batch-bigquery/src/test/resources/test-data.yaml rename to spring-batch-bigquery/src/test/resources/reader-test.yaml index ff477c56..308472e3 100644 --- a/spring-batch-bigquery/src/test/resources/test-data.yaml +++ b/spring-batch-bigquery/src/test/resources/reader-test.yaml @@ -9,7 +9,7 @@ projects: type: STRING mode: REQUIRED - name: age - type: INTEGER + type: INT64 mode: REQUIRED data: - name: Volodymyr diff --git a/spring-batch-bigquery/src/test/resources/writer-test.yaml b/spring-batch-bigquery/src/test/resources/writer-test.yaml new file mode 100644 index 00000000..86086c72 --- /dev/null +++ b/spring-batch-bigquery/src/test/resources/writer-test.yaml @@ -0,0 +1,13 @@ +projects: + - id: batch-test + datasets: + - id: spring_batch_extensions + tables: + - id: json + columns: + - name: name + type: STRING + mode: REQUIRED + - name: age + type: INT64 + mode: REQUIRED \ No newline at end of file