51 changes: 51 additions & 0 deletions .github/workflows/pre-release.yaml
@@ -0,0 +1,51 @@
name: Release

on:
  workflow_dispatch:

permissions:
  contents: write

jobs:
  build_release:
    name: build_release
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set env
        run: echo "RELEASE_VERSION=$(cat VERSION)" >> $GITHUB_ENV
      - name: Test
        run: |
          echo $RELEASE_VERSION
          echo ${{ env.RELEASE_VERSION }}
      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'adopt'
          architecture: x64
      - name: Setup and execute Gradle 'createConfluentArchive' task
        uses: gradle/gradle-build-action@v2
        with:
          arguments: createConfluentArchive
          gradle-version: '7.4.2'
      - name: release
        uses: actions/create-release@v1
        id: create_release
        with:
          draft: false
          prerelease: true
          release_name: ${{ env.RELEASE_VERSION }}
          tag_name: ${{ env.RELEASE_VERSION }}
        env:
          GITHUB_TOKEN: ${{ github.token }}
      - name: upload release artifact
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ github.token }}
        with:
          upload_url: ${{ steps.create_release.outputs.upload_url }}
          asset_path: ./build/confluent/clickhouse-kafka-connect-${{ env.RELEASE_VERSION }}.zip
          asset_name: clickhouse-kafka-connect-${{ env.RELEASE_VERSION }}.zip
          asset_content_type: application/zip
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
# 1.2.4
* Adjusting underlying client version to 0.7.0
* Bugfix for UINT handling

# 1.2.3
* Tweaking schema validation to allow for UINT

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v1.2.3
v1.2.4
4 changes: 3 additions & 1 deletion build.gradle.kts
@@ -22,6 +22,7 @@ java {
buildscript {
repositories {
mavenCentral()
maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
}
}

@@ -44,10 +45,11 @@ repositories {
mavenCentral()
maven("https://packages.confluent.io/maven/")
maven("https://jitpack.io")
maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
}

extra.apply {
set("clickHouseDriverVersion", "0.6.3")
set("clickHouseDriverVersion", "0.7.0-SNAPSHOT")
set("kafkaVersion", "2.7.0")
set("avroVersion", "1.9.2")

@@ -29,6 +29,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@@ -244,16 +245,10 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
continue;

if (colTypeName.equalsIgnoreCase("UINT8") && dataTypeName.equals("INT8"))
continue;

if (colTypeName.equalsIgnoreCase("UINT16") && dataTypeName.equals("INT16"))
continue;

if (colTypeName.equalsIgnoreCase("UINT32") && dataTypeName.equals("INT32"))
continue;

if (colTypeName.equalsIgnoreCase("UINT64") && dataTypeName.equals("INT64"))
if (colTypeName.equalsIgnoreCase("UINT8")
|| colTypeName.equalsIgnoreCase("UINT16")
|| colTypeName.equalsIgnoreCase("UINT32")
|| colTypeName.equalsIgnoreCase("UINT64"))
continue;

if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
@@ -713,15 +708,15 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
insertSettings.setQueryId(queryId.getQueryId());

for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
insertSettings.setOption(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
insertSettings.serverSetting(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
}
// insertSettings.setOption(ClickHouseClientOption.WRITE_BUFFER_SIZE.name(), 8192);

ByteArrayOutputStream stream = new ByteArrayOutputStream();
for (Record record : records) {
if (record.getSinkRecord().value() != null) {
for (Column col : table.getRootColumnsList()) {
System.out.println("Writing column: " + col.getName());
LOGGER.debug("Writing column: {}", col.getName());
long beforePushStream = System.currentTimeMillis();
doWriteCol(record, col, stream, supportDefaults);
pushStreamTime += System.currentTimeMillis() - beforePushStream;
@@ -897,7 +892,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
insertSettings.setQueryId(queryId.getQueryId());

for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
insertSettings.setOption(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
insertSettings.serverSetting(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
}
//insertSettings.setOption(ClickHouseClientOption.WRITE_BUFFER_SIZE.name(), 8192);

@@ -1032,7 +1027,7 @@ protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifi
insertSettings.setQueryId(queryId.getQueryId());

for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
insertSettings.setOption(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
insertSettings.serverSetting(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
}
// insertSettings.setOption(ClickHouseClientOption.WRITE_BUFFER_SIZE.name(), 8192);

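The three doInsert*V2 paths above switch from insertSettings.setOption(...) to insertSettings.serverSetting(...), so the connector-level ClickHouse settings travel to the server with each insert instead of being treated as client options. A minimal sketch of that pattern, assuming the client-v2 InsertSettings class from clickhouse-java 0.7.0; the import path and the buildInsertSettings helper are illustrative, not part of the PR:

```java
import com.clickhouse.client.api.insert.InsertSettings;

import java.util.Map;

class InsertSettingsSketch {
    // Illustrative helper mirroring the loop in doInsertRawBinaryV2/doInsertJsonV2/doInsertStringV2:
    // each connector-level ClickHouse setting is forwarded as a server setting.
    // As in the PR, this assumes insert_deduplication_token is never among the settings.
    static InsertSettings buildInsertSettings(String queryId, Map<String, String> clickhouseSettings) {
        InsertSettings insertSettings = new InsertSettings();
        insertSettings.setQueryId(queryId);
        for (Map.Entry<String, String> setting : clickhouseSettings.entrySet()) {
            insertSettings.serverSetting(setting.getKey(), setting.getValue());
        }
        return insertSettings;
    }
}
```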
@@ -388,7 +388,7 @@ public Table describeTableV2(String database, String tableName) {
Table table = new Table(database, tableName);
try {
QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.JSONEachRow);
settings.setOption("describe_include_subcolumns", true);
settings.serverSetting("describe_include_subcolumns", "1");
settings.setDatabase(database);
QueryResponse queryResponse = client.query(describeQuery, settings).get();
try (BufferedReader br = new BufferedReader(new InputStreamReader(queryResponse.getInputStream()))) {
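describeTableV2 now passes describe_include_subcolumns as a server setting with the string value "1" rather than as a client option with a boolean. A minimal sketch of the resulting query setup, assuming the client-v2 Client/QuerySettings/QueryResponse classes from clickhouse-java 0.7.0; the import paths and the describe helper are illustrative, and the actual describe query text lives in the surrounding class:

```java
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseFormat;

class DescribeTableSketch {
    // Illustrative helper mirroring the settings built in describeTableV2:
    // describe_include_subcolumns goes to the server as "1", and the response
    // streams back as JSONEachRow, one column description per line.
    static QueryResponse describe(Client client, String database, String describeQuery) throws Exception {
        QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.JSONEachRow);
        settings.serverSetting("describe_include_subcolumns", "1");
        settings.setDatabase(database);
        return client.query(describeQuery, settings).get();
    }
}
```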
@@ -10,6 +10,7 @@
import org.apache.kafka.connect.sink.SinkRecord;
import org.json.JSONObject;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
@@ -528,6 +529,7 @@ public void supportEnumTest() {
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}

@Disabled("Disabled because it requires a flag on the instance.")
@Test
@SinceClickHouseVersion("24.1")
public void schemaWithTupleOfMapsWithVariantTest() {
@@ -586,10 +588,14 @@ public void schemaWithTupleOfMapsWithVariantTest() {
}
}

@Disabled("Disabled because it requires a flag on the instance.")
@Test
@SinceClickHouseVersion("24.1")
public void schemaWithNestedTupleMapArrayAndVariant() {
Assumptions.assumeFalse(isCloud, "Skip test since experimental is not available in cloud");
if (isCloud) {
LOGGER.warn("Skip test since experimental is not available in cloud");
return;
}
Map<String, String> props = createProps();
ClickHouseHelperClient chc = createClient(props);
String topic = "nested-tuple-map-array-and-variant-table-test";