Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ repositories {
mavenCentral()
maven("https://packages.confluent.io/maven/")
maven("https://jitpack.io")
maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
}

extra.apply {
set("clickHouseDriverVersion", "0.7.0-SNAPSHOT")
set("clickHouseDriverVersion", "0.7.1")
set("kafkaVersion", "2.7.0")
set("avroVersion", "1.9.2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ClickHouseTestHelpers {
public static final String HTTPS_PORT = "8443";
public static final String DATABASE_DEFAULT = "default";
public static final String USERNAME_DEFAULT = "default";

private static final int CLOUD_TIMEOUT_VALUE = 600;
public static String getClickhouseVersion() {
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
if (clickHouseVersion == null) {
Expand All @@ -60,7 +62,7 @@ public static void query(ClickHouseHelperClient chc, String query) {
public static void dropTable(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
try {
chc.getClient().queryRecords(dropTable).get(10, TimeUnit.SECONDS);
chc.getClient().queryRecords(dropTable).get(CLOUD_TIMEOUT_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException | TimeoutException e) {
Expand All @@ -87,7 +89,7 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta
settings.setOption(entry.getKey(), entry.getValue());
}
try {
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(120, java.util.concurrent.TimeUnit.SECONDS);
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(CLOUD_TIMEOUT_VALUE, java.util.concurrent.TimeUnit.SECONDS);
return records.getMetrics();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -114,29 +116,26 @@ public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, Stri
} catch (IOException e) {
throw new RuntimeException(e);
}
// try (ClickHouseClient client = ClickHouseClient.builder()
// .options(chc.getDefaultClientOptions())
// .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
// .build();
// ClickHouseResponse response = client.read(chc.getServer())
// .query(query)
// .format(ClickHouseFormat.JSONEachRow)
// .executeAndWait()) {
//
// return StreamSupport.stream(response.records().spliterator(), false)
// .map(record -> record.getValue(0).asString())
// .map(JSONObject::new)
// .collect(Collectors.toList());
// } catch (ClickHouseException e) {
// throw new RuntimeException(e);
// }
}


public static OperationMetrics optimizeTable(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("OPTIMIZE TABLE `%s`", tableName);

try {
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, TimeUnit.SECONDS);
return records.getMetrics();
} catch (Exception e) {
return null;
}
}

public static int countRows(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);

try {
Records records = chc.getClient().queryRecords(queryCount).get(120, TimeUnit.SECONDS);
optimizeTable(chc, tableName);
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, TimeUnit.SECONDS);
// Note we probrbly need asInteger() here
String value = records.iterator().next().getString(1);
return Integer.parseInt(value);
Expand All @@ -152,25 +151,21 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) {
public static int sumRows(ClickHouseHelperClient chc, String tableName, String column) {
String queryCount = String.format("SELECT SUM(`%s`) FROM `%s`", column, tableName);
try {
Records records = chc.getClient().queryRecords(queryCount).get();
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, TimeUnit.SECONDS);
String value = records.iterator().next().getString(1);
return (int)(Float.parseFloat(value));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static int countRowsWithEmojis(ClickHouseHelperClient chc, String tableName) {
String queryCount = "SELECT COUNT(*) FROM `" + tableName + "` WHERE str LIKE '%\uD83D\uDE00%'";
try {
Records records = chc.getClient().queryRecords(queryCount).get();
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, TimeUnit.SECONDS);
String value = records.iterator().next().getString(1);
return (int)(Float.parseFloat(value));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -180,7 +175,7 @@ public static boolean validateRows(ClickHouseHelperClient chc, String topic, Col
try {
QuerySettings querySettings = new QuerySettings();
querySettings.setFormat(ClickHouseFormat.JSONStringsEachRow);
QueryResponse queryResponse = chc.getClient().query(String.format("SELECT * FROM `%s`", topic), querySettings).get();
QueryResponse queryResponse = chc.getClient().query(String.format("SELECT * FROM `%s`", topic), querySettings).get(CLOUD_TIMEOUT_VALUE, TimeUnit.SECONDS);
Gson gson = new Gson();

List<String> records = new ArrayList<>();
Expand Down Expand Up @@ -218,11 +213,7 @@ public static boolean validateRows(ClickHouseHelperClient chc, String topic, Col
}

LOGGER.info("Match? {}", match);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (IOException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}

Expand Down
Loading