diff --git a/spring-batch-bigquery/pom.xml b/spring-batch-bigquery/pom.xml
index fe71c08..4e7fc75 100644
--- a/spring-batch-bigquery/pom.xml
+++ b/spring-batch-bigquery/pom.xml
@@ -100,7 +100,7 @@
org.wiremock
wiremock-standalone
- 3.13.0
+ 3.13.1
test
@@ -108,7 +108,6 @@
jul-to-slf4j
test
-
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java
index 44bc914..e5c068c 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java
@@ -16,12 +16,9 @@
package org.springframework.batch.extensions.bigquery.writer;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Table;
-import org.springframework.core.convert.converter.Converter;
+import org.springframework.batch.item.json.JsonObjectMarshaller;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -42,41 +39,27 @@ public class BigQueryJsonItemWriter extends BigQueryBaseItemWriter {
private static final String LF = "\n";
- private Converter rowMapper;
- private ObjectWriter objectWriter;
- private Class> itemClass;
+ private JsonObjectMarshaller marshaller;
@Override
protected void doInitializeProperties(List extends T> items) {
- if (this.itemClass == null) {
- T firstItem = items.stream().findFirst().orElseThrow(() -> {
- logger.warn("Class type was not found");
- return new IllegalStateException("Class type was not found");
- });
- this.itemClass = firstItem.getClass();
-
- if (this.rowMapper == null) {
- this.objectWriter = new ObjectMapper().writerFor(this.itemClass);
- }
-
- logger.debug("Writer setup is completed");
- }
+ // Unused
}
/**
* Converter that transforms a single row into a {@link String}.
*
- * @param rowMapper your JSON row mapper
+ * @param marshaller your JSON mapper
*/
- public void setRowMapper(Converter rowMapper) {
- this.rowMapper = rowMapper;
+ public void setMarshaller(JsonObjectMarshaller marshaller) {
+ this.marshaller = marshaller;
}
@Override
protected List convertObjectsToByteArrays(List extends T> items) {
return items
.stream()
- .map(this::mapItemToJson)
+ .map(marshaller::marshal)
.filter(Predicate.not(ObjectUtils::isEmpty))
.map(this::convertToNdJson)
.map(row -> row.getBytes(StandardCharsets.UTF_8))
@@ -85,6 +68,8 @@ protected List convertObjectsToByteArrays(List extends T> items) {
@Override
protected void performFormatSpecificChecks() {
+ Assert.notNull(this.marshaller, "Marshaller is mandatory");
+
Table table = getTable();
if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
@@ -106,16 +91,6 @@ protected void performFormatSpecificChecks() {
Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format));
}
- private String mapItemToJson(T t) {
- try {
- return rowMapper == null ? objectWriter.writeValueAsString(t) : rowMapper.convert(t);
- }
- catch (JsonProcessingException e) {
- logger.error("Error during processing of the line: ", e);
- return null;
- }
- }
-
/**
* BigQuery uses ndjson.
* It is expected that to pass here JSON line generated by
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java
index 911f0c8..9814a00 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/builder/BigQueryJsonItemWriterBuilder.java
@@ -21,7 +21,7 @@
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
-import org.springframework.core.convert.converter.Converter;
+import org.springframework.batch.item.json.JsonObjectMarshaller;
import java.util.function.Consumer;
@@ -35,8 +35,7 @@
*/
public class BigQueryJsonItemWriterBuilder {
- private Converter rowMapper;
-
+ private JsonObjectMarshaller marshaller;
private Consumer jobConsumer;
private DatasetInfo datasetInfo;
private WriteChannelConfiguration writeChannelConfig;
@@ -45,12 +44,12 @@ public class BigQueryJsonItemWriterBuilder {
/**
* Converts your DTO into a {@link String}.
*
- * @param rowMapper your mapping
+ * @param marshaller your mapper
* @return {@link BigQueryJsonItemWriter}
- * @see BigQueryJsonItemWriter#setRowMapper(Converter)
+ * @see BigQueryJsonItemWriter#setMarshaller(JsonObjectMarshaller)
*/
- public BigQueryJsonItemWriterBuilder rowMapper(Converter rowMapper) {
- this.rowMapper = rowMapper;
+ public BigQueryJsonItemWriterBuilder marshaller(JsonObjectMarshaller marshaller) {
+ this.marshaller = marshaller;
return this;
}
@@ -110,7 +109,7 @@ public BigQueryJsonItemWriterBuilder bigQuery(BigQuery bigQuery) {
public BigQueryJsonItemWriter build() {
BigQueryJsonItemWriter writer = new BigQueryJsonItemWriter<>();
- writer.setRowMapper(this.rowMapper);
+ writer.setMarshaller(this.marshaller);
writer.setWriteChannelConfig(this.writeChannelConfig);
writer.setJobConsumer(this.jobConsumer);
writer.setBigQuery(this.bigQuery);
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java
index 3c48ff4..0e6d7ba 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/BigQueryEmulatorJsonItemWriterTest.java
@@ -31,6 +31,7 @@
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import java.util.stream.Stream;
@@ -52,6 +53,7 @@ void testWrite(String table, boolean autodetect) throws Exception {
BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder()
.bigQuery(bigQuery)
.writeChannelConfig(channelConfig)
+ .marshaller(new JacksonJsonObjectMarshaller<>())
.build();
writer.afterPropertiesSet();
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java
index a4c86de..c19f9fe 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryJsonItemWriterTest.java
@@ -16,8 +16,6 @@
package org.springframework.batch.extensions.bigquery.unit.writer;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
@@ -36,7 +34,9 @@
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
-import org.springframework.core.convert.converter.Converter;
+import org.springframework.batch.item.json.GsonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JsonObjectMarshaller;
import java.lang.invoke.MethodHandles;
import java.util.List;
@@ -47,34 +47,15 @@ class BigQueryJsonItemWriterTest extends AbstractBigQueryTest {
private static final TableId TABLE_ID = TableId.of("1", "2");
@Test
- void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldException {
- TestWriter writer = new TestWriter();
- List items = TestConstants.CHUNK.getItems();
- MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup());
-
- // Exception
- Assertions.assertThrows(IllegalStateException.class, () -> writer.testInitializeProperties(List.of()));
-
- // No exception
- writer.testInitializeProperties(items);
- Assertions.assertEquals(
- PersonDto.class.getSimpleName(),
- ((Class) handle.findVarHandle(BigQueryJsonItemWriter.class, "itemClass", Class.class).get(writer)).getSimpleName()
- );
- ObjectWriter objectWriter = (ObjectWriter) handle.findVarHandle(BigQueryJsonItemWriter.class, "objectWriter", ObjectWriter.class).get(writer);
- Assertions.assertInstanceOf(JsonFactory.class, objectWriter.getFactory());
- }
-
- @Test
- void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException {
+ void testSetMarshaller() throws IllegalAccessException, NoSuchFieldException {
BigQueryJsonItemWriter reader = new BigQueryJsonItemWriter<>();
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup());
- Converter expected = source -> null;
+ JsonObjectMarshaller expected = new JacksonJsonObjectMarshaller<>();
- reader.setRowMapper(expected);
+ reader.setMarshaller(expected);
- Converter actual = (Converter) handle
- .findVarHandle(BigQueryJsonItemWriter.class, "rowMapper", Converter.class)
+ JsonObjectMarshaller actual = (JsonObjectMarshaller) handle
+ .findVarHandle(BigQueryJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class)
.get(reader);
Assertions.assertEquals(expected, actual);
@@ -83,14 +64,23 @@ void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException {
@Test
void testConvertObjectsToByteArrays() {
TestWriter writer = new TestWriter();
+ writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
// Empty
Assertions.assertTrue(writer.testConvert(List.of()).isEmpty());
// Not empty
- writer.setRowMapper(Record::toString);
+ writer.setMarshaller(Record::toString);
List actual = writer.testConvert(TestConstants.CHUNK.getItems());
- List expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(s -> s.concat("\n")).map(String::getBytes).toList();
+
+ List expected = TestConstants.CHUNK
+ .getItems()
+ .stream()
+ .map(PersonDto::toString)
+ .map(s -> s.concat("\n"))
+ .map(String::getBytes)
+ .toList();
+
Assertions.assertEquals(expected.size(), actual.size());
for (int i = 0; i < actual.size(); i++) {
@@ -112,10 +102,15 @@ void testPerformFormatSpecificChecks() {
BigQuery bigQuery = prepareMockedBigQuery();
Mockito.when(bigQuery.getTable(Mockito.any(TableId.class))).thenReturn(table);
+ // marshaller
+ IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
+ Assertions.assertEquals("Marshaller is mandatory", actual.getMessage());
+
// schema
+ writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
writer.setBigQuery(bigQuery);
writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID, FormatOptions.csv()));
- IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
+ actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
Assertions.assertEquals("Schema must be provided", actual.getMessage());
// schema equality
@@ -144,6 +139,7 @@ void testPerformFormatSpecificChecks_Format(FormatOptions formatOptions) {
TestWriter writer = new TestWriter();
writer.setBigQuery(bigQuery);
+ writer.setMarshaller(new GsonJsonObjectMarshaller<>());
writer.setWriteChannelConfig(WriteChannelConfiguration.newBuilder(TABLE_ID).setAutodetect(true).setFormatOptions(formatOptions).build());
IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
@@ -164,9 +160,6 @@ static Stream invalidFormats() {
}
private static final class TestWriter extends BigQueryJsonItemWriter {
- public void testInitializeProperties(List items) {
- doInitializeProperties(items);
- }
public List testConvert(List items) {
return convertObjectsToByteArrays(items);
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java
index e826f17..b0364eb 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryJsonItemWriterBuilderTests.java
@@ -30,7 +30,8 @@
import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
-import org.springframework.core.convert.converter.Converter;
+import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
+import org.springframework.batch.item.json.JsonObjectMarshaller;
import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
@@ -42,7 +43,7 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
MethodHandles.Lookup jsonWriterHandle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup());
MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup());
- Converter rowMapper = source -> "";
+ JsonObjectMarshaller marshaller = new JacksonJsonObjectMarshaller<>();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build();
Consumer jobConsumer = job -> {};
BigQuery mockedBigQuery = prepareMockedBigQuery();
@@ -53,7 +54,7 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
.build();
BigQueryJsonItemWriter writer = new BigQueryJsonItemWriterBuilder()
- .rowMapper(rowMapper)
+ .marshaller(marshaller)
.writeChannelConfig(writeConfiguration)
.jobConsumer(jobConsumer)
.bigQuery(mockedBigQuery)
@@ -62,8 +63,8 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
Assertions.assertNotNull(writer);
- Converter actualRowMapper = (Converter) jsonWriterHandle
- .findVarHandle(BigQueryJsonItemWriter.class, "rowMapper", Converter.class)
+ JsonObjectMarshaller actualMarshaller = (JsonObjectMarshaller) jsonWriterHandle
+ .findVarHandle(BigQueryJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class)
.get(writer);
WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) jsonWriterHandle
@@ -82,7 +83,7 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
.findVarHandle(BigQueryJsonItemWriter.class, "datasetInfo", DatasetInfo.class)
.get(writer);
- Assertions.assertEquals(rowMapper, actualRowMapper);
+ Assertions.assertEquals(marshaller, actualMarshaller);
Assertions.assertEquals(writeConfiguration, actualWriteChannelConfig);
Assertions.assertEquals(jobConsumer, actualJobConsumer);
Assertions.assertEquals(mockedBigQuery, actualBigQuery);