@@ -206,64 +206,6 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte
         }
     }
 
-    protected boolean validateDataSchema(Table table, Record record, boolean onlyFieldsName) {
-        boolean validSchema = true;
-        for (Column col : table.getRootColumnsList()) {
-            String colName = col.getName();
-            Type type = col.getType();
-            boolean isNullable = col.isNullable();
-            boolean hasDefault = col.hasDefault();
-            if (!isNullable && !hasDefault) {
-                Map<String, Schema> schemaMap = record.getFields().stream().collect(Collectors.toMap(Field::name, Field::schema));
-                var objSchema = schemaMap.get(colName);
-                Data obj = record.getJsonMap().get(colName);
-                if (obj == null) {
-                    validSchema = false;
-                    LOGGER.error(String.format("Table column name [%s] was not found.", colName));
-                } else if (!onlyFieldsName) {
-                    String colTypeName = type.name();
-                    String dataTypeName = obj.getFieldType().getName().toUpperCase();
-                    // TODO: make extra validation for Map/Array type
-                    LOGGER.debug(String.format("Column type name [%s] and data type name [%s]", colTypeName, dataTypeName));
-                    switch (colTypeName) {
-                        case "Date":
-                        case "Date32":
-                        case "DateTime":
-                        case "DateTime64":
-                        case "UUID":
-                        case "FIXED_STRING":
-                        case "Enum8":
-                        case "Enum16":
-                            break; // I notice we just break here, rather than actually validate the type
-                        default:
-                            if (!colTypeName.equals(dataTypeName)) {
-                                LOGGER.debug("Data schema name: {}", objSchema.name());
-
-                                if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))
-                                    continue;
-
-                                if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
-                                    continue;
-
-                                if (colTypeName.equalsIgnoreCase("UINT8")
-                                        || colTypeName.equalsIgnoreCase("UINT16")
-                                        || colTypeName.equalsIgnoreCase("UINT32")
-                                        || colTypeName.equalsIgnoreCase("UINT64"))
-                                    continue;
-
-                                if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
-                                    continue;
-
-                                validSchema = false;
-                                LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
-                            }
-                    }
-                }
-            }
-        }
-        return validSchema;
-    }
-
     protected void doWriteDates(Type type, OutputStream stream, Data value, int precision, String columnName) throws IOException {
         // TODO: develop more specific tests to have better coverage
         if (value.getObject() == null) {
@@ -692,8 +634,6 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
         Record first = records.get(0);
         String database = first.getDatabase();
 
-        if (!validateDataSchema(table, first, false))
-            throw new RuntimeException("Data schema validation failed.");
         // Let's test first record
         // Do we have all elements from the table inside the record
 
@@ -746,8 +686,6 @@ protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdent
         Record first = records.get(0);
         String database = first.getDatabase();
 
-        if (!validateDataSchema(table, first, false))
-            throw new RuntimeException("Data schema validation failed.");
         // Let's test first record
         // Do we have all elements from the table inside the record
 
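For discussion: the deleted validateDataSchema compares ClickHouse column type names against Kafka Connect type names, with a handful of carve-outs in the default branch of the switch. The sketch below restates just those matching rules as a self-contained class so they can be exercised in isolation. The class and method names are hypothetical, and the connector's real Column, Data, and Schema types are deliberately replaced with plain strings.

// Hypothetical, self-contained restatement of the deleted name-matching
// rules (the default branch of the removed switch). For review discussion
// only; the real method operates on the connector's Column/Data objects.
public class SchemaNameMatchSketch {

    // Returns true when a ClickHouse column type name is treated as
    // compatible with a Kafka Connect data type name.
    static boolean namesCompatible(String colTypeName, String dataTypeName, String schemaName) {
        if (colTypeName.equals(dataTypeName))
            return true;
        // Connect strings may arrive as BYTES.
        if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))
            return true;
        // ClickHouse tuples map to Connect STRUCTs.
        if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
            return true;
        // Unsigned integers have no direct Connect equivalent, so any data type passes.
        if (colTypeName.equalsIgnoreCase("UINT8") || colTypeName.equalsIgnoreCase("UINT16")
                || colTypeName.equalsIgnoreCase("UINT32") || colTypeName.equalsIgnoreCase("UINT64"))
            return true;
        // Connect's logical Decimal arrives as BYTES and is identified by its schema name.
        return "DECIMAL".equalsIgnoreCase(colTypeName)
                && "org.apache.kafka.connect.data.Decimal".equals(schemaName);
    }

    public static void main(String[] args) {
        System.out.println(namesCompatible("STRING", "BYTES", null));  // true
        System.out.println(namesCompatible("DECIMAL", "BYTES",
                "org.apache.kafka.connect.data.Decimal"));             // true
        System.out.println(namesCompatible("INT32", "STRING", null));  // false
    }
}

With the guard removed from doInsertRawBinaryV1 and doInsertRawBinaryV2, records that this check would have rejected up front will presumably fail later, during serialization, rather than with the explicit "Data schema validation failed." error.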