diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java index 91dc7a2..f47984c 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryBaseItemWriter.java @@ -123,8 +123,7 @@ public void write(final Chunk chunk) throws Exception { this.logger.debug(String.format("Mapping %d elements", items.size())); } - final ByteBuffer byteBuffer = mapDataToBigQueryFormat(items); - doWriteDataToBigQuery(byteBuffer); + doWriteDataToBigQuery(mapDataToBigQueryFormat(items)); } } diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java index 904bf0b..f4b2e7c 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryCsvItemWriter.java @@ -63,15 +63,6 @@ protected synchronized void doInitializeProperties(List items) { } } - /** - * Row mapper which transforms single BigQuery row into a desired type. - * - * @param rowMapper your row mapper - */ - public void setRowMapper(Converter rowMapper) { - this.rowMapper = rowMapper; - } - @Override protected List convertObjectsToByteArrays(List items) { return items @@ -105,6 +96,15 @@ protected void performFormatSpecificChecks() { } + /** + * Row mapper which transforms single BigQuery row into a desired type. + * + * @param rowMapper your row mapper + */ + public void setRowMapper(Converter rowMapper) { + this.rowMapper = rowMapper; + } + private byte[] mapItemToCsv(T t) { try { return rowMapper == null ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t); diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java index e5c068c..9200f7b 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/writer/BigQueryJsonItemWriter.java @@ -46,15 +46,6 @@ protected void doInitializeProperties(List items) { // Unused } - /** - * Converter that transforms a single row into a {@link String}. - * - * @param marshaller your JSON mapper - */ - public void setMarshaller(JsonObjectMarshaller marshaller) { - this.marshaller = marshaller; - } - @Override protected List convertObjectsToByteArrays(List items) { return items @@ -91,6 +82,15 @@ protected void performFormatSpecificChecks() { Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format)); } + /** + * Converter that transforms a single row into a {@link String}. + * + * @param marshaller your JSON mapper + */ + public void setMarshaller(JsonObjectMarshaller marshaller) { + this.marshaller = marshaller; + } + /** * BigQuery uses ndjson. * It is expected that to pass here JSON line generated by diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java index 17af485..9c9a75a 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryBaseItemWriterTest.java @@ -24,6 +24,7 @@ import org.springframework.batch.extensions.bigquery.common.TestConstants; import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest; import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter; +import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException; import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; @@ -144,6 +145,46 @@ void testWrite() throws Exception { Mockito.verifyNoMoreInteractions(channel); } + @Test + void testWrite_Exception() throws Exception { + MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup()); + AtomicBoolean consumerCalled = new AtomicBoolean(); + + Job job = Mockito.mock(Job.class); + Mockito.when(job.getJobId()).thenReturn(JobId.newBuilder().build()); + + TableDataWriteChannel channel = Mockito.mock(TableDataWriteChannel.class); + Mockito.when(channel.getJob()).thenReturn(job); + Mockito.when(channel.write(Mockito.any(ByteBuffer.class))).thenThrow(BigQueryException.class); + + BigQuery bigQuery = prepareMockedBigQuery(); + Mockito.when(bigQuery.writer(Mockito.any(WriteChannelConfiguration.class))).thenReturn(channel); + + TestWriter writer = new TestWriter(); + writer.setBigQuery(bigQuery); + writer.setJobConsumer(j -> consumerCalled.set(true)); + writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID)); + + BigQueryItemWriterException actual = Assertions.assertThrows(BigQueryItemWriterException.class, () -> writer.write(TestConstants.CHUNK)); + Assertions.assertEquals("Error on write happened", actual.getMessage()); + + AtomicLong actualCounter = (AtomicLong) handle + .findVarHandle(BigQueryBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class) + .get(writer); + + boolean writeFailed = (Boolean) handle + .findVarHandle(BigQueryBaseItemWriter.class, "writeFailed", boolean.class) + .get(writer); + + Assertions.assertEquals(0L, actualCounter.get()); + Assertions.assertTrue(writeFailed); + Assertions.assertFalse(consumerCalled.get()); + + Mockito.verify(channel).write(Mockito.any(ByteBuffer.class)); + Mockito.verify(channel).close(); + Mockito.verifyNoMoreInteractions(channel); + } + @Test void testBaseAfterPropertiesSet_Exception() { TestWriter writer = new TestWriter(); diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java index 7c55408..4f801a7 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/BigQueryCsvItemWriterTest.java @@ -16,8 +16,10 @@ package org.springframework.batch.extensions.bigquery.unit.writer; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.dataformat.csv.CsvFactory; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FormatOptions; @@ -83,14 +85,38 @@ void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException { @Test void testConvertObjectsToByteArrays() { TestWriter writer = new TestWriter(); + List items = TestConstants.CHUNK.getItems(); // Empty Assertions.assertTrue(writer.testConvert(List.of()).isEmpty()); - // Not empty + // Not empty (row mapper) writer.setRowMapper(source -> source.toString().getBytes()); - List actual = writer.testConvert(TestConstants.CHUNK.getItems()); - List expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(String::getBytes).toList(); + List actual = writer.testConvert(items); + List expected = items.stream().map(PersonDto::toString).map(String::getBytes).toList(); + Assertions.assertEquals(expected.size(), actual.size()); + + for (int i = 0; i < actual.size(); i++) { + Assertions.assertArrayEquals(expected.get(i), actual.get(i)); + } + + // Not empty (object writer) + ObjectWriter csvWriter = new CsvMapper().writerWithTypedSchemaFor(PersonDto.class); + writer.setRowMapper(null); + writer.testInitializeProperties(items); + actual = writer.testConvert(items); + + expected = items + .stream() + .map(pd -> { + try { + return csvWriter.writeValueAsBytes(pd); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .toList(); + Assertions.assertEquals(expected.size(), actual.size()); for (int i = 0; i < actual.size(); i++) {