[bq] introduce BigQuery Storage Write API (JSON) #179

Merged (1 commit, Jun 28, 2025)
spring-batch-bigquery/README.adoc (27 changes: 21 additions & 6 deletions)

@@ -2,26 +2,41 @@

Spring Batch extension which contains an `ItemWriter` and `ItemReader` implementations for https://cloud.google.com/bigquery[BigQuery].

-`ItemWriter` supports next formats (https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]):
+`ItemWriter` supports:
+
+[cols="h,1,1"]
+|===
+| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Committed)]
+
+|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported
+|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported |
+|===
+
+`ItemReader` supports:
+
+[cols="h,1"]
+|===
+
+|https://en.wikipedia.org/wiki/JSON[JSON] |Supported
+|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported
+|===

-* https://en.wikipedia.org/wiki/Comma-separated_values[CSV]
-* https://en.wikipedia.org/wiki/JSON[JSON]

Based on https://github.com/googleapis/java-bigquery[Java BigQuery].

-== Example of `BigQueryCsvItemWriter`
+== Example of `BigQueryLoadJobCsvItemWriter`

[source,java]
----
@Bean
-BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
+BigQueryLoadJobCsvItemWriter<MyDto> bigQueryCsvWriter() {
    WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
            .newBuilder(TableId.of("csv_dataset", "csv_table"))
            .setAutodetect(true)
            .setFormatOptions(FormatOptions.csv())
            .build();

-    return new BigQueryCsvItemWriterBuilder<MyDto>()
+    return new BigQueryLoadJobCsvItemWriterBuilder<MyDto>()
            .bigQuery(bigQueryService)
            .writeChannelConfig(writeConfiguration)
            .build();
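The updated README keeps only the CSV example. For the renamed JSON load-job writer (`BigQueryLoadJobJsonItemWriter`, see its diff below), a bean could be wired the same way. This is a minimal sketch, not README content from the PR: the `setMarshaller(...)` setter is an assumption based on the `marshaller` field visible in this diff, and `bigQueryService` is the same assumed bean as in the CSV example.

[source,java]
----
@Bean
BigQueryLoadJobJsonItemWriter<MyDto> bigQueryJsonWriter() {
    WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
            .newBuilder(TableId.of("json_dataset", "json_table"))
            .setAutodetect(true)
            .setFormatOptions(FormatOptions.json())
            .build();

    // Setter names below are assumptions inferred from the fields shown in this diff.
    BigQueryLoadJobJsonItemWriter<MyDto> writer = new BigQueryLoadJobJsonItemWriter<>();
    writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
    writer.setWriteChannelConfig(writeConfiguration);
    writer.setBigQuery(bigQueryService);
    return writer;
}
----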
org/springframework/batch/extensions/bigquery/reader/builder/BigQueryQueryItemReaderBuilder.java

@@ -17,6 +17,7 @@
package org.springframework.batch.extensions.bigquery.reader.builder;

import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
@@ -113,7 +114,7 @@ public BigQueryQueryItemReaderBuilder<T> targetType(final Class<T> targetType) {
public BigQueryQueryItemReader<T> build() {
    final BigQueryQueryItemReader<T> reader = new BigQueryQueryItemReader<>();

-    reader.setBigQuery(this.bigQuery);
+    reader.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);

    if (this.rowMapper == null) {
        Assert.notNull(this.targetType, "No target type provided");
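With this fallback in place, the reader builder no longer requires an explicit `BigQuery` client. A minimal usage sketch, assuming application-default credentials are configured; the `query(...)` builder method is an assumption inferred from the `QueryJobConfiguration` import, since it is not shown in this diff:

[source,java]
----
BigQueryQueryItemReader<MyDto> reader = new BigQueryQueryItemReaderBuilder<MyDto>()
        .query("SELECT name, age FROM csv_dataset.csv_table")  // assumed builder method
        .targetType(MyDto.class)
        .build();  // bigQuery(...) omitted: defaults to BigQueryOptions.getDefaultInstance().getService()
----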
org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java → org/springframework/batch/extensions/bigquery/writer/loadjob/BigQueryLoadJobBaseItemWriter.java

@@ -14,11 +14,12 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.writer;
+package org.springframework.batch.extensions.bigquery.writer.loadjob;

import com.google.cloud.bigquery.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
@@ -34,13 +35,13 @@
import java.util.function.Consumer;

/**
- * Base class that holds shared code for JSON and CSV writers.
+ * Base class that holds shared code for load job JSON and CSV writers.
 *
 * @param <T> your DTO type
 * @author Volodymyr Perebykivskyi
 * @since 0.1.0
 */
-public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T>, InitializingBean {
+public abstract class BigQueryLoadJobBaseItemWriter<T> implements ItemWriter<T>, InitializingBean {

    /** Logger that can be reused */
    protected final Log logger = LogFactory.getLog(getClass());
@@ -71,7 +72,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T>, Initia
/**
 * Fetches table from the provided configuration.
 *
- * @return {@link Table} that is described in {@link BigQueryBaseItemWriter#writeChannelConfig}
+ * @return {@link Table} that is described in {@link BigQueryLoadJobBaseItemWriter#writeChannelConfig}
 */
protected Table getTable() {
    return this.bigQuery.getTable(this.writeChannelConfig.getDestinationTable());
@@ -119,8 +120,8 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
    final List<? extends T> items = chunk.getItems();
    doInitializeProperties(items);

-    if (this.logger.isDebugEnabled()) {
-        this.logger.debug(String.format("Mapping %d elements", items.size()));
+    if (logger.isDebugEnabled()) {
+        logger.debug(String.format("Mapping %d elements", items.size()));
    }

    doWriteDataToBigQuery(mapDataToBigQueryFormat(items));
@@ -147,8 +148,8 @@ private ByteBuffer mapDataToBigQueryFormat(final List<? extends T> items) throws
}

private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
-    if (this.logger.isDebugEnabled()) {
-        this.logger.debug("Writing data to BigQuery");
+    if (logger.isDebugEnabled()) {
+        logger.debug("Writing data to BigQuery");
    }

    TableDataWriteChannel writeChannel = null;
@@ -174,8 +175,8 @@ private void doWriteDataToBigQuery(final ByteBuffer byteBuffer) {
            }
        }

-        if (this.logger.isDebugEnabled()) {
-            this.logger.debug(logMessage);
+        if (logger.isDebugEnabled()) {
+            logger.debug(logMessage);
        }
    }
}
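Behind `doWriteDataToBigQuery`, the base writer drives a `TableDataWriteChannel` from the java-bigquery client. Stripped of counters, logging, and the job consumer, the load-job pattern it wraps looks roughly like this sketch (a simplified illustration with a hypothetical helper name, not the writer's exact code):

[source,java]
----
// Simplified load-job write path: stream the serialized chunk through a
// write channel, then read back the Job that BigQuery created for it.
private Job writeViaLoadJob(BigQuery bigQuery, WriteChannelConfiguration config, ByteBuffer bytes) throws IOException {
    TableDataWriteChannel channel = bigQuery.writer(config);
    try (channel) {
        channel.write(bytes);  // bytes come from convertObjectsToByteArrays(items)
    }
    return channel.getJob();  // available once the channel is closed
}
----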
org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java → org/springframework/batch/extensions/bigquery/writer/loadjob/csv/BigQueryLoadJobCsvItemWriter.java

@@ -14,13 +14,14 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.writer;
+package org.springframework.batch.extensions.bigquery.writer.loadjob.csv;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Table;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -37,7 +38,7 @@
 * @since 0.2.0
 * @see <a href="https://en.wikipedia.org/wiki/Comma-separated_values">CSV</a>
 */
-public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> {
+public class BigQueryLoadJobCsvItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {

    private Converter<T, byte[]> rowMapper;
    private ObjectWriter objectWriter;
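The CSV writer still exposes an optional `rowMapper` (`Converter<T, byte[]>`); the `objectWriter` field suggests a Jackson `CsvMapper`-based fallback when none is set. A hypothetical custom mapper, for illustration only:

[source,java]
----
// Hypothetical row mapper that renders a DTO as one CSV line.
Converter<MyDto, byte[]> rowMapper =
        dto -> (dto.getName() + "," + dto.getAge() + "\n").getBytes(StandardCharsets.UTF_8);
----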
org/springframework/batch/extensions/bigquery/writer/builder/BigQueryCsvItemWriterBuilder.java → org/springframework/batch/extensions/bigquery/writer/loadjob/csv/builder/BigQueryCsvItemWriterBuilder.java

@@ -14,19 +14,20 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.writer.builder;
+package org.springframework.batch.extensions.bigquery.writer.loadjob.csv.builder;

import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.WriteChannelConfiguration;
-import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.csv.BigQueryLoadJobCsvItemWriter;
import org.springframework.core.convert.converter.Converter;

import java.util.function.Consumer;

/**
- * A builder for {@link BigQueryCsvItemWriter}.
+ * A builder for {@link BigQueryLoadJobCsvItemWriter}.
 *
 * @param <T> your DTO type
 * @author Volodymyr Perebykivskyi
@@ -47,7 +48,7 @@ public class BigQueryCsvItemWriterBuilder<T> {
 *
 * @param rowMapper your row mapper
 * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryCsvItemWriter#setRowMapper(Converter)
+ * @see BigQueryLoadJobCsvItemWriter#setRowMapper(Converter)
 */
public BigQueryCsvItemWriterBuilder<T> rowMapper(Converter<T, byte[]> rowMapper) {
    this.rowMapper = rowMapper;
@@ -59,7 +60,7 @@ public BigQueryCsvItemWriterBuilder<T> rowMapper(Converter<T, byte[]> rowMapper)
 *
 * @param datasetInfo BigQuery dataset info
 * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryCsvItemWriter#setDatasetInfo(DatasetInfo)
+ * @see BigQueryLoadJobCsvItemWriter#setDatasetInfo(DatasetInfo)
 */
public BigQueryCsvItemWriterBuilder<T> datasetInfo(DatasetInfo datasetInfo) {
    this.datasetInfo = datasetInfo;
@@ -71,7 +72,7 @@ public BigQueryCsvItemWriterBuilder<T> datasetInfo(DatasetInfo datasetInfo) {
 *
 * @param consumer your consumer
 * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryCsvItemWriter#setJobConsumer(Consumer)
+ * @see BigQueryLoadJobCsvItemWriter#setJobConsumer(Consumer)
 */
public BigQueryCsvItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
    this.jobConsumer = consumer;
@@ -83,7 +84,7 @@ public BigQueryCsvItemWriterBuilder<T> jobConsumer(Consumer<Job> consumer) {
 *
 * @param configuration BigQuery channel configuration
 * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
+ * @see BigQueryLoadJobCsvItemWriter#setWriteChannelConfig(WriteChannelConfiguration)
 */
public BigQueryCsvItemWriterBuilder<T> writeChannelConfig(WriteChannelConfiguration configuration) {
    this.writeChannelConfig = configuration;
@@ -95,25 +96,26 @@ public BigQueryCsvItemWriterBuilder<T> writeChannelConfig(WriteChannelConfigurat
 *
 * @param bigQuery BigQuery service
 * @return {@link BigQueryCsvItemWriterBuilder}
- * @see BigQueryCsvItemWriter#setBigQuery(BigQuery)
+ * @see BigQueryLoadJobCsvItemWriter#setBigQuery(BigQuery)
 */
public BigQueryCsvItemWriterBuilder<T> bigQuery(BigQuery bigQuery) {
    this.bigQuery = bigQuery;
    return this;
}

/**
- * Please remember about {@link BigQueryCsvItemWriter#afterPropertiesSet()}.
+ * Please remember about {@link BigQueryLoadJobCsvItemWriter#afterPropertiesSet()}.
 *
- * @return {@link BigQueryCsvItemWriter}
+ * @return {@link BigQueryLoadJobCsvItemWriter}
 */
-public BigQueryCsvItemWriter<T> build() {
-    BigQueryCsvItemWriter<T> writer = new BigQueryCsvItemWriter<>();
+public BigQueryLoadJobCsvItemWriter<T> build() {
+    BigQueryLoadJobCsvItemWriter<T> writer = new BigQueryLoadJobCsvItemWriter<>();

+    writer.setBigQuery(this.bigQuery == null ? BigQueryOptions.getDefaultInstance().getService() : this.bigQuery);
+
    writer.setRowMapper(this.rowMapper);
    writer.setWriteChannelConfig(this.writeChannelConfig);
    writer.setJobConsumer(this.jobConsumer);
-    writer.setBigQuery(this.bigQuery);
    writer.setDatasetInfo(this.datasetInfo);

    return writer;
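With the same null check as the reader builder, `build()` now supplies a default client, so `.bigQuery(...)` becomes optional when application-default credentials are available. A sketch reusing the `rowMapper` and `writeConfiguration` from the examples above:

[source,java]
----
BigQueryLoadJobCsvItemWriter<MyDto> writer = new BigQueryCsvItemWriterBuilder<MyDto>()
        .rowMapper(rowMapper)
        .writeChannelConfig(writeConfiguration)
        .build();  // bigQuery(...) omitted: defaults to BigQueryOptions.getDefaultInstance().getService()
----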
org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java → org/springframework/batch/extensions/bigquery/writer/loadjob/json/BigQueryLoadJobJsonItemWriter.java

@@ -14,10 +14,11 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.writer;
+package org.springframework.batch.extensions.bigquery.writer.loadjob.json;

import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Table;
+import org.springframework.batch.extensions.bigquery.writer.loadjob.BigQueryLoadJobBaseItemWriter;
import org.springframework.batch.item.json.JsonObjectMarshaller;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -28,14 +29,14 @@
import java.util.function.Predicate;

/**
- * JSON writer for BigQuery.
+ * JSON writer for BigQuery using Load Job.
 *
 * @param <T> your DTO type
 * @author Volodymyr Perebykivskyi
 * @since 0.2.0
 * @see <a href="https://en.wikipedia.org/wiki/JSON">JSON</a>
 */
-public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {
+public class BigQueryLoadJobJsonItemWriter<T> extends BigQueryLoadJobBaseItemWriter<T> {

    private static final String LF = "\n";

@@ -59,12 +60,12 @@ protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {

@Override
protected void performFormatSpecificChecks() {
-    Assert.notNull(this.marshaller, "Marshaller is mandatory");
+    Assert.notNull(this.marshaller, "Marshaller must be provided");

    Table table = getTable();

    if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
-        if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) {
+        if (tableHasDefinedSchema(table) && logger.isWarnEnabled()) {
            logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
        }
    } else {
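The check above warns when autodetect is combined with a table whose schema is already defined. One way to avoid the mix is to declare the schema explicitly in the write-channel configuration; a sketch with a hypothetical two-column schema:

[source,java]
----
// Hypothetical explicit schema instead of setAutodetect(true).
Schema schema = Schema.of(
        Field.of("name", StandardSQLTypeName.STRING),
        Field.of("age", StandardSQLTypeName.INT64));

WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
        .newBuilder(TableId.of("json_dataset", "json_table"))
        .setSchema(schema)
        .setFormatOptions(FormatOptions.json())
        .build();
----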