Skip to content

Commit 1d2e583

Browse files
committed
Fix timpe-partion datetime validation
1 parent adb5a6d commit 1d2e583

File tree

3 files changed

+52
-15
lines changed

3 files changed

+52
-15
lines changed

docs/BigQueryTable-batchsink.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ is ignored if the table already exists.
110110
**Time Partitioning Type**: Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly.
111111
Default is Daily. Ignored when table already exists
112112

113+
> The table below shows the compatibility of different time schema types with various time partitioning types in BigQuery.
114+
115+
| Schema Type / Partion Type | Hourly | Daily | Monthly | Yearly |
116+
|-------------------------| ------- | ------- | ------- | ------- |
117+
| TIMESTAMP_MILLIS | ✓ | ✓ | ✓ | ✓ |
118+
| TIMESTAMP_MICROS | ✓ | ✓ | ✓ | ✓ |
119+
| DATETIME | ✓ | ✓ | ✓ | ✓ |
120+
| DATE | ✗ | ✓ | ✓ | ✓ |
121+
| TIME_MILLIS | ✗ | ✗ | ✗ | ✗ |
122+
| TIME_MICROS | ✗ | ✗ | ✗ | ✗ |
123+
113124
**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t
114125
exist already, and partitioning type is set to Integer.
115126
* The start value is inclusive.

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -506,23 +506,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector
506506

507507
boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
508508
boolean isDate = logicalType == LogicalType.DATE;
509-
boolean isTimestampOrDate = isTimestamp || isDate;
510-
511-
// If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS
512-
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
513-
collector.addFailure(
514-
String.format("Partition column '%s' is of invalid type '%s'.",
509+
boolean isDateTime = logicalType == LogicalType.DATETIME;
510+
boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime;
511+
boolean isTimestampOrDateTime = isTimestamp || isDateTime;
512+
// TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME
513+
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) {
514+
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
515515
columnName, fieldSchema.getDisplayName()),
516-
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
517-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
518-
519-
// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
520-
} else if (!isTimestampOrDate) {
521-
collector.addFailure(
522-
String.format("Partition column '%s' is of invalid type '%s'.",
516+
"Partition column must be of type TIMESTAMP or DATETIME")
517+
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
518+
// TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME
519+
} else if (!isTimestampOrDateOrDateTime) {
520+
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
523521
columnName, fieldSchema.getDisplayName()),
524-
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
525-
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
522+
"Partition column must be of type TIMESTAMP, DATE or DATETIME")
523+
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
526524
}
527525
}
528526

src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,34 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws
129129
Assert.assertEquals(0, collector.getValidationFailures().size());
130130
}
131131

132+
@Test
133+
public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws
134+
InvocationTargetException, IllegalAccessException {
135+
136+
String columnName = "partitionFrom";
137+
Schema schema = Schema.of(Schema.LogicalType.DATETIME);
138+
139+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
140+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.MONTH;
141+
142+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
143+
Assert.assertEquals(0, collector.getValidationFailures().size());
144+
}
145+
146+
@Test
147+
public void testValidateTimePartitioningColumnWithHourAndDateTime() throws
148+
InvocationTargetException, IllegalAccessException {
149+
150+
String columnName = "partitionFrom";
151+
Schema schema = Schema.of(Schema.LogicalType.DATETIME);
152+
153+
Schema fieldSchema = schema.isNullable() ? schema.getNonNullable() : schema;
154+
TimePartitioning.Type timePartitioningType = TimePartitioning.Type.HOUR;
155+
156+
validateTimePartitioningColumnMethod.invoke(config, columnName, collector, fieldSchema, timePartitioningType);
157+
Assert.assertEquals(0, collector.getValidationFailures().size());
158+
}
159+
132160
@Test
133161
public void testValidateColumnNameWithValidColumnName() {
134162
String columnName = "test";

0 commit comments

Comments
 (0)