Skip to content

Commit 407b7c4

Browse files
committed
Fix timpe-partion datetime validation
1 parent f157898 commit 407b7c4

File tree

3 files changed

+52
-15
lines changed

3 files changed

+52
-15
lines changed

docs/BigQueryTable-batchsink.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ is ignored if the table already exists.
110110
**Time Partitioning Type**: Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly.
111111
Default is Daily. Ignored when table already exists
112112

113+
> The table below shows the compatibility of different time schema types with various time partitioning types in BigQuery.
114+
115+
| Schema Type / Partion Type | Hourly | Daily | Monthly | Yearly |
116+
|-------------------------| ------- | ------- | ------- | ------- |
117+
| TIMESTAMP_MILLIS | ✓ | ✓ | ✓ | ✓ |
118+
| TIMESTAMP_MICROS | ✓ | ✓ | ✓ | ✓ |
119+
| DATETIME | ✓ | ✓ | ✓ | ✓ |
120+
| DATE | ✗ | ✓ | ✓ | ✓ |
121+
| TIME_MILLIS | ✗ | ✗ | ✗ | ✗ |
122+
| TIME_MICROS | ✗ | ✗ | ✗ | ✗ |
123+
113124
**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t
114125
exist already, and partitioning type is set to Integer.
115126
* The start value is inclusive.

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -526,23 +526,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector
526526

527527
boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
528528
boolean isDate = logicalType == LogicalType.DATE;
529-
boolean isTimestampOrDate = isTimestamp || isDate;
530-
531-
// If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS
532-
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
533-
collector.addFailure(
534-
String.format("Partition column '%s' is of invalid type '%s'.",
529+
boolean isDateTime = logicalType == LogicalType.DATETIME;
530+
boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime;
531+
boolean isTimestampOrDateTime = isTimestamp || isDateTime;
532+
// TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME
533+
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) {
534+
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
535535
columnName, fieldSchema.getDisplayName()),
536-
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
537-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
538-
539-
// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
540-
} else if (!isTimestampOrDate) {
541-
collector.addFailure(
542-
String.format("Partition column '%s' is of invalid type '%s'.",
536+
"Partition column must be of type TIMESTAMP or DATETIME")
537+
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
538+
// TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME
539+
} else if (!isTimestampOrDateOrDateTime) {
540+
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
543541
columnName, fieldSchema.getDisplayName()),
544-
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
545-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
542+
"Partition column must be of type TIMESTAMP, DATE or DATETIME")
543+
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
546544
}
547545
}
548546

src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,34 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws
129129
Assert.assertEquals(0, collector.getValidationFailures().size());
130130
}
131131

132+
@Test
133+
public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws
134+
InvocationTargetException, IllegalAccessException {
135+
136+
String columnName = "partitionFrom";
137+
Schema schema = Schema.of(Schema.LogicalType.DATETIME);
138+
139+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
140+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.MONTH;
141+
142+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
143+
Assert.assertEquals(0, collector.getValidationFailures().size());
144+
}
145+
146+
@Test
147+
public void testValidateTimePartitioningColumnWithHourAndDateTime() throws
148+
InvocationTargetException, IllegalAccessException {
149+
150+
String columnName = "partitionFrom";
151+
Schema schema = Schema.of(Schema.LogicalType.DATETIME);
152+
153+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
154+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR;
155+
156+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
157+
Assert.assertEquals(0, collector.getValidationFailures().size());
158+
}
159+
132160
@Test
133161
public void testValidateColumnNameWithValidColumnName() {
134162
String columnName = "test";

0 commit comments

Comments
 (0)