Skip to content

Commit b655c99

Browse files
authored
Merge pull request #1446 from cloudsufi/cherry-pick/rc-0.23.3
[🍒][PLUGIN-430][PLUGIN-1803][PLUGIN-1805][PLUGIN-1742] GCS/BQ Patch
2 parents 546fc7b + 5d1701e commit b655c99

File tree

12 files changed

+137
-47
lines changed

12 files changed

+137
-47
lines changed

docs/BigQueryArgumentSetter-action.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323

2424
Properties
2525
----------
26-
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
27-
2826
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
2927
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
3028
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account

docs/BigQueryTable-batchsink.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ is ignored if the table already exists.
110110
**Time Partitioning Type**: Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly.
111111
Default is Daily. Ignored when table already exists.
112112

113+
> The table below shows the compatibility of different time schema types with various time partitioning types in BigQuery.
114+
115+
| Schema Type / Partition Type | Hourly | Daily | Monthly | Yearly |
116+
|-------------------------| ------- | ------- | ------- | ------- |
117+
| TIMESTAMP_MILLIS | ✓ | ✓ | ✓ | ✓ |
118+
| TIMESTAMP_MICROS | ✓ | ✓ | ✓ | ✓ |
119+
| DATETIME | ✓ | ✓ | ✓ | ✓ |
120+
| DATE | ✗ | ✓ | ✓ | ✓ |
121+
| TIME_MILLIS | ✗ | ✗ | ✗ | ✗ |
122+
| TIME_MICROS | ✗ | ✗ | ✗ | ✗ |
123+
113124
**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t
114125
exist already, and partitioning type is set to Integer.
115126
* The start value is inclusive.

docs/GCSArgumentSetter-action.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ must be readable by all users running the job.
3737

3838
Properties
3939
----------
40-
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
41-
4240
**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
4341
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
4442
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
<bigquery.connector.hadoop2.version>hadoop2-1.2.0</bigquery.connector.hadoop2.version>
7474
<commons.codec.version>1.4</commons.codec.version>
7575
<cdap.version>6.9.1</cdap.version>
76-
<cdap.plugin.version>2.11.1</cdap.plugin.version>
76+
<cdap.plugin.version>2.12.1</cdap.plugin.version>
7777
<dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
7878
<flogger.system.backend.version>0.7.1</flogger.system.backend.version>
7979
<gcs.connector.version>hadoop2-2.2.9</gcs.connector.version>
@@ -1245,7 +1245,7 @@
12451245
<dependency>
12461246
<groupId>io.cdap.tests.e2e</groupId>
12471247
<artifactId>cdap-e2e-framework</artifactId>
1248-
<version>0.3.1</version>
1248+
<version>0.3.2</version>
12491249
<scope>test</scope>
12501250
</dependency>
12511251
<dependency>

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -506,23 +506,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector
506506

507507
boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
508508
boolean isDate = logicalType == LogicalType.DATE;
509-
boolean isTimestampOrDate = isTimestamp || isDate;
510-
511-
// If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS
512-
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
513-
collector.addFailure(
514-
String.format("Partition column '%s' is of invalid type '%s'.",
509+
boolean isDateTime = logicalType == LogicalType.DATETIME;
510+
boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime;
511+
boolean isTimestampOrDateTime = isTimestamp || isDateTime;
512+
// TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME
513+
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) {
514+
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
515515
columnName, fieldSchema.getDisplayName()),
516-
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
517-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
518-
519-
// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
520-
} else if (!isTimestampOrDate) {
521-
collector.addFailure(
522-
String.format("Partition column '%s' is of invalid type '%s'.",
516+
"Partition column must be of type TIMESTAMP or DATETIME")
517+
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
518+
// TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME
519+
} else if (!isTimestampOrDateOrDateTime) {
520+
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
523521
columnName, fieldSchema.getDisplayName()),
524-
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
525-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
522+
"Partition column must be of type TIMESTAMP, DATE or DATETIME")
523+
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
526524
}
527525
}
528526

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -438,15 +438,28 @@ public static ValidationFailure validateFieldSchemaMatches(Field bqField, Schema
438438
if (bqField.getMode() == Field.Mode.REPEATED) {
439439
fieldSchema = fieldSchema.getComponentSchema();
440440
type = fieldSchema.getType();
441+
logicalType = fieldSchema.getLogicalType();
441442
}
442443
}
443444

445+
String[] incompatibleFieldErrorMessage = {
446+
String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' in BigQuery table '%s.%s'.",
447+
field.getName(), fieldSchema.getDisplayName(), bqField.getName(),
448+
BQ_TYPE_MAP.get(bqField.getType()), dataset, table) ,
449+
String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType()))
450+
};
451+
if (logicalType != null) {
452+
if (LOGICAL_TYPE_MAP.get(logicalType) != null && !LOGICAL_TYPE_MAP.get(logicalType).contains(bqField.getType())) {
453+
return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]);
454+
}
455+
456+
// Return once logical types are validated. This is because logical types are represented as primitive types
457+
// internally.
458+
return null;
459+
}
460+
444461
if (TYPE_MAP.get(type) != null && !TYPE_MAP.get(type).contains(bqField.getType())) {
445-
return collector.addFailure(
446-
String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' " +
447-
"in BigQuery table '%s.%s'.", field.getName(), fieldSchema.getDisplayName(), bqField.getName(),
448-
BQ_TYPE_MAP.get(bqField.getType()), dataset, table),
449-
String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType())));
462+
return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]);
450463
}
451464
return null;
452465
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright © 2024 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*
16+
*/
17+
18+
package io.cdap.plugin.gcp.common;
19+
20+
import io.cdap.plugin.format.input.AbstractEmptyInputFormat;
21+
22+
23+
/**
24+
* An InputFormat that returns no data.
25+
* @param <K> the type of key
26+
* @param <V> the type of value
27+
*/
28+
public class GCSEmptyInputFormat<K, V> extends AbstractEmptyInputFormat<K, V> {
29+
// no-op
30+
}

src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.cdap.plugin.format.plugin.FileSourceProperties;
4444
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
4545
import io.cdap.plugin.gcp.common.GCPUtils;
46+
import io.cdap.plugin.gcp.common.GCSEmptyInputFormat;
4647
import io.cdap.plugin.gcp.crypto.EncryptedFileSystem;
4748
import io.cdap.plugin.gcp.gcs.GCSPath;
4849
import io.cdap.plugin.gcp.gcs.connector.GCSConnector;
@@ -77,6 +78,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
7778
super.configurePipeline(pipelineConfigurer);
7879
}
7980

81+
@Override
82+
protected String getEmptyInputFormatClassName() {
83+
return GCSEmptyInputFormat.class.getName();
84+
}
85+
8086
@Override
8187
public void prepareRun(BatchSourceContext context) throws Exception {
8288
// Get location of the source for lineage
@@ -268,11 +274,6 @@ public Long getMinSplitSize() {
268274
return minSplitSize;
269275
}
270276

271-
@Override
272-
public boolean shouldAllowEmptyInput() {
273-
return false;
274-
}
275-
276277
public boolean isCopyHeader() {
277278
return shouldCopyHeader();
278279
}

src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,34 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws
129129
Assert.assertEquals(0, collector.getValidationFailures().size());
130130
}
131131

132+
@Test
133+
public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws
134+
InvocationTargetException, IllegalAccessException {
135+
136+
String columnName = "partitionFrom";
137+
Schema schema = Schema.of(Schema.LogicalType.DATETIME);
138+
139+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
140+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.MONTH;
141+
142+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
143+
Assert.assertEquals(0, collector.getValidationFailures().size());
144+
}
145+
146+
@Test
147+
public void testValidateTimePartitioningColumnWithHourAndDateTime() throws
148+
InvocationTargetException, IllegalAccessException {
149+
150+
String columnName = "partitionFrom";
151+
Schema schema = Schema.of(Schema.LogicalType.DATETIME);
152+
153+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
154+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR;
155+
156+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
157+
Assert.assertEquals(0, collector.getValidationFailures().size());
158+
}
159+
132160
@Test
133161
public void testValidateColumnNameWithValidColumnName() {
134162
String columnName = "test";

src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.cdap.cdap.etl.api.FailureCollector;
2424
import io.cdap.cdap.etl.api.validation.ValidationFailure;
2525
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
26+
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
2627
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric;
2728
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
2829
import io.cdap.plugin.gcp.common.GCPUtils;
@@ -478,4 +479,32 @@ public void testConvertFieldTypeJsonToString() {
478479
Schema result = BigQueryUtil.convertFieldType(field, null, null);
479480
Assert.assertEquals(expectedSchema, result);
480481
}
482+
483+
@Test
484+
public void testValidateFieldSchemaMatchesDate() {
485+
MockFailureCollector collector = new MockFailureCollector();
486+
Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE)
487+
.setMode(Field.Mode.REPEATED).build();
488+
Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate",
489+
Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.LogicalType.DATE))));
490+
ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset",
491+
"table", BigQuerySourceConfig.SUPPORTED_TYPES, collector);
492+
Assert.assertNull(failure);
493+
Assert.assertEquals(0, collector.getValidationFailures().size());
494+
}
495+
496+
@Test
497+
public void testValidateFieldSchemaNotMatchesDate() {
498+
MockFailureCollector collector = new MockFailureCollector();
499+
Field bigQueryField = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE)
500+
.setMode(Field.Mode.REPEATED).build();
501+
Schema.Field schemaField = Schema.Field.of("testFieldRepeatedDate",
502+
Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING))));
503+
ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bigQueryField, schemaField, "dataset",
504+
"table", BigQuerySourceConfig.SUPPORTED_TYPES, collector);
505+
Assert.assertNotNull(failure);
506+
Assert.assertEquals(1, collector.getValidationFailures().size());
507+
Assert.assertEquals("Field 'testFieldRepeatedDate' of type 'string' is incompatible with" +
508+
" column 'testFieldRepeatedDate' of type 'date' in BigQuery table 'dataset.table'.", failure.getMessage());
509+
}
481510
}

0 commit comments

Comments
 (0)