Skip to content

Break hard dependency on Jackson for JSON processing #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions spring-batch-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock-standalone</artifactId>
<version>3.13.0</version>
<version>3.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@

package org.springframework.batch.extensions.bigquery.writer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Table;
import org.springframework.core.convert.converter.Converter;
import org.springframework.batch.item.json.JsonObjectMarshaller;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

Expand All @@ -42,41 +39,27 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {

private static final String LF = "\n";

private Converter<T, String> rowMapper;
private ObjectWriter objectWriter;
private Class<?> itemClass;
private JsonObjectMarshaller<T> marshaller;

@Override
protected void doInitializeProperties(List<? extends T> items) {
if (this.itemClass == null) {
T firstItem = items.stream().findFirst().orElseThrow(() -> {
logger.warn("Class type was not found");
return new IllegalStateException("Class type was not found");
});
this.itemClass = firstItem.getClass();

if (this.rowMapper == null) {
this.objectWriter = new ObjectMapper().writerFor(this.itemClass);
}

logger.debug("Writer setup is completed");
}
// Unused
}

/**
* Converter that transforms a single row into a {@link String}.
*
* @param rowMapper your JSON row mapper
* @param marshaller your JSON mapper
*/
public void setRowMapper(Converter<T, String> rowMapper) {
this.rowMapper = rowMapper;
public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
this.marshaller = marshaller;
}

@Override
protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
return items
.stream()
.map(this::mapItemToJson)
.map(marshaller::marshal)
.filter(Predicate.not(ObjectUtils::isEmpty))
.map(this::convertToNdJson)
.map(row -> row.getBytes(StandardCharsets.UTF_8))
Expand All @@ -85,6 +68,8 @@ protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {

@Override
protected void performFormatSpecificChecks() {
Assert.notNull(this.marshaller, "Marshaller is mandatory");

Table table = getTable();

if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
Expand All @@ -106,16 +91,6 @@ protected void performFormatSpecificChecks() {
Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format));
}

private String mapItemToJson(T t) {
try {
return rowMapper == null ? objectWriter.writeValueAsString(t) : rowMapper.convert(t);
}
catch (JsonProcessingException e) {
logger.error("Error during processing of the line: ", e);
return null;
}
}

/**
* BigQuery uses <a href="https://github.com/ndjson/ndjson-spec">ndjson</a>.
* It is expected that to pass here JSON line generated by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.batch.item.json.JsonObjectMarshaller;

import java.util.function.Consumer;

Expand All @@ -35,8 +35,7 @@
*/
public class BigQueryJsonItemWriterBuilder<T> {

private Converter<T, String> rowMapper;

private JsonObjectMarshaller<T> marshaller;
private Consumer<Job> jobConsumer;
private DatasetInfo datasetInfo;
private WriteChannelConfiguration writeChannelConfig;
Expand All @@ -45,12 +44,12 @@ public class BigQueryJsonItemWriterBuilder<T> {
/**
* Converts your DTO into a {@link String}.
*
* @param rowMapper your mapping
* @param marshaller your mapper
* @return {@link BigQueryJsonItemWriter}
* @see BigQueryJsonItemWriter#setRowMapper(Converter)
* @see BigQueryJsonItemWriter#setMarshaller(JsonObjectMarshaller)
*/
public BigQueryJsonItemWriterBuilder<T> rowMapper(Converter<T, String> rowMapper) {
this.rowMapper = rowMapper;
public BigQueryJsonItemWriterBuilder<T> marshaller(JsonObjectMarshaller<T> marshaller) {
this.marshaller = marshaller;
return this;
}

Expand Down Expand Up @@ -110,7 +109,7 @@ public BigQueryJsonItemWriterBuilder<T> bigQuery(BigQuery bigQuery) {
public BigQueryJsonItemWriter<T> build() {
BigQueryJsonItemWriter<T> writer = new BigQueryJsonItemWriter<>();

writer.setRowMapper(this.rowMapper);
writer.setMarshaller(this.marshaller);
writer.setWriteChannelConfig(this.writeChannelConfig);
writer.setJobConsumer(this.jobConsumer);
writer.setBigQuery(this.bigQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;

import java.util.stream.Stream;

Expand All @@ -52,6 +53,7 @@ void testWrite(String table, boolean autodetect) throws Exception {
BigQueryJsonItemWriter<PersonDto> writer = new BigQueryJsonItemWriterBuilder<PersonDto>()
.bigQuery(bigQuery)
.writeChannelConfig(channelConfig)
.marshaller(new JacksonJsonObjectMarshaller<>())
.build();
writer.afterPropertiesSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.springframework.batch.extensions.bigquery.unit.writer;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
Expand All @@ -36,7 +34,9 @@
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.batch.item.json.GsonJsonObjectMarshaller;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;

import java.lang.invoke.MethodHandles;
import java.util.List;
Expand All @@ -47,34 +47,15 @@ class BigQueryJsonItemWriterTest extends AbstractBigQueryTest {
private static final TableId TABLE_ID = TableId.of("1", "2");

@Test
void testDoInitializeProperties() throws IllegalAccessException, NoSuchFieldException {
TestWriter writer = new TestWriter();
List<PersonDto> items = TestConstants.CHUNK.getItems();
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup());

// Exception
Assertions.assertThrows(IllegalStateException.class, () -> writer.testInitializeProperties(List.of()));

// No exception
writer.testInitializeProperties(items);
Assertions.assertEquals(
PersonDto.class.getSimpleName(),
((Class<PersonDto>) handle.findVarHandle(BigQueryJsonItemWriter.class, "itemClass", Class.class).get(writer)).getSimpleName()
);
ObjectWriter objectWriter = (ObjectWriter) handle.findVarHandle(BigQueryJsonItemWriter.class, "objectWriter", ObjectWriter.class).get(writer);
Assertions.assertInstanceOf(JsonFactory.class, objectWriter.getFactory());
}

@Test
void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException {
void testSetMarshaller() throws IllegalAccessException, NoSuchFieldException {
BigQueryJsonItemWriter<PersonDto> reader = new BigQueryJsonItemWriter<>();
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup());
Converter<PersonDto, String> expected = source -> null;
JsonObjectMarshaller<PersonDto> expected = new JacksonJsonObjectMarshaller<>();

reader.setRowMapper(expected);
reader.setMarshaller(expected);

Converter<PersonDto, String> actual = (Converter<PersonDto, String>) handle
.findVarHandle(BigQueryJsonItemWriter.class, "rowMapper", Converter.class)
JsonObjectMarshaller<PersonDto> actual = (JsonObjectMarshaller<PersonDto>) handle
.findVarHandle(BigQueryJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class)
.get(reader);

Assertions.assertEquals(expected, actual);
Expand All @@ -83,14 +64,23 @@ void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException {
@Test
void testConvertObjectsToByteArrays() {
TestWriter writer = new TestWriter();
writer.setMarshaller(new JacksonJsonObjectMarshaller<>());

// Empty
Assertions.assertTrue(writer.testConvert(List.of()).isEmpty());

// Not empty
writer.setRowMapper(Record::toString);
writer.setMarshaller(Record::toString);
List<byte[]> actual = writer.testConvert(TestConstants.CHUNK.getItems());
List<byte[]> expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(s -> s.concat("\n")).map(String::getBytes).toList();

List<byte[]> expected = TestConstants.CHUNK
.getItems()
.stream()
.map(PersonDto::toString)
.map(s -> s.concat("\n"))
.map(String::getBytes)
.toList();

Assertions.assertEquals(expected.size(), actual.size());

for (int i = 0; i < actual.size(); i++) {
Expand All @@ -112,10 +102,15 @@ void testPerformFormatSpecificChecks() {
BigQuery bigQuery = prepareMockedBigQuery();
Mockito.when(bigQuery.getTable(Mockito.any(TableId.class))).thenReturn(table);

// marshaller
IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
Assertions.assertEquals("Marshaller is mandatory", actual.getMessage());

// schema
writer.setMarshaller(new JacksonJsonObjectMarshaller<>());
writer.setBigQuery(bigQuery);
writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID, FormatOptions.csv()));
IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
Assertions.assertEquals("Schema must be provided", actual.getMessage());

// schema equality
Expand Down Expand Up @@ -144,6 +139,7 @@ void testPerformFormatSpecificChecks_Format(FormatOptions formatOptions) {

TestWriter writer = new TestWriter();
writer.setBigQuery(bigQuery);
writer.setMarshaller(new GsonJsonObjectMarshaller<>());

writer.setWriteChannelConfig(WriteChannelConfiguration.newBuilder(TABLE_ID).setAutodetect(true).setFormatOptions(formatOptions).build());
IllegalArgumentException actual = Assertions.assertThrows(IllegalArgumentException.class, writer::testPerformFormatSpecificChecks);
Expand All @@ -164,9 +160,6 @@ static Stream<FormatOptions> invalidFormats() {
}

private static final class TestWriter extends BigQueryJsonItemWriter<PersonDto> {
public void testInitializeProperties(List<PersonDto> items) {
doInitializeProperties(items);
}

public List<byte[]> testConvert(List<PersonDto> items) {
return convertObjectsToByteArrays(items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter;
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
import org.springframework.core.convert.converter.Converter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;

import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
Expand All @@ -42,7 +43,7 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
MethodHandles.Lookup jsonWriterHandle = MethodHandles.privateLookupIn(BigQueryJsonItemWriter.class, MethodHandles.lookup());
MethodHandles.Lookup baseWriterHandle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup());

Converter<PersonDto, String> rowMapper = source -> "";
JsonObjectMarshaller<PersonDto> marshaller = new JacksonJsonObjectMarshaller<>();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build();
Consumer<Job> jobConsumer = job -> {};
BigQuery mockedBigQuery = prepareMockedBigQuery();
Expand All @@ -53,7 +54,7 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
.build();

BigQueryJsonItemWriter<PersonDto> writer = new BigQueryJsonItemWriterBuilder<PersonDto>()
.rowMapper(rowMapper)
.marshaller(marshaller)
.writeChannelConfig(writeConfiguration)
.jobConsumer(jobConsumer)
.bigQuery(mockedBigQuery)
Expand All @@ -62,8 +63,8 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {

Assertions.assertNotNull(writer);

Converter<PersonDto, String> actualRowMapper = (Converter<PersonDto, String>) jsonWriterHandle
.findVarHandle(BigQueryJsonItemWriter.class, "rowMapper", Converter.class)
JsonObjectMarshaller<PersonDto> actualMarshaller = (JsonObjectMarshaller<PersonDto>) jsonWriterHandle
.findVarHandle(BigQueryJsonItemWriter.class, "marshaller", JsonObjectMarshaller.class)
.get(writer);

WriteChannelConfiguration actualWriteChannelConfig = (WriteChannelConfiguration) jsonWriterHandle
Expand All @@ -82,7 +83,7 @@ void testBuild() throws IllegalAccessException, NoSuchFieldException {
.findVarHandle(BigQueryJsonItemWriter.class, "datasetInfo", DatasetInfo.class)
.get(writer);

Assertions.assertEquals(rowMapper, actualRowMapper);
Assertions.assertEquals(marshaller, actualMarshaller);
Assertions.assertEquals(writeConfiguration, actualWriteChannelConfig);
Assertions.assertEquals(jobConsumer, actualJobConsumer);
Assertions.assertEquals(mockedBigQuery, actualBigQuery);
Expand Down