
[bq] improve tests coverage #176

Merged: 1 commit, Jun 14, 2025
@@ -123,8 +123,7 @@ public void write(final Chunk<? extends T> chunk) throws Exception {
this.logger.debug(String.format("Mapping %d elements", items.size()));
}

final ByteBuffer byteBuffer = mapDataToBigQueryFormat(items);
doWriteDataToBigQuery(byteBuffer);
doWriteDataToBigQuery(mapDataToBigQueryFormat(items));
}
}

@@ -63,15 +63,6 @@ protected synchronized void doInitializeProperties(List<? extends T> items) {
}
}

/**
* Row mapper which transforms a single BigQuery row into a desired type.
*
* @param rowMapper your row mapper
*/
public void setRowMapper(Converter<T, byte[]> rowMapper) {
this.rowMapper = rowMapper;
}

@Override
protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
return items
@@ -105,6 +96,15 @@ protected void performFormatSpecificChecks() {

}

/**
* Row mapper which transforms a single BigQuery row into a desired type.
*
* @param rowMapper your row mapper
*/
public void setRowMapper(Converter<T, byte[]> rowMapper) {
this.rowMapper = rowMapper;
}

private byte[] mapItemToCsv(T t) {
try {
return rowMapper == null ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
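For reference, the relocated setRowMapper above keeps its original contract: a Spring Converter<T, byte[]> that turns one item into one CSV row, with objectWriter.writeValueAsBytes(t) as the fallback in mapItemToCsv. A minimal usage sketch; the writer class name BigQueryCsvItemWriter is not visible in this diff and is assumed, and the Person record plus RowMapperWiringSketch are hypothetical:

import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter; // assumed class name
import org.springframework.core.convert.converter.Converter;

class RowMapperWiringSketch {

    // Hypothetical item type; any POJO handled by the writer would do.
    record Person(String name, int age) { }

    static void configure(BigQueryCsvItemWriter<Person> writer) {
        // One item becomes one CSV row; the trailing newline separates rows.
        Converter<Person, byte[]> rowMapper = p -> (p.name() + "," + p.age() + "\n").getBytes();
        writer.setRowMapper(rowMapper);
    }
}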
@@ -46,15 +46,6 @@ protected void doInitializeProperties(List<? extends T> items) {
// Unused
}

/**
* Converter that transforms a single row into a {@link String}.
*
* @param marshaller your JSON mapper
*/
public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
this.marshaller = marshaller;
}

@Override
protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
return items
@@ -91,6 +82,15 @@ protected void performFormatSpecificChecks() {
Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format));
}

/**
* Converter that transforms a single row into a {@link String}.
*
* @param marshaller your JSON mapper
*/
public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
this.marshaller = marshaller;
}

/**
* BigQuery uses <a href="https://github.com/ndjson/ndjson-spec">ndjson</a>.
* It is expected to pass here a JSON line generated by
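Likewise, the relocated setMarshaller above is unchanged in behaviour: each item is marshalled to a single JSON line (ndjson). A minimal sketch; the writer class name BigQueryJsonItemWriter is assumed (not shown in this diff), while JsonObjectMarshaller and JacksonJsonObjectMarshaller come from Spring Batch:

import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter; // assumed class name
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JsonObjectMarshaller;

class MarshallerWiringSketch {

    static <T> void configure(BigQueryJsonItemWriter<T> writer) {
        // Jackson-based marshaller: one JSON object per item, one line per BigQuery row.
        JsonObjectMarshaller<T> marshaller = new JacksonJsonObjectMarshaller<>();
        writer.setMarshaller(marshaller);
    }
}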
@@ -24,6 +24,7 @@
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest;
import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter;
import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;

import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
@@ -144,6 +145,46 @@ void testWrite() throws Exception {
Mockito.verifyNoMoreInteractions(channel);
}

@Test
void testWrite_Exception() throws Exception {
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup());
AtomicBoolean consumerCalled = new AtomicBoolean();

Job job = Mockito.mock(Job.class);
Mockito.when(job.getJobId()).thenReturn(JobId.newBuilder().build());

TableDataWriteChannel channel = Mockito.mock(TableDataWriteChannel.class);
Mockito.when(channel.getJob()).thenReturn(job);
Mockito.when(channel.write(Mockito.any(ByteBuffer.class))).thenThrow(BigQueryException.class);

BigQuery bigQuery = prepareMockedBigQuery();
Mockito.when(bigQuery.writer(Mockito.any(WriteChannelConfiguration.class))).thenReturn(channel);

TestWriter writer = new TestWriter();
writer.setBigQuery(bigQuery);
writer.setJobConsumer(j -> consumerCalled.set(true));
writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID));

BigQueryItemWriterException actual = Assertions.assertThrows(BigQueryItemWriterException.class, () -> writer.write(TestConstants.CHUNK));
Assertions.assertEquals("Error on write happened", actual.getMessage());

AtomicLong actualCounter = (AtomicLong) handle
.findVarHandle(BigQueryBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class)
.get(writer);

boolean writeFailed = (Boolean) handle
.findVarHandle(BigQueryBaseItemWriter.class, "writeFailed", boolean.class)
.get(writer);

Assertions.assertEquals(0L, actualCounter.get());
Assertions.assertTrue(writeFailed);
Assertions.assertFalse(consumerCalled.get());

Mockito.verify(channel).write(Mockito.any(ByteBuffer.class));
Mockito.verify(channel).close();
Mockito.verifyNoMoreInteractions(channel);
}

@Test
void testBaseAfterPropertiesSet_Exception() {
TestWriter writer = new TestWriter();
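The new testWrite_Exception above asserts on private writer state (bigQueryWriteCounter, writeFailed) through java.lang.invoke instead of adding accessors. A self-contained sketch of that pattern; the Counter and VarHandleSketch classes are hypothetical stand-ins for the writer:

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicLong;

class VarHandleSketch {

    // Hypothetical class with private state, standing in for BigQueryBaseItemWriter.
    static class Counter {
        private final AtomicLong hits = new AtomicLong();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // privateLookupIn grants private access to Counter from this (same-module) caller.
        MethodHandles.Lookup lookup = MethodHandles.privateLookupIn(Counter.class, MethodHandles.lookup());
        // findVarHandle resolves the private field by name and declared type.
        VarHandle hits = lookup.findVarHandle(Counter.class, "hits", AtomicLong.class);
        AtomicLong value = (AtomicLong) hits.get(new Counter());
        System.out.println(value.get()); // prints 0
    }
}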
@@ -16,8 +16,10 @@

package org.springframework.batch.extensions.bigquery.unit.writer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.csv.CsvFactory;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
@@ -83,14 +85,38 @@ void testSetRowMapper() throws IllegalAccessException, NoSuchFieldException {
@Test
void testConvertObjectsToByteArrays() {
TestWriter writer = new TestWriter();
List<PersonDto> items = TestConstants.CHUNK.getItems();

// Empty
Assertions.assertTrue(writer.testConvert(List.of()).isEmpty());

// Not empty
// Not empty (row mapper)
writer.setRowMapper(source -> source.toString().getBytes());
List<byte[]> actual = writer.testConvert(TestConstants.CHUNK.getItems());
List<byte[]> expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(String::getBytes).toList();
List<byte[]> actual = writer.testConvert(items);
List<byte[]> expected = items.stream().map(PersonDto::toString).map(String::getBytes).toList();
Assertions.assertEquals(expected.size(), actual.size());

for (int i = 0; i < actual.size(); i++) {
Assertions.assertArrayEquals(expected.get(i), actual.get(i));
}

// Not empty (object writer)
ObjectWriter csvWriter = new CsvMapper().writerWithTypedSchemaFor(PersonDto.class);
writer.setRowMapper(null);
writer.testInitializeProperties(items);
actual = writer.testConvert(items);

expected = items
.stream()
.map(pd -> {
try {
return csvWriter.writeValueAsBytes(pd);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.toList();

Assertions.assertEquals(expected.size(), actual.size());

for (int i = 0; i < actual.size(); i++) {
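For context on the new "object writer" branch: writerWithTypedSchemaFor builds a CsvSchema from the DTO and serializes each object as one CSV line, which is the fallback path exercised when no row mapper is set. A standalone sketch; PersonRow and CsvSchemaSketch are hypothetical (PersonDto itself is not shown here), with the column order pinned via @JsonPropertyOrder:

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;

class CsvSchemaSketch {

    // Hypothetical DTO; @JsonPropertyOrder fixes the CSV column order.
    @JsonPropertyOrder({"name", "age"})
    static class PersonRow {
        public String name = "Ada";
        public int age = 36;
    }

    public static void main(String[] args) throws JsonProcessingException {
        ObjectWriter csvWriter = new CsvMapper().writerWithTypedSchemaFor(PersonRow.class);
        byte[] row = csvWriter.writeValueAsBytes(new PersonRow());
        System.out.print(new String(row)); // typically "Ada,36" plus a line separator
    }
}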