From fa4220e84b11c122f6e20e87fe681d3cd61968e5 Mon Sep 17 00:00:00 2001 From: jennychen Date: Mon, 2 Sep 2024 16:11:56 +0800 Subject: [PATCH 1/3] [ISSUE-154] Support Array[Tuple] clickhouse type --- .../converter/ClickHouseConverterUtils.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java index 0c79c00..081a1b9 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java @@ -21,7 +21,9 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; @@ -34,6 +36,8 @@ import com.clickhouse.data.value.UnsignedLong; import com.clickhouse.data.value.UnsignedShort; +import org.apache.flink.table.types.logical.RowType; + import java.math.BigDecimal; import java.math.BigInteger; import java.net.InetAddress; @@ -46,7 +50,9 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -119,9 +125,19 @@ public static Object toExternal(Object value, LogicalType type) { toExternal(valueGetter.getElementOrNull(valueArrayData, i), valueType)); } return objectMap; - case MULTISET: case ROW: + List result = new ArrayList<>(); + for (int i = 0; i < ((RowData) value).getArity(); i++) { + result.add( + toExternal( + RowData.createFieldGetter( + ((RowType) type).getTypeAt(i), i) + .getFieldOrNull((RowData) value), + ((RowType) type).getTypeAt(i))); + } + return result; case RAW: + case MULTISET: default: throw new UnsupportedOperationException("Unsupported type:" + type); } @@ -209,6 +225,12 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept } return new GenericMapData(internalMap); case ROW: + List row = (List) value; + GenericRowData rowData = new GenericRowData(row.size()); + for (int i = 0; i < row.size(); i++) { + rowData.setField(i, toInternal(row.get(i), type.getChildren().get(i))); + } + return rowData; case MULTISET: case RAW: default: From 995a6cb86a9d3f399f488c2499526264bb4cf044 Mon Sep 17 00:00:00 2001 From: jennychen Date: Tue, 3 Sep 2024 19:55:18 +0800 Subject: [PATCH 2/3] [ISSUE-154] Support Array[Tuple] clickhouse type --- .../internal/converter/ClickHouseConverterUtils.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java index 081a1b9..7219c2e 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java @@ -30,14 +30,13 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; import com.clickhouse.data.value.UnsignedByte; import com.clickhouse.data.value.UnsignedInteger; import com.clickhouse.data.value.UnsignedLong; import com.clickhouse.data.value.UnsignedShort; -import org.apache.flink.table.types.logical.RowType; - import java.math.BigDecimal; import java.math.BigInteger; import java.net.InetAddress; @@ -130,8 +129,7 @@ public static Object toExternal(Object value, LogicalType type) { for (int i = 0; i < ((RowData) value).getArity(); i++) { result.add( toExternal( - RowData.createFieldGetter( - ((RowType) type).getTypeAt(i), i) + RowData.createFieldGetter(((RowType) type).getTypeAt(i), i) .getFieldOrNull((RowData) value), ((RowType) type).getTypeAt(i))); } From 27839df452be726be5adddf4ae279af01aa8f34c Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 11 Sep 2024 20:56:15 +0800 Subject: [PATCH 3/3] [ISSUE-154] Support Array[Tuple] clickhouse type for clickhousecatalog --- .../flink/connector/clickhouse/util/DataTypeUtil.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java index 57dc58d..1ba24b7 100644 --- a/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java +++ b/flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java @@ -17,6 +17,7 @@ package org.apache.flink.connector.clickhouse.util; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.types.DataType; @@ -25,6 +26,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.flink.table.types.logical.DecimalType.MAX_PRECISION; @@ -110,6 +112,12 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) { toFlinkType(clickHouseColumnInfo.getKeyInfo()), toFlinkType(clickHouseColumnInfo.getValueInfo())); case Tuple: + return DataTypes.ROW( + clickHouseColumnInfo.getNestedColumns().stream() + .map((col) -> new Tuple2<>(col, toFlinkType(col))) + .map(tuple -> DataTypes.FIELD(tuple.f0.getColumnName(), tuple.f1)) + .collect(Collectors.toList())); + case Nested: case AggregateFunction: default: