diff --git a/spring-batch-bigquery/README.adoc b/spring-batch-bigquery/README.adoc index bf8db52..aa9d5a4 100644 --- a/spring-batch-bigquery/README.adoc +++ b/spring-batch-bigquery/README.adoc @@ -4,12 +4,12 @@ Spring Batch extension which contains an `ItemWriter` and `ItemReader` implement `ItemWriter` support: -[cols="h,1,1"] +[cols="h,1,1, 1"] |=== -| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Commited)] +| |https://cloud.google.com/bigquery/docs/batch-loading-data[Load job] |https://cloud.google.com/bigquery/docs/write-api#committed_type[Write API (Commited)] | https://cloud.google.com/bigquery/docs/write-api#pending_type[Write API (Pending)] -|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported -|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | +|https://en.wikipedia.org/wiki/JSON[JSON] |Supported |Supported | Supported +|https://en.wikipedia.org/wiki/Comma-separated_values[CSV] |Supported | | |=== `ItemReader` support: diff --git a/spring-batch-bigquery/pom.xml b/spring-batch-bigquery/pom.xml index 611f82f..95fb210 100644 --- a/spring-batch-bigquery/pom.xml +++ b/spring-batch-bigquery/pom.xml @@ -153,7 +153,7 @@ org.codehaus.mojo flatten-maven-plugin - 1.7.0 + 1.7.1 flatten diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java index bf17348..baa5429 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryItemWriterException.java @@ -35,4 +35,12 @@ public class BigQueryItemWriterException extends ItemWriterException { public BigQueryItemWriterException(String message, Throwable cause) { super(message, cause); } + + /** + * Create a new {@link BigQueryItemWriterException} based on a message. + * @param message the message for this {@link Exception} + */ + public BigQueryItemWriterException(String message) { + super(message); + } } \ No newline at end of file diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java similarity index 92% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java index 7311cbd..7c41069 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiJsonItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriter.java @@ -50,7 +50,7 @@ * @see Commited type storage write API * @since 0.2.0 */ -public class BigQueryWriteApiJsonItemWriter implements ItemWriter, InitializingBean { +public class BigQueryWriteApiCommitedJsonItemWriter implements ItemWriter, InitializingBean { /** * Logger that can be reused @@ -112,7 +112,10 @@ public void write(final Chunk chunk) throws Exception { throw new BigQueryItemWriterException("Error on write happened", e); } finally { if (StringUtils.hasText(streamName)) { - bigQueryWriteClient.finalizeWriteStream(streamName); + long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount(); + if (chunk.size() != rowCount) { + logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size())); + } } if (!writeFailed && logger.isDebugEnabled()) { @@ -164,7 +167,7 @@ public void setMarshaller(final JsonObjectMarshaller marshaller) { * {@link ApiFutureCallback} that will be called in case of successful of failed response. * * @param apiFutureCallback a callback - * @see BigQueryWriteApiJsonItemWriter#setExecutor(Executor) + * @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor) */ public void setApiFutureCallback(final ApiFutureCallback apiFutureCallback) { this.apiFutureCallback = apiFutureCallback; @@ -174,7 +177,7 @@ public void setApiFutureCallback(final ApiFutureCallback api * An {@link Executor} that will be calling a {@link ApiFutureCallback}. * * @param executor an executor - * @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback) */ public void setExecutor(final Executor executor) { this.executor = executor; diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java new file mode 100644 index 0000000..b64fc20 --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriter.java @@ -0,0 +1,202 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.writer.writeapi.json; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONArray; +import org.json.JSONObject; +import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.json.JsonObjectMarshaller; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * JSON writer for BigQuery using Storage Write API. + * + * @param your DTO type + * @author Volodymyr Perebykivskyi + * @see JSON + * @see Pending type storage write API + * @since 0.2.0 + */ +public class BigQueryWriteApiPendingJsonItemWriter implements ItemWriter, InitializingBean { + + /** + * Logger that can be reused + */ + private final Log logger = LogFactory.getLog(getClass()); + + private final AtomicLong bigQueryWriteCounter = new AtomicLong(); + + private BigQueryWriteClient bigQueryWriteClient; + private TableName tableName; + private JsonObjectMarshaller marshaller; + private ApiFutureCallback apiFutureCallback; + private Executor executor; + + private boolean writeFailed; + + @Override + public void write(final Chunk chunk) throws Exception { + if (!chunk.isEmpty()) { + final List items = chunk.getItems(); + String streamName = null; + + try { + WriteStream writeStreamToCreate = WriteStream.newBuilder() + .setType(WriteStream.Type.PENDING) + .build(); + + CreateWriteStreamRequest createStreamRequest = CreateWriteStreamRequest.newBuilder() + .setParent(tableName.toString()) + .setWriteStream(writeStreamToCreate) + .build(); + + WriteStream writeStream = bigQueryWriteClient.createWriteStream(createStreamRequest); + streamName = writeStream.getName(); + + if (logger.isDebugEnabled()) { + logger.debug("Created a stream=" + streamName); + } + + try (final JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), bigQueryWriteClient).build()) { + if (logger.isDebugEnabled()) { + logger.debug(String.format("Mapping %d elements", items.size())); + } + final JSONArray array = new JSONArray(); + items.stream().map(marshaller::marshal).map(JSONObject::new).forEach(array::put); + + if (logger.isDebugEnabled()) { + logger.debug("Writing data to BigQuery"); + } + final ApiFuture future = writer.append(array); + + if (apiFutureCallback != null) { + ApiFutures.addCallback(future, apiFutureCallback, executor); + } + } + } catch (Exception e) { + writeFailed = true; + logger.error("BigQuery error", e); + throw new BigQueryItemWriterException("Error on write happened", e); + } finally { + if (StringUtils.hasText(streamName)) { + long rowCount = bigQueryWriteClient.finalizeWriteStream(streamName).getRowCount(); + if (chunk.size() != rowCount) { + logger.warn("Finalized response row count=%d is not the same as chunk size=%d".formatted(rowCount, chunk.size())); + } + + BatchCommitWriteStreamsRequest batchRequest = BatchCommitWriteStreamsRequest.newBuilder() + .setParent(tableName.toString()) + .addWriteStreams(streamName) + .build(); + BatchCommitWriteStreamsResponse batchResponse = bigQueryWriteClient.batchCommitWriteStreams(batchRequest); + + if (!batchResponse.hasCommitTime()) { + writeFailed = true; + logger.error("BigQuery error=" + batchResponse.getStreamErrorsList()); + } + } + + if (!writeFailed && logger.isDebugEnabled()) { + logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet()); + } + } + + if (writeFailed) { + throw new BigQueryItemWriterException("Error on write happened"); + } + } + } + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(this.bigQueryWriteClient, "BigQuery write client must be provided"); + Assert.notNull(this.tableName, "Table name must be provided"); + Assert.notNull(this.marshaller, "Marshaller must be provided"); + + if (this.apiFutureCallback != null) { + Assert.notNull(this.executor, "Executor must be provided"); + } + } + + /** + * GRPC client that wraps communication with BigQuery. + * + * @param bigQueryWriteClient a client + */ + public void setBigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) { + this.bigQueryWriteClient = bigQueryWriteClient; + } + + /** + * A full path to the BigQuery table. + * + * @param tableName a name + */ + public void setTableName(final TableName tableName) { + this.tableName = tableName; + } + + /** + * Converter that transforms a single row into a {@link String}. + * + * @param marshaller your JSON mapper + */ + public void setMarshaller(final JsonObjectMarshaller marshaller) { + this.marshaller = marshaller; + } + + /** + * {@link ApiFutureCallback} that will be called in case of successful of failed response. + * + * @param apiFutureCallback a callback + * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor) + */ + public void setApiFutureCallback(final ApiFutureCallback apiFutureCallback) { + this.apiFutureCallback = apiFutureCallback; + } + + /** + * An {@link Executor} that will be calling a {@link ApiFutureCallback}. + * + * @param executor an executor + * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + */ + public void setExecutor(final Executor executor) { + this.executor = executor; + } +} diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java new file mode 100644 index 0000000..fa8bd88 --- /dev/null +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilder.java @@ -0,0 +1,130 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.writer.writeapi.json.builder; + +import com.google.api.core.ApiFutureCallback; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.common.util.concurrent.MoreExecutors; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiCommitedJsonItemWriter; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; +import org.springframework.batch.item.json.JsonObjectMarshaller; + +import java.io.IOException; +import java.util.concurrent.Executor; + +/** + * A builder for {@link BigQueryWriteApiCommitedJsonItemWriter}. + * + * @param your DTO type + * @author Volodymyr Perebykivskyi + * @since 0.2.0 + * @see Examples + */ +public class BigQueryWriteApiCommitedJsonItemWriterBuilder { + + private BigQueryWriteClient bigQueryWriteClient; + private TableName tableName; + private JsonObjectMarshaller marshaller; + private ApiFutureCallback apiFutureCallback; + private Executor executor; + + /** + * GRPC client that will be responsible for communication with BigQuery. + * + * @param bigQueryWriteClient a client + * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder} + * @see BigQueryWriteApiCommitedJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient) + */ + public BigQueryWriteApiCommitedJsonItemWriterBuilder bigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) { + this.bigQueryWriteClient = bigQueryWriteClient; + return this; + } + + /** + * A table name along with a full path. + * + * @param tableName a name + * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder} + * @see BigQueryWriteApiCommitedJsonItemWriter#setTableName(TableName) + */ + public BigQueryWriteApiCommitedJsonItemWriterBuilder tableName(final TableName tableName) { + this.tableName = tableName; + return this; + } + + /** + * Converts your DTO into a {@link String}. + * + * @param marshaller your mapper + * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder} + * @see BigQueryWriteApiCommitedJsonItemWriter#setMarshaller(JsonObjectMarshaller) + */ + public BigQueryWriteApiCommitedJsonItemWriterBuilder marshaller(final JsonObjectMarshaller marshaller) { + this.marshaller = marshaller; + return this; + } + + /** + * A {@link ApiFutureCallback} that will be called on successful or failed event. + * + * @param apiFutureCallback a callback + * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder} + * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + */ + public BigQueryWriteApiCommitedJsonItemWriterBuilder apiFutureCallback(final ApiFutureCallback apiFutureCallback) { + this.apiFutureCallback = apiFutureCallback; + return this; + } + + /** + * {@link Executor} that will be used for {@link ApiFutureCallback}. + * + * @param executor an executor + * @return {@link BigQueryWriteApiCommitedJsonItemWriterBuilder} + * @see BigQueryWriteApiCommitedJsonItemWriter#setExecutor(Executor) + * @see BigQueryWriteApiCommitedJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + */ + public BigQueryWriteApiCommitedJsonItemWriterBuilder executor(final Executor executor) { + this.executor = executor; + return this; + } + + /** + * Please remember about {@link BigQueryWriteApiCommitedJsonItemWriter#afterPropertiesSet()}. + * + * @return {@link BigQueryWriteApiCommitedJsonItemWriter} + * @throws IOException in case when {@link BigQueryWriteClient} failed to be created automatically + */ + public BigQueryWriteApiCommitedJsonItemWriter build() throws IOException { + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); + + writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller); + writer.setBigQueryWriteClient(this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient); + + if (apiFutureCallback != null) { + writer.setApiFutureCallback(apiFutureCallback); + writer.setExecutor(this.executor == null ? MoreExecutors.directExecutor() : this.executor); + } + + writer.setTableName(tableName); + + return writer; + } + +} diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java similarity index 63% rename from spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java rename to spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java index 92caf18..51986b4 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilder.java @@ -21,7 +21,7 @@ import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.common.util.concurrent.MoreExecutors; -import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiPendingJsonItemWriter; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; @@ -29,14 +29,14 @@ import java.util.concurrent.Executor; /** - * A builder for {@link BigQueryWriteApiJsonItemWriter}. + * A builder for {@link BigQueryWriteApiPendingJsonItemWriter}. * * @param your DTO type * @author Volodymyr Perebykivskyi * @since 0.2.0 - * @see Examples + * @see Examples */ -public class BigQueryWriteApiJsonItemWriterBuilder { +public class BigQueryWriteApiPendingJsonItemWriterBuilder { private BigQueryWriteClient bigQueryWriteClient; private TableName tableName; @@ -48,10 +48,10 @@ public class BigQueryWriteApiJsonItemWriterBuilder { * GRPC client that will be responsible for communication with BigQuery. * * @param bigQueryWriteClient a client - * @return {@link BigQueryWriteApiJsonItemWriterBuilder} - * @see BigQueryWriteApiJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient) + * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder} + * @see BigQueryWriteApiPendingJsonItemWriter#setBigQueryWriteClient(BigQueryWriteClient) */ - public BigQueryWriteApiJsonItemWriterBuilder bigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) { + public BigQueryWriteApiPendingJsonItemWriterBuilder bigQueryWriteClient(final BigQueryWriteClient bigQueryWriteClient) { this.bigQueryWriteClient = bigQueryWriteClient; return this; } @@ -60,10 +60,10 @@ public BigQueryWriteApiJsonItemWriterBuilder bigQueryWriteClient(final BigQue * A table name along with a full path. * * @param tableName a name - * @return {@link BigQueryWriteApiJsonItemWriterBuilder} - * @see BigQueryWriteApiJsonItemWriter#setTableName(TableName) + * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder} + * @see BigQueryWriteApiPendingJsonItemWriter#setTableName(TableName) */ - public BigQueryWriteApiJsonItemWriterBuilder tableName(final TableName tableName) { + public BigQueryWriteApiPendingJsonItemWriterBuilder tableName(final TableName tableName) { this.tableName = tableName; return this; } @@ -72,10 +72,10 @@ public BigQueryWriteApiJsonItemWriterBuilder tableName(final TableName tableN * Converts your DTO into a {@link String}. * * @param marshaller your mapper - * @return {@link BigQueryWriteApiJsonItemWriterBuilder} - * @see BigQueryWriteApiJsonItemWriter#setMarshaller(JsonObjectMarshaller) + * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder} + * @see BigQueryWriteApiPendingJsonItemWriter#setMarshaller(JsonObjectMarshaller) */ - public BigQueryWriteApiJsonItemWriterBuilder marshaller(final JsonObjectMarshaller marshaller) { + public BigQueryWriteApiPendingJsonItemWriterBuilder marshaller(final JsonObjectMarshaller marshaller) { this.marshaller = marshaller; return this; } @@ -84,10 +84,10 @@ public BigQueryWriteApiJsonItemWriterBuilder marshaller(final JsonObjectMarsh * A {@link ApiFutureCallback} that will be called on successful or failed event. * * @param apiFutureCallback a callback - * @return {@link BigQueryWriteApiJsonItemWriterBuilder} - * @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder} + * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback) */ - public BigQueryWriteApiJsonItemWriterBuilder apiFutureCallback(final ApiFutureCallback apiFutureCallback) { + public BigQueryWriteApiPendingJsonItemWriterBuilder apiFutureCallback(final ApiFutureCallback apiFutureCallback) { this.apiFutureCallback = apiFutureCallback; return this; } @@ -96,23 +96,23 @@ public BigQueryWriteApiJsonItemWriterBuilder apiFutureCallback(final ApiFutur * {@link Executor} that will be used for {@link ApiFutureCallback}. * * @param executor an executor - * @return {@link BigQueryWriteApiJsonItemWriterBuilder} - * @see BigQueryWriteApiJsonItemWriter#setExecutor(Executor) - * @see BigQueryWriteApiJsonItemWriter#setApiFutureCallback(ApiFutureCallback) + * @return {@link BigQueryWriteApiPendingJsonItemWriterBuilder} + * @see BigQueryWriteApiPendingJsonItemWriter#setExecutor(Executor) + * @see BigQueryWriteApiPendingJsonItemWriter#setApiFutureCallback(ApiFutureCallback) */ - public BigQueryWriteApiJsonItemWriterBuilder executor(final Executor executor) { + public BigQueryWriteApiPendingJsonItemWriterBuilder executor(final Executor executor) { this.executor = executor; return this; } /** - * Please remember about {@link BigQueryWriteApiJsonItemWriter#afterPropertiesSet()}. + * Please remember about {@link BigQueryWriteApiPendingJsonItemWriter#afterPropertiesSet()}. * - * @return {@link BigQueryWriteApiJsonItemWriter} + * @return {@link BigQueryWriteApiPendingJsonItemWriter} * @throws IOException in case when {@link BigQueryWriteClient} failed to be created automatically */ - public BigQueryWriteApiJsonItemWriter build() throws IOException { - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + public BigQueryWriteApiPendingJsonItemWriter build() throws IOException { + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); writer.setMarshaller(this.marshaller == null ? new JacksonJsonObjectMarshaller<>() : this.marshaller); writer.setBigQueryWriteClient(this.bigQueryWriteClient == null ? BigQueryWriteClient.create() : this.bigQueryWriteClient); diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java index e48e6ad..d81b6e9 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/writeapi/package-info.java @@ -20,6 +20,7 @@ * Supported types: *
    *
  • {@link com.google.cloud.bigquery.storage.v1.WriteStream.Type#COMMITTED}
  • + *
  • {@link com.google.cloud.bigquery.storage.v1.WriteStream.Type#PENDING}
  • *
* * Supported formats: diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java new file mode 100644 index 0000000..ad069d7 --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiCommitedJsonItemWriterTest.java @@ -0,0 +1,56 @@ +package org.springframework.batch.extensions.bigquery.emulator.writer.writeapi.json; + +import com.google.api.core.ApiFutureCallback; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.TableName; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.batch.extensions.bigquery.common.NameUtils; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.ResultVerifier; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.emulator.writer.base.EmulatorBaseItemWriterTest; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiCommitedJsonItemWriter; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +class EmulatorBigQueryWriteApiCommitedJsonItemWriterTest extends EmulatorBaseItemWriterTest { + + @Test + void testWrite() throws Exception { + AtomicBoolean consumerCalled = new AtomicBoolean(); + TableId tableId = TableId.of(TestConstants.PROJECT, TestConstants.DATASET, NameUtils.generateTableName(TestConstants.JSON)); + TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema()); + bigQuery.create(TableInfo.of(tableId, tableDefinition)); + + Chunk expected = TestConstants.CHUNK; + + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); + writer.setBigQueryWriteClient(bigQueryWriteClient); + writer.setTableName(TableName.of(tableId.getProject(), tableId.getDataset(), tableId.getTable())); + writer.setMarshaller(new JacksonJsonObjectMarshaller<>()); + writer.setApiFutureCallback(new ApiFutureCallback<>() { + @Override + public void onFailure(Throwable t) {} + + @Override + public void onSuccess(AppendRowsResponse result) { + consumerCalled.set(true); + } + }); + writer.setExecutor(Executors.newSingleThreadExecutor()); + + writer.write(expected); + + ResultVerifier.verifyTableResult(expected, bigQuery.listTableData(tableId)); + Assertions.assertTrue(consumerCalled.get()); + } + +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java similarity index 90% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java index c3f42c2..75aef26 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/writeapi/json/EmulatorBigQueryWriteApiPendingJsonItemWriterTest.java @@ -14,14 +14,14 @@ import org.springframework.batch.extensions.bigquery.common.ResultVerifier; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.emulator.writer.base.EmulatorBaseItemWriterTest; -import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiPendingJsonItemWriter; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -class EmulatorBigQueryWriteApiJsonItemWriterTest extends EmulatorBaseItemWriterTest { +class EmulatorBigQueryWriteApiPendingJsonItemWriterTest extends EmulatorBaseItemWriterTest { @Test void testWrite() throws Exception { @@ -32,7 +32,7 @@ void testWrite() throws Exception { Chunk expected = TestConstants.CHUNK; - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); writer.setBigQueryWriteClient(bigQueryWriteClient); writer.setTableName(TableName.of(tableId.getProject(), tableId.getDataset(), tableId.getTable())); writer.setMarshaller(new JacksonJsonObjectMarshaller<>()); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiCommitedJsonItemWriterTest.java similarity index 72% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiCommitedJsonItemWriterTest.java index 509de2f..db35366 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/writeapi/json/GcloudBigQueryWriteApiCommitedJsonItemWriterTest.java @@ -3,7 +3,7 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.extensions.bigquery.gcloud.writer.GcloudBaseBigQueryItemWriterTest; -public class GcloudBigQueryWriteApiJsonItemWriterTest extends GcloudBaseBigQueryItemWriterTest { +public class GcloudBigQueryWriteApiCommitedJsonItemWriterTest extends GcloudBaseBigQueryItemWriterTest { @Test void testWrite() { diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java similarity index 76% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java index 659316f..bd28155 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiJsonItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiCommitedJsonItemWriterTest.java @@ -6,6 +6,7 @@ import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; @@ -16,7 +17,7 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; -import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiCommitedJsonItemWriter; import org.springframework.batch.item.Chunk; import org.springframework.batch.item.json.GsonJsonObjectMarshaller; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; @@ -26,14 +27,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; -class BigQueryWriteApiJsonItemWriterTest { +class BigQueryWriteApiCommitedJsonItemWriterTest { private static final TableName TABLE_NAME = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON); @Test void testWrite_Empty() throws Exception { BigQueryWriteClient writeClient = Mockito.mock(BigQueryWriteClient.class); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); writer.setBigQueryWriteClient(writeClient); writer.write(Chunk.of()); @@ -44,7 +45,7 @@ void testWrite_Empty() throws Exception { @Test void testWrite_Exception() { BigQueryItemWriterException ex = Assertions.assertThrows( - BigQueryItemWriterException.class, () -> new BigQueryWriteApiJsonItemWriter<>().write(TestConstants.CHUNK) + BigQueryItemWriterException.class, () -> new BigQueryWriteApiCommitedJsonItemWriter<>().write(TestConstants.CHUNK) ); Assertions.assertEquals("Error on write happened", ex.getMessage()); } @@ -61,8 +62,9 @@ void testWrite() throws Exception { Mockito.when(writeClient.createWriteStream(streamRequest)).thenReturn(generatedWriteStream); Mockito.when(writeClient.getWriteStream(Mockito.any(GetWriteStreamRequest.class))).thenReturn(generatedWriteStream); Mockito.when(writeClient.getSettings()).thenReturn(BigQueryWriteSettings.newBuilder().setCredentialsProvider(NoCredentialsProvider.create()).build()); + Mockito.when(writeClient.finalizeWriteStream(streamName.toString())).thenReturn(FinalizeWriteStreamResponse.newBuilder().build()); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); writer.setTableName(TABLE_NAME); writer.setBigQueryWriteClient(writeClient); writer.setMarshaller(new JacksonJsonObjectMarshaller<>()); @@ -75,7 +77,7 @@ void testWrite() throws Exception { @Test void testAfterPropertiesSet() { - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); // bigQueryWriteClient IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); @@ -104,69 +106,69 @@ void testAfterPropertiesSet() { @Test void testSetBigQueryWriteClient() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); BigQueryWriteClient expected = Mockito.mock(BigQueryWriteClient.class); writer.setBigQueryWriteClient(expected); BigQueryWriteClient actual = (BigQueryWriteClient) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class) + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class) .get(writer); Assertions.assertEquals(expected, actual); } @Test void testSetTableName() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); writer.setTableName(TABLE_NAME); TableName actual = (TableName) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "tableName", TableName.class) + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "tableName", TableName.class) .get(writer); Assertions.assertEquals(TABLE_NAME, actual); } @Test void testSetMarshaller() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); JsonObjectMarshaller expected = new JacksonJsonObjectMarshaller<>(); writer.setMarshaller(expected); JsonObjectMarshaller actual = (JsonObjectMarshaller) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) .get(writer); Assertions.assertEquals(expected, actual); } @Test void testSetApiFutureCallback() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); ApiFutureCallback expected = new TestCallback(); writer.setApiFutureCallback(expected); ApiFutureCallback actual = (ApiFutureCallback) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback .class) + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback .class) .get(writer); Assertions.assertEquals(expected, actual); } @Test void testSetExecutor() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriter<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriter<>(); Executor expected = Executors.newSingleThreadExecutor(); writer.setExecutor(expected); Executor actual = (Executor) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "executor", Executor.class) + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "executor", Executor.class) .get(writer); Assertions.assertEquals(expected, actual); } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java new file mode 100644 index 0000000..a7eabec --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/BigQueryWriteApiPendingJsonItemWriterTest.java @@ -0,0 +1,182 @@ +package org.springframework.batch.extensions.bigquery.unit.writer.writeapi.json; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.cloud.bigquery.storage.v1.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiPendingJsonItemWriter; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.json.GsonJsonObjectMarshaller; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; +import org.springframework.batch.item.json.JsonObjectMarshaller; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +class BigQueryWriteApiPendingJsonItemWriterTest { + + private static final TableName TABLE_NAME = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON); + + @Test + void testWrite_Empty() throws Exception { + BigQueryWriteClient writeClient = Mockito.mock(BigQueryWriteClient.class); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + writer.setBigQueryWriteClient(writeClient); + + writer.write(Chunk.of()); + + Mockito.verifyNoInteractions(writeClient); + } + + @Test + void testWrite_Exception() { + BigQueryItemWriterException ex = Assertions.assertThrows( + BigQueryItemWriterException.class, () -> new BigQueryWriteApiPendingJsonItemWriter<>().write(TestConstants.CHUNK) + ); + Assertions.assertEquals("Error on write happened", ex.getMessage()); + } + + @Test + void testWrite() throws Exception { + WriteStreamName streamName = WriteStreamName.of(TABLE_NAME.getProject(), TABLE_NAME.getDataset(), TABLE_NAME.getTable(), "test-stream-1"); + + WriteStream writeStream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); + CreateWriteStreamRequest streamRequest = CreateWriteStreamRequest.newBuilder().setParent(TABLE_NAME.toString()).setWriteStream(writeStream).build(); + + BigQueryWriteClient writeClient = Mockito.mock(BigQueryWriteClient.class); + WriteStream generatedWriteStream = WriteStream.newBuilder().setName(streamName.toString()).setTableSchema(PersonDto.getWriteApiSchema()).build(); + Mockito.when(writeClient.createWriteStream(streamRequest)).thenReturn(generatedWriteStream); + Mockito.when(writeClient.getWriteStream(Mockito.any(GetWriteStreamRequest.class))).thenReturn(generatedWriteStream); + Mockito.when(writeClient.getSettings()).thenReturn(BigQueryWriteSettings.newBuilder().setCredentialsProvider(NoCredentialsProvider.create()).build()); + Mockito.when(writeClient.finalizeWriteStream(streamName.toString())).thenReturn(FinalizeWriteStreamResponse.newBuilder().build()); + + BatchCommitWriteStreamsResponse batchResponse = Mockito.mock(BatchCommitWriteStreamsResponse.class); + Mockito.when(batchResponse.hasCommitTime()).thenReturn(true); + + Mockito + .when(writeClient.batchCommitWriteStreams(Mockito.any(BatchCommitWriteStreamsRequest.class))) + .thenReturn(batchResponse); + + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + writer.setTableName(TABLE_NAME); + writer.setBigQueryWriteClient(writeClient); + writer.setMarshaller(new JacksonJsonObjectMarshaller<>()); + + writer.write(TestConstants.CHUNK); + + Mockito.verify(writeClient).createWriteStream(streamRequest); + Mockito.verify(writeClient).finalizeWriteStream(streamName.toString()); + } + + @Test + void testAfterPropertiesSet() { + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + + // bigQueryWriteClient + IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("BigQuery write client must be provided", ex.getMessage()); + + // tableName + writer.setBigQueryWriteClient(Mockito.mock(BigQueryWriteClient.class)); + ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Table name must be provided", ex.getMessage()); + + // marshaller + writer.setTableName(TABLE_NAME); + ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Marshaller must be provided", ex.getMessage()); + + // executor + writer.setApiFutureCallback(new TestCallback()); + writer.setMarshaller(new GsonJsonObjectMarshaller<>()); + ex = Assertions.assertThrows(IllegalArgumentException.class, writer::afterPropertiesSet); + Assertions.assertEquals("Executor must be provided", ex.getMessage()); + + // All good + writer.setExecutor(Executors.newSingleThreadExecutor()); + Assertions.assertDoesNotThrow(writer::afterPropertiesSet); + } + + @Test + void testSetBigQueryWriteClient() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + BigQueryWriteClient expected = Mockito.mock(BigQueryWriteClient.class); + + writer.setBigQueryWriteClient(expected); + + BigQueryWriteClient actual = (BigQueryWriteClient) handle + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class) + .get(writer); + Assertions.assertEquals(expected, actual); + } + + @Test + void testSetTableName() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + + writer.setTableName(TABLE_NAME); + + TableName actual = (TableName) handle + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "tableName", TableName.class) + .get(writer); + Assertions.assertEquals(TABLE_NAME, actual); + } + + @Test + void testSetMarshaller() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + JsonObjectMarshaller expected = new JacksonJsonObjectMarshaller<>(); + + writer.setMarshaller(expected); + + JsonObjectMarshaller actual = (JsonObjectMarshaller) handle + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) + .get(writer); + Assertions.assertEquals(expected, actual); + } + + @Test + void testSetApiFutureCallback() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + ApiFutureCallback expected = new TestCallback(); + + writer.setApiFutureCallback(expected); + + ApiFutureCallback actual = (ApiFutureCallback) handle + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback .class) + .get(writer); + Assertions.assertEquals(expected, actual); + } + + @Test + void testSetExecutor() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriter.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriter<>(); + Executor expected = Executors.newSingleThreadExecutor(); + + writer.setExecutor(expected); + + Executor actual = (Executor) handle + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "executor", Executor.class) + .get(writer); + Assertions.assertEquals(expected, actual); + } + + private static final class TestCallback implements ApiFutureCallback { + @Override + public void onFailure(Throwable t) {} + + @Override + public void onSuccess(AppendRowsResponse result) {} + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilderTest.java new file mode 100644 index 0000000..82b6f9a --- /dev/null +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiCommitedJsonItemWriterBuilderTest.java @@ -0,0 +1,172 @@ +/* + * Copyright 2002-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.extensions.bigquery.unit.writer.writeapi.json.builder; + +import com.google.api.core.ApiFutureCallback; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.TableName; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.springframework.batch.extensions.bigquery.common.PersonDto; +import org.springframework.batch.extensions.bigquery.common.TestConstants; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiCommitedJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.builder.BigQueryWriteApiCommitedJsonItemWriterBuilder; +import org.springframework.batch.item.json.GsonJsonObjectMarshaller; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; +import org.springframework.batch.item.json.JsonObjectMarshaller; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +class BigQueryWriteApiCommitedJsonItemWriterBuilderTest { + + @Test + void testBigQueryWriteClient() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriterBuilder builder = new BigQueryWriteApiCommitedJsonItemWriterBuilder<>(); + BigQueryWriteClient expected = Mockito.mock(BigQueryWriteClient.class); + + builder.bigQueryWriteClient(expected); + + BigQueryWriteClient actual = (BigQueryWriteClient) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, "bigQueryWriteClient", BigQueryWriteClient.class) + .get(builder); + Assertions.assertEquals(expected, actual); + } + + @Test + void testTableName() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriterBuilder builder = new BigQueryWriteApiCommitedJsonItemWriterBuilder<>(); + TableName expected = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON); + + builder.tableName(expected); + + TableName actual = (TableName) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, "tableName", TableName.class) + .get(builder); + Assertions.assertEquals(expected, actual); + } + + @Test + void testMarshaller() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriterBuilder builder = new BigQueryWriteApiCommitedJsonItemWriterBuilder<>(); + JsonObjectMarshaller expected = new GsonJsonObjectMarshaller<>(); + + builder.marshaller(expected); + + JsonObjectMarshaller actual = (JsonObjectMarshaller) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, "marshaller", JsonObjectMarshaller.class) + .get(builder); + Assertions.assertEquals(expected, actual); + } + + @Test + void testApiFutureCallback() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriterBuilder builder = new BigQueryWriteApiCommitedJsonItemWriterBuilder<>(); + + ApiFutureCallback expected = new ApiFutureCallback<>() { + @Override + public void onFailure(Throwable t) {} + + @Override + public void onSuccess(AppendRowsResponse result) {} + }; + + builder.apiFutureCallback(expected); + + ApiFutureCallback actual = (ApiFutureCallback) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, "apiFutureCallback", ApiFutureCallback.class) + .get(builder); + Assertions.assertEquals(expected, actual); + } + + @Test + void testExecutor() throws IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiCommitedJsonItemWriterBuilder builder = new BigQueryWriteApiCommitedJsonItemWriterBuilder<>(); + Executor expected = Executors.newSingleThreadExecutor(); + + builder.executor(expected); + + Executor actual = (Executor) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriterBuilder.class, "executor", Executor.class) + .get(builder); + Assertions.assertEquals(expected, actual); + } + + @Test + void testBuild() throws IOException, IllegalAccessException, NoSuchFieldException { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiCommitedJsonItemWriter.class, MethodHandles.lookup()); + JsonObjectMarshaller expectedMarshaller = new JacksonJsonObjectMarshaller<>(); + BigQueryWriteClient expectedWriteClient = Mockito.mock(BigQueryWriteClient.class); + Executor expectedExecutor = Executors.newCachedThreadPool(); + TableName expectedTableName = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON); + + ApiFutureCallback expectedCallback = new ApiFutureCallback<>() { + @Override + public void onFailure(Throwable t) { + } + + @Override + public void onSuccess(AppendRowsResponse result) { + } + }; + + BigQueryWriteApiCommitedJsonItemWriter writer = new BigQueryWriteApiCommitedJsonItemWriterBuilder() + .marshaller(expectedMarshaller) + .bigQueryWriteClient(expectedWriteClient) + .apiFutureCallback(expectedCallback) + .executor(expectedExecutor) + .tableName(expectedTableName) + .build(); + + Assertions.assertNotNull(writer); + + JsonObjectMarshaller actualMarshaller = (JsonObjectMarshaller) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) + .get(writer); + + BigQueryWriteClient actualWriteClient = (BigQueryWriteClient) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class) + .get(writer); + + ApiFutureCallback actualCallback = (ApiFutureCallback) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback.class) + .get(writer); + + Executor actualExecutor = (Executor) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "executor", Executor.class) + .get(writer); + + TableName actualTableName = (TableName) handle + .findVarHandle(BigQueryWriteApiCommitedJsonItemWriter.class, "tableName", TableName.class) + .get(writer); + + Assertions.assertEquals(expectedMarshaller, actualMarshaller); + Assertions.assertEquals(expectedWriteClient, actualWriteClient); + Assertions.assertEquals(expectedCallback, actualCallback); + Assertions.assertEquals(expectedExecutor, actualExecutor); + Assertions.assertEquals(expectedTableName, actualTableName); + } +} diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilderTest.java similarity index 70% rename from spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java rename to spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilderTest.java index f3d04ee..0e67871 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiJsonItemWriterBuilderTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/writeapi/json/builder/BigQueryWriteApiPendingJsonItemWriterBuilderTest.java @@ -25,8 +25,8 @@ import org.mockito.Mockito; import org.springframework.batch.extensions.bigquery.common.PersonDto; import org.springframework.batch.extensions.bigquery.common.TestConstants; -import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiJsonItemWriter; -import org.springframework.batch.extensions.bigquery.writer.writeapi.json.builder.BigQueryWriteApiJsonItemWriterBuilder; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.BigQueryWriteApiPendingJsonItemWriter; +import org.springframework.batch.extensions.bigquery.writer.writeapi.json.builder.BigQueryWriteApiPendingJsonItemWriterBuilder; import org.springframework.batch.item.json.GsonJsonObjectMarshaller; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; import org.springframework.batch.item.json.JsonObjectMarshaller; @@ -36,54 +36,54 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; -class BigQueryWriteApiJsonItemWriterBuilderTest { +class BigQueryWriteApiPendingJsonItemWriterBuilderTest { @Test void testBigQueryWriteClient() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriterBuilder builder = new BigQueryWriteApiJsonItemWriterBuilder<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriterBuilder builder = new BigQueryWriteApiPendingJsonItemWriterBuilder<>(); BigQueryWriteClient expected = Mockito.mock(BigQueryWriteClient.class); builder.bigQueryWriteClient(expected); BigQueryWriteClient actual = (BigQueryWriteClient) handle - .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "bigQueryWriteClient", BigQueryWriteClient.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriterBuilder.class, "bigQueryWriteClient", BigQueryWriteClient.class) .get(builder); Assertions.assertEquals(expected, actual); } @Test void testTableName() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriterBuilder builder = new BigQueryWriteApiJsonItemWriterBuilder<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriterBuilder builder = new BigQueryWriteApiPendingJsonItemWriterBuilder<>(); TableName expected = TableName.of(TestConstants.PROJECT, TestConstants.DATASET, TestConstants.JSON); builder.tableName(expected); TableName actual = (TableName) handle - .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "tableName", TableName.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriterBuilder.class, "tableName", TableName.class) .get(builder); Assertions.assertEquals(expected, actual); } @Test void testMarshaller() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriterBuilder builder = new BigQueryWriteApiJsonItemWriterBuilder<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriterBuilder builder = new BigQueryWriteApiPendingJsonItemWriterBuilder<>(); JsonObjectMarshaller expected = new GsonJsonObjectMarshaller<>(); builder.marshaller(expected); JsonObjectMarshaller actual = (JsonObjectMarshaller) handle - .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "marshaller", JsonObjectMarshaller.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriterBuilder.class, "marshaller", JsonObjectMarshaller.class) .get(builder); Assertions.assertEquals(expected, actual); } @Test void testApiFutureCallback() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriterBuilder builder = new BigQueryWriteApiJsonItemWriterBuilder<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriterBuilder builder = new BigQueryWriteApiPendingJsonItemWriterBuilder<>(); ApiFutureCallback expected = new ApiFutureCallback<>() { @Override @@ -96,28 +96,28 @@ public void onSuccess(AppendRowsResponse result) {} builder.apiFutureCallback(expected); ApiFutureCallback actual = (ApiFutureCallback) handle - .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "apiFutureCallback", ApiFutureCallback.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriterBuilder.class, "apiFutureCallback", ApiFutureCallback.class) .get(builder); Assertions.assertEquals(expected, actual); } @Test void testExecutor() throws IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriterBuilder.class, MethodHandles.lookup()); - BigQueryWriteApiJsonItemWriterBuilder builder = new BigQueryWriteApiJsonItemWriterBuilder<>(); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriterBuilder.class, MethodHandles.lookup()); + BigQueryWriteApiPendingJsonItemWriterBuilder builder = new BigQueryWriteApiPendingJsonItemWriterBuilder<>(); Executor expected = Executors.newSingleThreadExecutor(); builder.executor(expected); Executor actual = (Executor) handle - .findVarHandle(BigQueryWriteApiJsonItemWriterBuilder.class, "executor", Executor.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriterBuilder.class, "executor", Executor.class) .get(builder); Assertions.assertEquals(expected, actual); } @Test void testBuild() throws IOException, IllegalAccessException, NoSuchFieldException { - MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiJsonItemWriter.class, MethodHandles.lookup()); + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryWriteApiPendingJsonItemWriter.class, MethodHandles.lookup()); JsonObjectMarshaller expectedMarshaller = new JacksonJsonObjectMarshaller<>(); BigQueryWriteClient expectedWriteClient = Mockito.mock(BigQueryWriteClient.class); Executor expectedExecutor = Executors.newCachedThreadPool(); @@ -133,7 +133,7 @@ public void onSuccess(AppendRowsResponse result) { } }; - BigQueryWriteApiJsonItemWriter writer = new BigQueryWriteApiJsonItemWriterBuilder() + BigQueryWriteApiPendingJsonItemWriter writer = new BigQueryWriteApiPendingJsonItemWriterBuilder() .marshaller(expectedMarshaller) .bigQueryWriteClient(expectedWriteClient) .apiFutureCallback(expectedCallback) @@ -144,23 +144,23 @@ public void onSuccess(AppendRowsResponse result) { Assertions.assertNotNull(writer); JsonObjectMarshaller actualMarshaller = (JsonObjectMarshaller) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class) .get(writer); BigQueryWriteClient actualWriteClient = (BigQueryWriteClient) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "bigQueryWriteClient", BigQueryWriteClient.class) .get(writer); ApiFutureCallback actualCallback = (ApiFutureCallback) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "apiFutureCallback", ApiFutureCallback.class) .get(writer); Executor actualExecutor = (Executor) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "executor", Executor.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "executor", Executor.class) .get(writer); TableName actualTableName = (TableName) handle - .findVarHandle(BigQueryWriteApiJsonItemWriter.class, "tableName", TableName.class) + .findVarHandle(BigQueryWriteApiPendingJsonItemWriter.class, "tableName", TableName.class) .get(writer); Assertions.assertEquals(expectedMarshaller, actualMarshaller);