
Commit ca4bbd8

Merge pull request #1120 from data-integrations/PLUGIN-1367
[PLUGIN-1367] Fixed a null pointer exception when the sink schema is not set and the SQL Engine Output is initialized.
2 parents baaf0b0 + bfaf653 commit ca4bbd8

2 files changed: +38 -12 lines changed
src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java

Lines changed: 7 additions & 1 deletion

@@ -161,6 +161,12 @@ void initSQLEngineOutput(BatchSinkContext context,
                          String tableName,
                          @Nullable Schema tableSchema,
                          FailureCollector collector) {
+    // Sink Pushdown is not supported if the sink schema is not defined.
+    if (tableSchema == null) {
+      LOG.debug("BigQuery SQL Engine Output was not initialized. Schema was empty.");
+      return;
+    }
+
     List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
       getConfig().isAllowSchemaRelaxation(),
       config.getDatasetProject(), config.getDataset(), config.isTruncateTableSet(), collector);
@@ -175,7 +181,7 @@ void initSQLEngineOutput(BatchSinkContext context,
     arguments
       .put(BigQueryWrite.SQL_OUTPUT_JOB_ID, jobId + "_write")
       .put(BigQueryWrite.SQL_OUTPUT_CONFIG, GSON.toJson(config))
-      .put(BigQueryWrite.SQL_OUTPUT_SCHEMA, tableSchema != null ? GSON.toJson(tableSchema) : null)
+      .put(BigQueryWrite.SQL_OUTPUT_SCHEMA, GSON.toJson(tableSchema))
       .put(BigQueryWrite.SQL_OUTPUT_FIELDS, GSON.toJson(fieldNames));

     context.addOutput(new SQLEngineOutput(outputName,
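
For context, the fix above is a plain guard clause: when no output schema is configured, the method logs and returns before any downstream code can dereference or serialize a null schema. Below is a minimal, self-contained sketch of that pattern; the class, method, and parameter names are illustrative only and are not part of the plugin.

import java.util.logging.Logger;

// Illustrative sketch of the guard-clause pattern; not the plugin's code.
public class GuardClauseSketch {
  private static final Logger LOG = Logger.getLogger(GuardClauseSketch.class.getName());

  // Mirrors the fix: bail out early when no schema is available, instead of
  // passing null further down and risking a NullPointerException.
  static void initOutput(String tableName, String schemaJson) {
    if (schemaJson == null) {
      LOG.fine("SQL Engine Output was not initialized. Schema was empty.");
      return;
    }
    // Only reached with a non-null schema, so downstream code needs no null checks.
    System.out.println("Registering output for " + tableName + " with schema " + schemaJson);
  }

  public static void main(String[] args) {
    initOutput("my_table", null);               // no-op: the guard returns early
    initOutput("my_table", "{\"fields\":[]}");  // proceeds to register the output
  }
}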

src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java

Lines changed: 31 additions & 11 deletions

@@ -43,10 +43,13 @@

 import java.util.List;

+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

 /**
@@ -87,19 +90,21 @@ public void testBigQuerySinkInvalidConfig() {

   @Test
   public void testBigQueryTimePartitionConfig() {
-    Schema schema = Schema.recordOf("record",
-                                    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
-                                    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
-                                    Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
-                                    Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
-                                    Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
-                                    Schema.Field.of("timestamp",
-                                                    Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
+    Schema schema =
+      Schema.recordOf("record",
+                      Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+                      Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+                      Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
+                      Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
+                      Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
+                      Schema.Field.of("timestamp",
+                                      Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));

-    BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
-                                                       "TIME", 0L, 100L, 10L, null);
+    BigQuerySinkConfig config =
+      new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
+                             "TIME", 0L, 100L, 10L, null);
     config.partitionByField = "dt";
-
+
     MockFailureCollector collector = new MockFailureCollector("bqsink");
     config.validate(collector);
     Assert.assertEquals(0, collector.getValidationFailures().size());
@@ -382,4 +387,19 @@ public void testDatasetWithSpecialCharacters() {
     Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new!table?2020"));
     Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new^table|2020"));
   }
+
+  @Test
+  public void testInitSQLEngineOutputDoesNotInitOutputWithNullSchema() throws Exception {
+    BatchSinkContext sinkContext = mock(BatchSinkContext.class);
+    MockFailureCollector collector = new MockFailureCollector("bqsink");
+
+    BigQuerySinkConfig config =
+      new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", null,
+                             null, null, null, null, null);
+    BigQuery bigQueryMock = mock(BigQuery.class);
+
+    BigQuerySink sink = new BigQuerySink(config);
+    sink.initSQLEngineOutput(sinkContext, bigQueryMock, "sink", "sink", "table", null, collector);
+    verify(sinkContext, never()).addOutput(any());
+  }
 }
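
The new unit test relies on Mockito's verify(..., never()) to assert that addOutput is not called when the schema is null. Here is a minimal, self-contained JUnit 4/Mockito sketch of the same idiom against a hypothetical OutputRegistry interface (not part of the plugin), in case the pattern is unfamiliar.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import org.junit.Test;

// Illustrative only: OutputRegistry stands in for BatchSinkContext.
public class VerifyNeverSketchTest {

  interface OutputRegistry {
    void addOutput(Object output);
  }

  // Guarded registration mirroring the fix: skip the call when the schema is null.
  static void register(OutputRegistry registry, String schemaJson) {
    if (schemaJson == null) {
      return;
    }
    registry.addOutput(schemaJson);
  }

  @Test
  public void registrationIsSkippedWhenSchemaIsNull() {
    OutputRegistry registry = mock(OutputRegistry.class);
    register(registry, null);
    // verify(mock, never()) asserts the method was not invoked at all.
    verify(registry, never()).addOutput(any());
  }
}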
