Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,12 @@ public static boolean clpWildcardBoolColumn()
{
throw new UnsupportedOperationException("CLP_WILDCARD_BOOL_COLUMN is a placeholder function without implementation.");
}

@ScalarFunction(value = "CLP_GET_JSON_STRING", deterministic = false)
@Description("Converts an entire log record into a JSON string.")
@SqlType(StandardTypes.VARCHAR)
public static Slice clpGetJSONString()
{
throw new UnsupportedOperationException("CLP_GET_JSON_STRING is a placeholder function without implementation.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
public final class ClpUdfRewriter
implements ConnectorPlanOptimizer
{
public static final String JSON_STRING_PLACEHOLDER = "__json_string";
private final FunctionMetadataManager functionManager;

public ClpUdfRewriter(FunctionMetadataManager functionManager)
Expand Down Expand Up @@ -123,7 +124,7 @@ public PlanNode visitProject(ProjectNode node, RewriteContext<Void> context)
for (Map.Entry<VariableReferenceExpression, RowExpression> entry : node.getAssignments().getMap().entrySet()) {
newAssignments.put(
entry.getKey(),
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator));
rewriteClpUdfs(entry.getValue(), functionManager, variableAllocator, true));
}

PlanNode newSource = rewritePlanSubtree(node.getSource());
Expand All @@ -148,20 +149,32 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
* @param expression the input expression to analyze and possibly rewrite
* @param functionManager function manager used to resolve function metadata
* @param variableAllocator variable allocator used to create new variable references
* @param inProjectNode whether the CLP UDFs are in a {@link ProjectNode}
* @return a possibly rewritten {@link RowExpression} with <code>CLP_GET_*</code> calls
* replaced
*/
private RowExpression rewriteClpUdfs(
RowExpression expression,
FunctionMetadataManager functionManager,
VariableAllocator variableAllocator)
VariableAllocator variableAllocator,
boolean inProjectNode)
{
// Handle CLP_GET_* function calls
if (expression instanceof CallExpression) {
CallExpression call = (CallExpression) expression;
String functionName = functionManager.getFunctionMetadata(call.getFunctionHandle()).getName().getObjectName().toUpperCase();

if (functionName.startsWith("CLP_GET_")) {
if (inProjectNode && functionName.equals("CLP_GET_JSON_STRING")) {
VariableReferenceExpression newValue = variableAllocator.newVariable(
expression.getSourceLocation(),
JSON_STRING_PLACEHOLDER,
call.getType());
ClpColumnHandle targetHandle = new ClpColumnHandle(JSON_STRING_PLACEHOLDER, call.getType());

globalColumnVarMap.put(targetHandle, newValue);
return newValue;
}
else if (functionName.startsWith("CLP_GET_")) {
if (call.getArguments().size() != 1 || !(call.getArguments().get(0) instanceof ConstantExpression)) {
throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION,
"CLP_GET_* UDF must have a single constant string argument");
Expand All @@ -187,7 +200,7 @@ private RowExpression rewriteClpUdfs(

// Recurse into arguments
List<RowExpression> rewrittenArgs = call.getArguments().stream()
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
.collect(toImmutableList());

return new CallExpression(call.getDisplayName(), call.getFunctionHandle(), call.getType(), rewrittenArgs);
Expand All @@ -198,7 +211,7 @@ private RowExpression rewriteClpUdfs(
SpecialFormExpression special = (SpecialFormExpression) expression;

List<RowExpression> rewrittenArgs = special.getArguments().stream()
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator))
.map(arg -> rewriteClpUdfs(arg, functionManager, variableAllocator, inProjectNode))
.collect(toImmutableList());

return new SpecialFormExpression(special.getSourceLocation(), special.getForm(), special.getType(), rewrittenArgs);
Expand Down Expand Up @@ -298,7 +311,7 @@ private TableScanNode buildNewTableScanNode(TableScanNode node)
*/
private FilterNode buildNewFilterNode(FilterNode node)
{
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator);
RowExpression newPredicate = rewriteClpUdfs(node.getPredicate(), functionManager, variableAllocator, false);
PlanNode newSource = rewritePlanSubtree(node.getSource());
return new FilterNode(node.getSourceLocation(), idAllocator.getNextId(), newSource, newPredicate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float;
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
import static com.facebook.presto.plugin.clp.optimization.ClpUdfRewriter.JSON_STRING_PLACEHOLDER;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
Expand Down Expand Up @@ -140,7 +141,7 @@ public void tearDown()
}

@Test
public void testScanFilter()
public void testClpGetScanFilter()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -179,7 +180,7 @@ public void testScanFilter()
}

@Test
public void testScanProject()
public void testClpGetScanProject()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -229,7 +230,7 @@ public void testScanProject()
}

@Test
public void testScanProjectFilter()
public void testClpGetScanProjectFilter()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();
Expand Down Expand Up @@ -265,6 +266,38 @@ public void testScanProjectFilter()
city))))));
}

@Test
public void testClpGetJsonString()
{
TransactionId transactionId = localQueryRunner.getTransactionManager().beginTransaction(false);
Session session = testSessionBuilder().setCatalog("clp").setSchema("default").setTransactionId(transactionId).build();

Plan plan = localQueryRunner.createPlan(
session,
"SELECT CLP_GET_JSON_STRING() from test WHERE CLP_GET_BIGINT('user_id') = 0",
WarningCollector.NOOP);
ClpUdfRewriter udfRewriter = new ClpUdfRewriter(functionAndTypeManager);
PlanNode optimizedPlan = udfRewriter.optimize(plan.getRoot(), session.toConnectorSession(), variableAllocator, planNodeIdAllocator);
ClpComputePushDown optimizer = new ClpComputePushDown(functionAndTypeManager, functionResolution, splitFilterProvider);
optimizedPlan = optimizer.optimize(optimizedPlan, session.toConnectorSession(), variableAllocator, planNodeIdAllocator);

PlanAssert.assertPlan(
session,
localQueryRunner.getMetadata(),
(node, sourceStats, lookup, s, types) -> PlanNodeStatsEstimate.unknown(),
new Plan(optimizedPlan, plan.getTypes(), StatsAndCosts.empty()),
anyTree(
project(
ImmutableMap.of(
"clp_get_json_string",
PlanMatchPattern.expression(JSON_STRING_PLACEHOLDER)),
ClpTableScanMatcher.clpTableScanPattern(
new ClpTableLayoutHandle(table, Optional.of("user_id: 0"), Optional.empty()),
ImmutableSet.of(
new ClpColumnHandle("user_id", BIGINT),
new ClpColumnHandle(JSON_STRING_PLACEHOLDER, VARCHAR))))));
}

private static final class ClpTableScanMatcher
implements Matcher
{
Expand Down
Loading