Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions presto-clp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;

import java.io.File;
import java.io.FileOutputStream;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;

import static com.facebook.presto.plugin.clp.ClpConfig.SplitFilterProviderType;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class ClpQueryRunner
{
Expand All @@ -43,7 +50,6 @@ public static DistributedQueryRunner createQueryRunner(
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
log.info("Creating CLP query runner with default session");
return createQueryRunner(
createDefaultSession(),
metadataDbUrl,
Expand All @@ -54,6 +60,29 @@ public static DistributedQueryRunner createQueryRunner(
externalWorkerLauncher);
}

public static DistributedQueryRunner createQueryRunner(
String metadataDbUrl,
String metadataDbUser,
String metadataDbPassword,
String metadataDbTablePrefix,
Optional<String> splitFilterProvider,
Optional<String> splitFilterConfigPath,
Optional<Integer> workerCount,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
return createQueryRunner(
createDefaultSession(),
metadataDbUrl,
metadataDbUser,
metadataDbPassword,
metadataDbTablePrefix,
splitFilterProvider,
splitFilterConfigPath,
workerCount,
externalWorkerLauncher);
}

public static DistributedQueryRunner createQueryRunner(
Session session,
String metadataDbUrl,
Expand All @@ -63,25 +92,97 @@ public static DistributedQueryRunner createQueryRunner(
Optional<Integer> workerCount,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
return createQueryRunner(
session,
metadataDbUrl,
metadataDbUser,
metadataDbPassword,
metadataDbTablePrefix,
Optional.empty(),
Optional.empty(),
workerCount,
externalWorkerLauncher);
}

public static DistributedQueryRunner createQueryRunner(
Session session,
String metadataDbUrl,
String metadataDbUser,
String metadataDbPassword,
String metadataDbTablePrefix,
Optional<String> splitFilterProvider,
Optional<String> splitFilterConfigPath,
Optional<Integer> workerCount,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
throws Exception
{
DistributedQueryRunner clpQueryRunner = DistributedQueryRunner.builder(session)
.setNodeCount(workerCount.orElse(DEFAULT_NUM_OF_WORKERS))
.setExternalWorkerLauncher(externalWorkerLauncher)
.build();
Map<String, String> clpProperties = ImmutableMap.<String, String>builder()
.setNodeCount(workerCount.orElse(DEFAULT_NUM_OF_WORKERS))
.setExternalWorkerLauncher(externalWorkerLauncher)
.build();
ImmutableMap.Builder<String, String> clpProperties = ImmutableMap.<String, String>builder()
.put("clp.metadata-provider-type", "mysql")
.put("clp.metadata-db-url", metadataDbUrl)
.put("clp.metadata-db-user", metadataDbUser)
.put("clp.metadata-db-password", metadataDbPassword)
.put("clp.metadata-table-prefix", metadataDbTablePrefix)
.put("clp.split-provider-type", "mysql")
.build();
.put("clp.split-provider-type", splitFilterProvider.orElse(SplitFilterProviderType.MYSQL.name()));
splitFilterConfigPath.ifPresent(s -> clpProperties.put("clp.split-filter-config", s));

Map<String, String> clpPropertiesMap = clpProperties.build();
log.info("Creating query runner with clp properties:");
for (Map.Entry<String, String> entry : clpPropertiesMap.entrySet()) {
log.info("\t%s=%s", entry.getKey(), entry.getValue());
}
clpQueryRunner.installPlugin(new ClpPlugin());
clpQueryRunner.createCatalog(CLP_CATALOG, CLP_CONNECTOR, clpProperties);
clpQueryRunner.createCatalog(CLP_CATALOG, CLP_CONNECTOR, clpPropertiesMap);
return clpQueryRunner;
}

/**
* Create a config file with the given literal string of the configuration.
*
* @param configuration the given literal string of the configuration, which will be written into the config file
* @return the string of the config file path
*/
public static String createConfigFile(String configuration)
{
UUID uuid = UUID.randomUUID();
File file = new File(format("/tmp/config-%s", uuid));
try {
boolean fileCreated = file.createNewFile();
assertTrue(fileCreated);
}
catch (Exception e) {
fail(e.getMessage());
}

if (!file.exists() || !file.canWrite()) {
fail("Cannot create split filter config file");
}
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(configuration.getBytes());
}
catch (Exception e) {
fail(e.getMessage());
}
return file.getAbsolutePath();
}

/**
* Delete a config file by the given path.
*
* @param configFilePath the path of the config file
*/
public static void deleteConfigFile(String configFilePath)
{
File splitFilterConfigFile = new File(configFilePath);
if (splitFilterConfigFile.exists()) {
assertTrue(splitFilterConfigFile.delete());
}
}

/**
* Creates a default mock session for query use.
*
Expand Down
Loading
Loading