Improve test coverage #173

Merged · merged 1 commit into from Jun 10, 2025
41 changes: 23 additions & 18 deletions spring-batch-bigquery/README.adoc
@@ -1,11 +1,15 @@
= spring-batch-bigquery

Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery].
It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].
Spring Batch extension which contains `ItemWriter` and `ItemReader` implementations for https://cloud.google.com/bigquery[BigQuery].

== Configuration of `BigQueryCsvItemWriter`
`ItemWriter` supports the following formats (via https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]):

Next to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch] one needs to configure the `BigQueryCsvItemWriter`.
* https://en.wikipedia.org/wiki/Comma-separated_values[CSV]
* https://en.wikipedia.org/wiki/JSON[JSON]

Based on https://github.com/googleapis/java-bigquery[Java BigQuery].

== Example of `BigQueryCsvItemWriter`

[source,java]
----
@@ -17,24 +21,25 @@ BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
.setFormatOptions(FormatOptions.csv())
.build();

BigQueryCsvItemWriter<MyDto> writer = new BigQueryCsvItemWriterBuilder<MyDto>()
.bigQuery(mockedBigQuery)
return new BigQueryCsvItemWriterBuilder<MyDto>()
.bigQuery(bigQueryService)
.writeChannelConfig(writeConfiguration)
.build();
}
----

Additional examples can be found https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/writer/builder/[here].
== Example of `BigQueryItemReader`

== Configuration properties
[cols="1,1,4"]
.Properties for an item writer
|===
| Property | Required | Description
[source,java]
----
@Bean
BigQueryItemReader<PersonDto> bigQueryReader() {
return new BigQueryQueryItemReaderBuilder<PersonDto>()
.bigQuery(bigQueryService)
.rowMapper(res -> new PersonDto(res.get("name").getStringValue()))
.query("SELECT p.name FROM persons p")
.build();
}
----

| `bigQuery` | yes | BigQuery object provided by the BigQuery Java Library. Responsible for the connection to BigQuery.
| `writeChannelConfig` | yes | BigQuery write channel config provided by the BigQuery Java Library. Responsible for configuring the data type, the data channel, and the jobs sent to BigQuery.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to a byte array.
| `datasetInfo` | no | Your way to customize how the BigQuery dataset is created.
| `jobConsumer` | no | Your custom handler for the BigQuery Job provided by the BigQuery Java Library.
|===
Additional examples can be found in the https://github.com/spring-projects/spring-batch-extensions/tree/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery[test folder].
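The README advertises JSON support but only shows a CSV writer. A JSON counterpart could look like the minimal sketch below, assuming `BigQueryJsonItemWriterBuilder` mirrors the CSV builder shown above; the dataset and table names are illustrative:

[source,java]
----
@Bean
BigQueryJsonItemWriter<MyDto> bigQueryJsonWriter() {
    // Illustrative dataset/table names; FormatOptions.json() selects NEWLINE_DELIMITED_JSON.
    WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
            .newBuilder(TableId.of("my_dataset", "my_table"))
            .setFormatOptions(FormatOptions.json())
            .build();

    return new BigQueryJsonItemWriterBuilder<MyDto>()
            .bigQuery(bigQueryService)
            .writeChannelConfig(writeConfiguration)
            .build();
}
----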
37 changes: 26 additions & 11 deletions spring-batch-bigquery/pom.xml
@@ -14,13 +14,15 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.5</version>
<version>3.5.0</version>
<relativePath/>
</parent>

@@ -62,11 +64,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.45.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>2.51.0</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
@@ -84,6 +82,11 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -94,6 +97,18 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>3.13.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -123,23 +138,23 @@
</execution>
</executions>
</plugin>
<!-- Runs tests -->
<!-- Run tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<!-- Google cloud tests are omitted because they are designed to be run locally -->
<!-- Google Cloud tests are omitted because they are designed to be run locally -->
<!-- BigQuery Docker emulator tests are omitted because the emulator is not stable yet -->
<include>**/unit/**</include>
<include>**/emulator/**</include>
</includes>
</configuration>
</plugin>
<!-- Generates a flattened version of the pom.xml, used instead of the original -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
<version>1.6.0</version>
<version>1.7.0</version>
<executions>
<execution>
<id>flatten</id>
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/BigQueryQueryItemReader.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@ public void setBigQuery(BigQuery bigQuery) {
}

/**
* Row mapper which transforms single BigQuery row into desired type.
* Row mapper which transforms a single BigQuery row into the desired type.
*
* @param rowMapper your row mapper
*/
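For reference, the row mapper documented above is a plain Spring `Converter<FieldValueList, T>`; a minimal sketch matching the README example, where `PersonDto` is an assumed domain type:

[source,java]
----
// Converter is org.springframework.core.convert.converter.Converter;
// FieldValueList is the BigQuery result-row type. PersonDto is assumed.
Converter<FieldValueList, PersonDto> rowMapper =
        row -> new PersonDto(row.get("name").getStringValue());
----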
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,10 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* A builder for {@link BigQueryQueryItemReader}.
@@ -103,7 +103,7 @@ public BigQueryQueryItemReader<T> build() {
reader.setRowMapper(this.rowMapper);

if (this.jobConfiguration == null) {
Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided");
Assert.isTrue(StringUtils.hasText(this.query), "No query provided");
reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
} else {
reader.setJobConfiguration(this.jobConfiguration);
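The null check above means callers supply either a raw query string or a prepared `QueryJobConfiguration`. A sketch of the second path, assuming the builder exposes a `jobConfiguration(...)` setter for the field shown above; query, dataset, and table names are illustrative:

[source,java]
----
// Supplying a full QueryJobConfiguration instead of a raw query string,
// e.g. to direct results into a destination table.
QueryJobConfiguration jobConfiguration = QueryJobConfiguration
        .newBuilder("SELECT p.name FROM persons p")
        .setDestinationTable(TableId.of("my_dataset", "persons"))
        .build();

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
        .bigQuery(bigQueryService)
        .rowMapper(row -> new PersonDto(row.get("name").getStringValue()))
        .jobConfiguration(jobConfiguration)
        .build();
----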
spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,21 +16,12 @@

package org.springframework.batch.extensions.bigquery.writer;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.commons.lang3.BooleanUtils;
import com.google.cloud.bigquery.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

import java.io.ByteArrayOutputStream;
@@ -41,7 +32,6 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Base class that holds shared code for JSON and CSV writers.
@@ -50,7 +40,7 @@
* @author Volodymyr Perebykivskyi
* @since 0.1.0
*/
public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T>, InitializingBean {

/** Logger that can be reused */
protected final Log logger = LogFactory.getLog(getClass());
@@ -76,6 +66,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {

private BigQuery bigQuery;

private boolean writeFailed;

/**
* Fetches table from the provided configuration.
@@ -168,18 +159,25 @@ private void doWriteDataToBigQuery(ByteBuffer byteBuffer) throws IOException {
writer.write(byteBuffer);
writeChannel = writer;
}
catch (Exception e) {
writeFailed = true;
logger.error("BigQuery error", e);
throw new BigQueryItemWriterException("Error occurred during write", e);
}
finally {
String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();

if (writeChannel != null) {
logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
if (this.jobConsumer != null) {
this.jobConsumer.accept(writeChannel.getJob());
if (!writeFailed) {
String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();

if (writeChannel != null) {
logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
if (this.jobConsumer != null) {
this.jobConsumer.accept(writeChannel.getJob());
}
}
}

if (this.logger.isDebugEnabled()) {
this.logger.debug(logMessage);
if (this.logger.isDebugEnabled()) {
this.logger.debug(logMessage);
}
}
}
}
@@ -194,23 +192,22 @@ private TableDataWriteChannel getWriteChannel() {

/**
* Performs common validation for CSV and JSON types.
*
* @param formatSpecificChecks supplies type-specific validation
*/
protected void baseAfterPropertiesSet(Supplier<Void> formatSpecificChecks) {
@Override
public void afterPropertiesSet() {
Assert.notNull(this.bigQuery, "BigQuery service must be provided");
Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided");
Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");

Assert.isTrue(BooleanUtils.isFalse(isBigtable()), "Google BigTable is not supported");
Assert.isTrue(BooleanUtils.isFalse(isGoogleSheets()), "Google Sheets is not supported");
Assert.isTrue(BooleanUtils.isFalse(isDatastore()), "Google Datastore is not supported");
Assert.isTrue(BooleanUtils.isFalse(isParquet()), "Parquet is not supported");
Assert.isTrue(BooleanUtils.isFalse(isOrc()), "Orc is not supported");
Assert.isTrue(BooleanUtils.isFalse(isAvro()), "Avro is not supported");

formatSpecificChecks.get();
Assert.isTrue(!isBigtable(), "Google BigTable is not supported");
Assert.isTrue(!isGoogleSheets(), "Google Sheets is not supported");
Assert.isTrue(!isDatastore(), "Google Datastore is not supported");
Assert.isTrue(!isParquet(), "Parquet is not supported");
Assert.isTrue(!isOrc(), "Orc is not supported");
Assert.isTrue(!isAvro(), "Avro is not supported");
Assert.isTrue(!isIceberg(), "Iceberg is not supported");

Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");
performFormatSpecificChecks();

String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
if (this.datasetInfo == null) {
@@ -262,6 +259,10 @@ private boolean isDatastore() {
return FormatOptions.datastoreBackup().getType().equals(this.writeChannelConfig.getFormat());
}

private boolean isIceberg() {
return FormatOptions.iceberg().getType().equals(this.writeChannelConfig.getFormat());
}

/**
* Schema can be computed on the BigQuery side during upload,
* so it is good to know when the schema is supplied manually by the user.
@@ -294,4 +295,9 @@ protected boolean tableHasDefinedSchema(Table table) {
*/
protected abstract List<byte[]> convertObjectsToByteArrays(List<? extends T> items);

/**
* Performs specific checks that are unique to the format.
*/
protected abstract void performFormatSpecificChecks();

}
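To illustrate the new `performFormatSpecificChecks()` hook, a minimal hypothetical subclass is sketched below; the class name, its `format` field, and the specific check are assumptions for illustration, not part of this PR:

[source,java]
----
import com.google.cloud.bigquery.FormatOptions;
import org.springframework.util.Assert;

import java.util.List;

// Hypothetical subclass showing where format-specific validation now lives.
public class NewlineJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {

    private FormatOptions format;

    public void setFormat(FormatOptions format) {
        this.format = format;
    }

    @Override
    protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
        // Each item would be serialized to one JSON line here (omitted).
        return List.of();
    }

    @Override
    protected void performFormatSpecificChecks() {
        // Invoked by afterPropertiesSet() once the shared validations pass:
        // this writer only accepts newline-delimited JSON.
        Assert.notNull(this.format, "Format must be provided");
        Assert.isTrue(FormatOptions.json().getType().equals(this.format.getType()),
                "Only NEWLINE_DELIMITED_JSON is supported by this writer");
    }
}
----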