From 1881005c7a5c8f02eb26966a86bb3165b6494e21 Mon Sep 17 00:00:00 2001
From: Paultagoras
Date: Wed, 6 Nov 2024 11:26:52 -0500
Subject: [PATCH] Update ClickHouseWriter.java

---
 .../connect/sink/db/ClickHouseWriter.java | 62 -------------------
 1 file changed, 62 deletions(-)

diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
index 40deff79..87e27b81 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java
@@ -206,64 +206,6 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte
         }
     }
 
-    protected boolean validateDataSchema(Table table, Record record, boolean onlyFieldsName) {
-        boolean validSchema = true;
-        for (Column col : table.getRootColumnsList()) {
-            String colName = col.getName();
-            Type type = col.getType();
-            boolean isNullable = col.isNullable();
-            boolean hasDefault = col.hasDefault();
-            if (!isNullable && !hasDefault) {
-                Map<String, Schema> schemaMap = record.getFields().stream().collect(Collectors.toMap(Field::name, Field::schema));
-                var objSchema = schemaMap.get(colName);
-                Data obj = record.getJsonMap().get(colName);
-                if (obj == null) {
-                    validSchema = false;
-                    LOGGER.error(String.format("Table column name [%s] was not found.", colName));
-                } else if (!onlyFieldsName) {
-                    String colTypeName = type.name();
-                    String dataTypeName = obj.getFieldType().getName().toUpperCase();
-                    // TODO: make extra validation for Map/Array type
-                    LOGGER.debug(String.format("Column type name [%s] and data type name [%s]", colTypeName, dataTypeName));
-                    switch (colTypeName) {
-                        case "Date":
-                        case "Date32":
-                        case "DateTime":
-                        case "DateTime64":
-                        case "UUID":
-                        case "FIXED_STRING":
-                        case "Enum8":
-                        case "Enum16":
-                            break;//I notice we just break here, rather than actually validate the type
-                        default:
-                            if (!colTypeName.equals(dataTypeName)) {
-                                LOGGER.debug("Data schema name: {}", objSchema.name());
-
-                                if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))
-                                    continue;
-
-                                if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
-                                    continue;
-
-                                if (colTypeName.equalsIgnoreCase("UINT8")
-                                        || colTypeName.equalsIgnoreCase("UINT16")
-                                        || colTypeName.equalsIgnoreCase("UINT32")
-                                        || colTypeName.equalsIgnoreCase("UINT64"))
-                                    continue;
-
-                                if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
-                                    continue;
-
-                                validSchema = false;
-                                LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
-                            }
-                    }
-                }
-            }
-        }
-        return validSchema;
-    }
-
     protected void doWriteDates(Type type, OutputStream stream, Data value, int precision, String columnName) throws IOException {
         // TODO: develop more specific tests to have better coverage
         if (value.getObject() == null) {
@@ -692,8 +634,6 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
         Record first = records.get(0);
         String database = first.getDatabase();
 
-        if (!validateDataSchema(table, first, false))
-            throw new RuntimeException("Data schema validation failed.");
 
         // Let's test first record
         // Do we have all elements from the table inside the record
@@ -746,8 +686,6 @@ protected void doInsertRawBinaryV1(List<Record> records, Table table, QueryIdent
         Record first = records.get(0);
         String database = first.getDatabase();
 
-        if (!validateDataSchema(table, first, false))
-            throw new RuntimeException("Data schema validation failed.");
 
         // Let's test first record
         // Do we have all elements from the table inside the record