Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ repositories {
mavenCentral()
maven("https://packages.confluent.io/maven/")
maven("https://jitpack.io")
maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
}

extra.apply {
set("clickHouseDriverVersion", "0.7.0-SNAPSHOT")
set("clickHouseDriverVersion", "0.7.1")
set("kafkaVersion", "2.7.0")
set("avroVersion", "1.9.2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class ClickHouseTestHelpers {
public static final String HTTPS_PORT = "8443";
public static final String DATABASE_DEFAULT = "default";
public static final String USERNAME_DEFAULT = "default";

private static final int CLOUD_TIMEOUT_VALUE = 900;
private static final TimeUnit CLOUD_TIMEOUT_UNIT = TimeUnit.SECONDS;

public static String getClickhouseVersion() {
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
if (clickHouseVersion == null) {
Expand All @@ -57,16 +61,38 @@ public static void query(ClickHouseHelperClient chc, String query) {
chc.queryV1(query);
}
}
public static void dropTable(ClickHouseHelperClient chc, String tableName) {

public static OperationMetrics dropTable(ClickHouseHelperClient chc, String tableName) {
for (int i = 0; i < 5; i++) {
try {
OperationMetrics operationMetrics = dropTableLoop(chc, tableName);
if (operationMetrics != null) {
return operationMetrics;
}
} catch (Exception e) {
LOGGER.error("Error while sleeping", e);
}

try {
Thread.sleep(30000);//Sleep for 30 seconds
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping", e);
}
}

return null;
}
private static OperationMetrics dropTableLoop(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
try {
chc.getClient().queryRecords(dropTable).get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException | TimeoutException e) {
return chc.getClient().queryRecords(dropTable).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT).getMetrics();
} catch (Exception e) {
throw new RuntimeException(e);
}
}



public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
LOGGER.info("Creating table: {}, Query: {}", tableName, createTableQuery);
OperationMetrics operationMetrics = createTable(chc, tableName, createTableQuery, new HashMap<>());
Expand All @@ -81,18 +107,38 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta
}

public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
for (int i = 0; i < 5; i++) {
try {
OperationMetrics operationMetrics = createTableLoop(chc, tableName, createTableQuery, clientSettings);
if (operationMetrics != null) {
return operationMetrics;
}
} catch (Exception e) {
LOGGER.error("Error while sleeping", e);
}

try {
Thread.sleep(30000);//Sleep for 30 seconds
} catch (InterruptedException e) {
LOGGER.error("Error while sleeping", e);
}
}

return null;
}
private static OperationMetrics createTableLoop(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
final String createTableQueryTmp = String.format(createTableQuery, tableName);
QuerySettings settings = new QuerySettings();
for (Map.Entry<String, Serializable> entry : clientSettings.entrySet()) {
settings.setOption(entry.getKey(), entry.getValue());
}
try {
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(120, java.util.concurrent.TimeUnit.SECONDS);
return records.getMetrics();
return chc.getClient().queryRecords(createTableQueryTmp, settings).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT).getMetrics();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, String tableName) {
String query = String.format("SELECT * FROM `%s`", tableName);
QuerySettings querySettings = new QuerySettings();
Expand All @@ -114,29 +160,26 @@ public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, Stri
} catch (IOException e) {
throw new RuntimeException(e);
}
// try (ClickHouseClient client = ClickHouseClient.builder()
// .options(chc.getDefaultClientOptions())
// .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
// .build();
// ClickHouseResponse response = client.read(chc.getServer())
// .query(query)
// .format(ClickHouseFormat.JSONEachRow)
// .executeAndWait()) {
//
// return StreamSupport.stream(response.records().spliterator(), false)
// .map(record -> record.getValue(0).asString())
// .map(JSONObject::new)
// .collect(Collectors.toList());
// } catch (ClickHouseException e) {
// throw new RuntimeException(e);
// }
}


public static OperationMetrics optimizeTable(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("OPTIMIZE TABLE `%s`", tableName);

try {
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
return records.getMetrics();
} catch (Exception e) {
return null;
}
}

public static int countRows(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);

try {
Records records = chc.getClient().queryRecords(queryCount).get(120, TimeUnit.SECONDS);
optimizeTable(chc, tableName);
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
// Note we probrbly need asInteger() here
String value = records.iterator().next().getString(1);
return Integer.parseInt(value);
Expand All @@ -152,25 +195,21 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) {
public static int sumRows(ClickHouseHelperClient chc, String tableName, String column) {
String queryCount = String.format("SELECT SUM(`%s`) FROM `%s`", column, tableName);
try {
Records records = chc.getClient().queryRecords(queryCount).get();
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
String value = records.iterator().next().getString(1);
return (int)(Float.parseFloat(value));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static int countRowsWithEmojis(ClickHouseHelperClient chc, String tableName) {
String queryCount = "SELECT COUNT(*) FROM `" + tableName + "` WHERE str LIKE '%\uD83D\uDE00%'";
try {
Records records = chc.getClient().queryRecords(queryCount).get();
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
String value = records.iterator().next().getString(1);
return (int)(Float.parseFloat(value));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Expand All @@ -180,7 +219,7 @@ public static boolean validateRows(ClickHouseHelperClient chc, String topic, Col
try {
QuerySettings querySettings = new QuerySettings();
querySettings.setFormat(ClickHouseFormat.JSONStringsEachRow);
QueryResponse queryResponse = chc.getClient().query(String.format("SELECT * FROM `%s`", topic), querySettings).get();
QueryResponse queryResponse = chc.getClient().query(String.format("SELECT * FROM `%s`", topic), querySettings).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
Gson gson = new Gson();

List<String> records = new ArrayList<>();
Expand Down Expand Up @@ -218,11 +257,7 @@ public static boolean validateRows(ClickHouseHelperClient chc, String topic, Col
}

LOGGER.info("Match? {}", match);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (IOException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}

Expand Down
Loading