Skip to content

[bq] introduce BigQuery Storage Pending Write API (JSON) #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions spring-batch-bigquery/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implement

`ItemWriter` support:

[cols="h,1,1"]
[cols="h,1,1,1"]
|===
| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Commited)]
| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Committed)] | https://cloud.google.com/bigquery/docs/write-api#pending_type[Write API (Pending)]

|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported
|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported |
|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported | Supported
|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | |
|===

`ItemReader` support:
Expand Down
2 changes: 1 addition & 1 deletion spring-batch-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
<version>1.7.0</version>
<version>1.7.1</version>
<executions>
<execution>
<id>flatten</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public class BigQueryItemWriterException extends ItemWriterException {
public BigQueryItemWriterException(String message, Throwable cause) {
super(message, cause);
}

/**
* Create a new {@link BigQueryItemWriterException} based on a message.
* @param message the message for this {@link Exception}
*/
public BigQueryItemWriterException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
* @see <a href="https://cloud.google.com/bigquery/docs/write-api#committed_type">Committed type storage write API</a>
* @since 0.2.0
*/
public class BigQueryWriteApiJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {
public class BigQueryWriteApiCommitedJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {

/**
* Logger that can be reused
Expand Down Expand Up @@ -112,7 +112,10 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
throw new BigQueryItemWriterException("Error on write happened", e);
} finally {
if (StringUtils.hasText(streamName)) {
bigQueryWriteClient.finalizeWriteStream(streamName);
long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
if (chunk.size() != rowCount) {
logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size()));
}
}

if (!writeFailed && logger.isDebugEnabled()) {
Expand Down Expand Up @@ -164,7 +167,7 @@ public void setMarshaller(final JsonObjectMarshaller<T> marshaller) {
* {@link ApiFutureCallback} that will be called in case of a successful or failed response.
*
* @param apiFutureCallback a callback
* @see BigQueryWriteApiJsonItemWriter#setExecutor(Executor)
* @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor)
*/
public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
this.apiFutureCallback = apiFutureCallback;
Expand All @@ -174,7 +177,7 @@ public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> api
* An {@link Executor} that will be calling a {@link ApiFutureCallback}.
*
* @param executor an executor
* @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
* @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
*/
public void setExecutor(final Executor executor) {
this.executor = executor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.extensions.bigquery.writer.writeapi.json;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JsonObjectMarshaller;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

/**
 * JSON writer for BigQuery using the Storage Write API in pending mode.
 *
 * <p>For every non-empty chunk a dedicated {@link WriteStream} of type
 * {@link WriteStream.Type#PENDING} is created, the chunk is appended as a single
 * JSON batch, and the stream is finalized and batch-committed so the rows become
 * visible atomically.
 *
 * @param <T> your DTO type
 * @author Volodymyr Perebykivskyi
 * @see <a href="https://en.wikipedia.org/wiki/JSON">JSON</a>
 * @see <a href="https://cloud.google.com/bigquery/docs/write-api#pending_type">Pending type storage write API</a>
 * @since 0.2.0
 */
public class BigQueryWriteApiPendingJsonItemWriter<T> implements ItemWriter<T>, InitializingBean {

    /**
     * Logger that can be reused
     */
    private final Log logger = LogFactory.getLog(getClass());

    /** Counts successfully submitted write operations; used only for debug logging. */
    private final AtomicLong bigQueryWriteCounter = new AtomicLong();

    private BigQueryWriteClient bigQueryWriteClient;
    private TableName tableName;
    private JsonObjectMarshaller<T> marshaller;
    private ApiFutureCallback<AppendRowsResponse> apiFutureCallback;
    private Executor executor;

    /**
     * Failure flag for the current {@link #write(Chunk)} invocation. Reset at the
     * start of each call so that one failed chunk does not make every subsequent
     * (e.g. retried) chunk fail spuriously.
     */
    private boolean writeFailed;

    @Override
    public void write(final Chunk<? extends T> chunk) throws Exception {
        if (!chunk.isEmpty()) {
            // Reset per invocation: Spring Batch may call write() again after a failure
            // (retry / skip semantics); a stale flag from a previous chunk would turn
            // every later, successful write into an exception.
            writeFailed = false;

            final List<? extends T> items = chunk.getItems();
            String streamName = null;

            try {
                // A PENDING stream buffers rows server-side; nothing is visible until
                // the stream is finalized and batch-committed below.
                WriteStream writeStreamToCreate = WriteStream.newBuilder()
                        .setType(WriteStream.Type.PENDING)
                        .build();

                CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder()
                        .setParent(tableName.toString())
                        .setWriteStream(writeStreamToCreate)
                        .build();

                WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest);
                streamName = writeStream.getName();

                if (logger.isDebugEnabled()) {
                    logger.debug("Created a stream=" + streamName);
                }

                // try-with-resources: closing the JsonStreamWriter flushes in-flight appends.
                try (final JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), bigQueryWriteClient).build()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("Mapping %d elements", items.size()));
                    }
                    final JSONArray array = new JSONArray();
                    items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put);

                    if (logger.isDebugEnabled()) {
                        logger.debug("Writing data to BigQuery");
                    }
                    final ApiFuture<AppendRowsResponse> future = writer.append(array);

                    if (apiFutureCallback != null) {
                        ApiFutures.addCallback(future, apiFutureCallback, executor);
                    }
                }
            } catch (Exception e) {
                writeFailed = true;
                logger.error("BigQuery error", e);
                throw new BigQueryItemWriterException("Error on write happened", e);
            } finally {
                // Finalize and commit even when append failed, so the pending stream
                // is not left dangling on the server.
                if (StringUtils.hasText(streamName)) {
                    long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount();
                    if (chunk.size() != rowCount) {
                        logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size()));
                    }

                    BatchCommitWriteStreamsRequest batchRequest = BatchCommitWriteStreamsRequest.newBuilder()
                            .setParent(tableName.toString())
                            .addWriteStreams(streamName)
                            .build();
                    BatchCommitWriteStreamsResponse batchResponse = bigQueryWriteClient.batchCommitWriteStreams(batchRequest);

                    // A missing commit time means the commit did not happen; the
                    // per-stream errors explain why.
                    if (!batchResponse.hasCommitTime()) {
                        writeFailed = true;
                        logger.error("BigQuery error=" + batchResponse.getStreamErrorsList());
                    }
                }

                if (!writeFailed && logger.isDebugEnabled()) {
                    logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
                }
            }

            // Reached only when the try block itself did not throw but the batch
            // commit failed (writeFailed was set in the finally block).
            if (writeFailed) {
                throw new BigQueryItemWriterException("Error on write happened");
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided");
        Assert.notNull(this.tableName, "Table name must be provided");
        Assert.notNull(this.marshaller, "Marshaller must be provided");

        if (this.apiFutureCallback != null) {
            Assert.notNull(this.executor, "Executor must be provided");
        }
    }

    /**
     * GRPC client that wraps communication with BigQuery.
     *
     * @param bigQueryWriteClient a client
     */
    public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) {
        this.bigQueryWriteClient = bigQueryWriteClient;
    }

    /**
     * A full path to the BigQuery table.
     *
     * @param tableName a name
     */
    public void setTableName(final TableName tableName) {
        this.tableName = tableName;
    }

    /**
     * Converter that transforms a single row into a {@link String}.
     *
     * @param marshaller your JSON mapper
     */
    public void setMarshaller(final JsonObjectMarshaller<T> marshaller) {
        this.marshaller = marshaller;
    }

    /**
     * {@link ApiFutureCallback} that will be called in case of a successful or failed response.
     *
     * @param apiFutureCallback a callback
     * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor)
     */
    public void setApiFutureCallback(final ApiFutureCallback<AppendRowsResponse> apiFutureCallback) {
        this.apiFutureCallback = apiFutureCallback;
    }

    /**
     * An {@link Executor} that will be calling a {@link ApiFutureCallback}.
     *
     * @param executor an executor
     * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback)
     */
    public void setExecutor(final Executor executor) {
        this.executor = executor;
    }
}
Loading