Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions presto-clp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;

import java.io.File;
import java.io.FileOutputStream;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;

import static com.facebook.presto.plugin.clp.ClpConfig.SplitFilterProviderType;
import static com.facebook.presto.plugin.clp.ClpConfig.SplitProviderType;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class ClpQueryRunner
{
Expand All @@ -43,7 +52,6 @@ public static DistributedQueryRunner createQueryRunner(
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
log.info("Creating CLP query runner with default session");
return createQueryRunner(
createDefaultSession(),
metadataDbUrl,
Expand All @@ -54,6 +62,29 @@ public static DistributedQueryRunner createQueryRunner(
externalWorkerLauncher);
}

public static DistributedQueryRunner createQueryRunner(
String metadataDbUrl,
String metadataDbUser,
String metadataDbPassword,
String metadataDbTablePrefix,
Optional<String> splitFilterProvider,
Optional<String> splitFilterConfigPath,
Optional<Integer> workerCount,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
return createQueryRunner(
createDefaultSession(),
metadataDbUrl,
metadataDbUser,
metadataDbPassword,
metadataDbTablePrefix,
splitFilterProvider,
splitFilterConfigPath,
workerCount,
externalWorkerLauncher);
}

public static DistributedQueryRunner createQueryRunner(
Session session,
String metadataDbUrl,
Expand All @@ -63,25 +94,98 @@ public static DistributedQueryRunner createQueryRunner(
Optional<Integer> workerCount,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
return createQueryRunner(
session,
metadataDbUrl,
metadataDbUser,
metadataDbPassword,
metadataDbTablePrefix,
Optional.empty(),
Optional.empty(),
workerCount,
externalWorkerLauncher);
}

public static DistributedQueryRunner createQueryRunner(
Session session,
String metadataDbUrl,
String metadataDbUser,
String metadataDbPassword,
String metadataDbTablePrefix,
Optional<String> splitFilterProvider,
Optional<String> splitFilterConfigPath,
Optional<Integer> workerCount,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
DistributedQueryRunner clpQueryRunner = DistributedQueryRunner.builder(session)
.setNodeCount(workerCount.orElse(DEFAULT_NUM_OF_WORKERS))
.setExternalWorkerLauncher(externalWorkerLauncher)
.build();
Map<String, String> clpProperties = ImmutableMap.<String, String>builder()
.setNodeCount(workerCount.orElse(DEFAULT_NUM_OF_WORKERS))
.setExternalWorkerLauncher(externalWorkerLauncher)
.build();
ImmutableMap.Builder<String, String> clpProperties = ImmutableMap.<String, String>builder()
.put("clp.metadata-provider-type", "mysql")
.put("clp.metadata-db-url", metadataDbUrl)
.put("clp.metadata-db-user", metadataDbUser)
.put("clp.metadata-db-password", metadataDbPassword)
.put("clp.metadata-table-prefix", metadataDbTablePrefix)
.put("clp.split-provider-type", "mysql")
.build();
.put("clp.split-provider-type", SplitProviderType.MYSQL.name())
.put("clp.split-filter-provider-type", splitFilterProvider.orElse(SplitFilterProviderType.MYSQL.name()));
splitFilterConfigPath.ifPresent(s -> clpProperties.put("clp.split-filter-config", s));

Map<String, String> clpPropertiesMap = clpProperties.build();
log.info("Creating query runner with clp properties:");
for (Map.Entry<String, String> entry : clpPropertiesMap.entrySet()) {
log.info("\t%s=%s", entry.getKey(), entry.getValue());
}
clpQueryRunner.installPlugin(new ClpPlugin());
clpQueryRunner.createCatalog(CLP_CATALOG, CLP_CONNECTOR, clpProperties);
clpQueryRunner.createCatalog(CLP_CATALOG, CLP_CONNECTOR, clpPropertiesMap);
return clpQueryRunner;
}

/**
* Create a config file with the given literal string of the configuration.
*
* @param configuration the given literal string of the configuration, which will be written into the config file
* @return the string of the config file path
*/
public static String createConfigFile(String configuration)
{
UUID uuid = UUID.randomUUID();
File file = new File(System.getProperty("java.io.tmpdir"), format("config-%s", uuid));
try {
boolean fileCreated = file.createNewFile();
assertTrue(fileCreated);
}
catch (Exception e) {
fail(e.getMessage());
}

if (!file.exists() || !file.canWrite()) {
fail("Cannot create split filter config file");
}
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(configuration.getBytes(UTF_8));
}
catch (Exception e) {
fail(e.getMessage());
}
return file.getAbsolutePath();
}

/**
* Delete a config file by the given path.
*
* @param configFilePath the path of the config file
*/
public static void deleteConfigFile(String configFilePath)
{
File splitFilterConfigFile = new File(configFilePath);
if (splitFilterConfigFile.exists()) {
assertTrue(splitFilterConfigFile.delete());
}
}

/**
* Creates a default mock session for query use.
*
Expand Down
Loading
Loading