Skip to content

Commit 65aefbe

Browse files
committed
feat: introduce SubstraitSqlToCalcite and SubstraitSelectStatementParser
1 parent c5c652e commit 65aefbe

File tree

7 files changed

+164
-121
lines changed

7 files changed

+164
-121
lines changed
Lines changed: 17 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,16 @@
11
package io.substrait.isthmus;
22

3-
import com.google.common.annotations.VisibleForTesting;
43
import io.substrait.extension.ExtensionCollector;
5-
import io.substrait.isthmus.sql.SubstraitSqlValidator;
4+
import io.substrait.isthmus.sql.SubstraitSqlToCalcite;
65
import io.substrait.proto.Plan;
76
import io.substrait.proto.PlanRel;
87
import io.substrait.relation.RelProtoConverter;
98
import java.util.List;
10-
import org.apache.calcite.plan.hep.HepPlanner;
11-
import org.apache.calcite.plan.hep.HepProgram;
129
import org.apache.calcite.prepare.CalciteCatalogReader;
1310
import org.apache.calcite.prepare.Prepare;
1411
import org.apache.calcite.rel.RelRoot;
1512
import org.apache.calcite.schema.Schema;
16-
import org.apache.calcite.sql.SqlNode;
1713
import org.apache.calcite.sql.parser.SqlParseException;
18-
import org.apache.calcite.sql.parser.SqlParser;
19-
import org.apache.calcite.sql.validate.SqlValidator;
20-
import org.apache.calcite.sql2rel.SqlToRelConverter;
21-
import org.apache.calcite.sql2rel.StandardConvertletTable;
2214

2315
/** Take a SQL statement and a set of table definitions and return a substrait plan. */
2416
public class SqlToSubstrait extends SqlConverterBase {
@@ -33,84 +25,36 @@ public SqlToSubstrait(FeatureBoard features) {
3325

3426
public Plan execute(String sql, List<String> tables) throws SqlParseException {
3527
CalciteCatalogReader catalogReader = registerCreateTables(tables);
36-
SqlValidator validator = new SubstraitSqlValidator(catalogReader);
37-
return executeInner(sql, validator, catalogReader);
28+
return executeInner(sql, catalogReader);
3829
}
3930

4031
public Plan execute(String sql, String name, Schema schema) throws SqlParseException {
4132
CalciteCatalogReader catalogReader = registerSchema(name, schema);
42-
SqlValidator validator = new SubstraitSqlValidator(catalogReader);
43-
return executeInner(sql, validator, catalogReader);
33+
return executeInner(sql, catalogReader);
4434
}
4535

4636
public Plan execute(String sql, Prepare.CatalogReader catalogReader) throws SqlParseException {
47-
SqlValidator validator = new SubstraitSqlValidator(catalogReader);
48-
return executeInner(sql, validator, catalogReader);
37+
return executeInner(sql, catalogReader);
4938
}
5039

51-
// Package protected for testing
52-
List<RelRoot> sqlToRelNode(String sql, Prepare.CatalogReader catalogReader)
40+
private Plan executeInner(String sql, Prepare.CatalogReader catalogReader)
5341
throws SqlParseException {
54-
SqlValidator validator = new SubstraitSqlValidator(catalogReader);
55-
return sqlToRelNode(sql, validator, catalogReader);
56-
}
57-
58-
private Plan executeInner(String sql, SqlValidator validator, Prepare.CatalogReader catalogReader)
59-
throws SqlParseException {
60-
var plan = Plan.newBuilder();
6142
ExtensionCollector functionCollector = new ExtensionCollector();
6243
var relProtoConverter = new RelProtoConverter(functionCollector);
44+
45+
List<RelRoot> relRoots = SubstraitSqlToCalcite.convertSelects(sql, catalogReader);
46+
6347
// TODO: consider case in which one sql passes conversion while others don't
64-
sqlToRelNode(sql, validator, catalogReader)
65-
.forEach(
66-
root -> {
67-
plan.addRelations(
68-
PlanRel.newBuilder()
69-
.setRoot(
70-
relProtoConverter.toProto(
71-
SubstraitRelVisitor.convert(
72-
root, EXTENSION_COLLECTION, featureBoard))));
73-
});
48+
Plan.Builder plan = Plan.newBuilder();
49+
relRoots.forEach(
50+
root -> {
51+
plan.addRelations(
52+
PlanRel.newBuilder()
53+
.setRoot(
54+
relProtoConverter.toProto(
55+
SubstraitRelVisitor.convert(root, EXTENSION_COLLECTION, featureBoard))));
56+
});
7457
functionCollector.addExtensionsToPlan(plan);
7558
return plan.build();
7659
}
77-
78-
private List<RelRoot> sqlToRelNode(
79-
String sql, SqlValidator validator, Prepare.CatalogReader catalogReader)
80-
throws SqlParseException {
81-
SqlParser parser = SqlParser.create(sql, parserConfig);
82-
var parsedList = parser.parseStmtList();
83-
SqlToRelConverter converter = createSqlToRelConverter(validator, catalogReader);
84-
List<RelRoot> roots =
85-
parsedList.stream()
86-
.map(parsed -> getBestExpRelRoot(converter, parsed))
87-
.collect(java.util.stream.Collectors.toList());
88-
return roots;
89-
}
90-
91-
@VisibleForTesting
92-
SqlToRelConverter createSqlToRelConverter(
93-
SqlValidator validator, Prepare.CatalogReader catalogReader) {
94-
SqlToRelConverter converter =
95-
new SqlToRelConverter(
96-
null,
97-
validator,
98-
catalogReader,
99-
relOptCluster,
100-
StandardConvertletTable.INSTANCE,
101-
converterConfig);
102-
return converter;
103-
}
104-
105-
@VisibleForTesting
106-
static RelRoot getBestExpRelRoot(SqlToRelConverter converter, SqlNode parsed) {
107-
RelRoot root = converter.convertQuery(parsed, true, true);
108-
{
109-
var program = HepProgram.builder().build();
110-
HepPlanner hepPlanner = new HepPlanner(program);
111-
hepPlanner.setRoot(root.rel);
112-
root = root.withRel(hepPlanner.findBestExp());
113-
}
114-
return root;
115-
}
11660
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.substrait.isthmus.sql;
2+
3+
import java.util.List;
4+
import org.apache.calcite.avatica.util.Casing;
5+
import org.apache.calcite.sql.SqlNode;
6+
import org.apache.calcite.sql.parser.SqlParseException;
7+
import org.apache.calcite.sql.parser.SqlParser;
8+
import org.apache.calcite.sql.validate.SqlConformanceEnum;
9+
10+
/** Utility class for parsing SELECT statements to {@link org.apache.calcite.rel.RelRoot}s */
11+
public class SubstraitSelectStatementParser {
12+
13+
private static final SqlParser.Config PARSER_CONFIG =
14+
SqlParser.config()
15+
// TODO: switch to Casing.UNCHANGED
16+
.withUnquotedCasing(Casing.TO_UPPER)
17+
// use LENIENT conformance to allow for parsing a wide variety of dialects
18+
.withConformance(SqlConformanceEnum.LENIENT);
19+
20+
/** Parse one or more SELECT statements */
21+
public static List<SqlNode> parseSelectStatements(String selectStatements)
22+
throws SqlParseException {
23+
SqlParser parser = SqlParser.create(selectStatements, PARSER_CONFIG);
24+
return parser.parseStmtList();
25+
}
26+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.substrait.isthmus.sql;
2+
3+
import io.substrait.isthmus.SubstraitTypeSystem;
4+
import java.util.List;
5+
import java.util.stream.Collectors;
6+
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
7+
import org.apache.calcite.plan.RelOptCluster;
8+
import org.apache.calcite.plan.RelOptPlanner;
9+
import org.apache.calcite.plan.RelOptTable;
10+
import org.apache.calcite.plan.hep.HepPlanner;
11+
import org.apache.calcite.plan.hep.HepProgram;
12+
import org.apache.calcite.prepare.Prepare;
13+
import org.apache.calcite.rel.RelRoot;
14+
import org.apache.calcite.rex.RexBuilder;
15+
import org.apache.calcite.sql.SqlNode;
16+
import org.apache.calcite.sql.parser.SqlParseException;
17+
import org.apache.calcite.sql2rel.SqlToRelConverter;
18+
import org.apache.calcite.sql2rel.StandardConvertletTable;
19+
20+
public class SubstraitSqlToCalcite {
21+
22+
public static RelRoot convertSelect(String selectStatement, Prepare.CatalogReader catalogReader)
23+
throws SqlParseException {
24+
return convertSelect(selectStatement, catalogReader, createRelOptCluster());
25+
}
26+
27+
public static RelRoot convertSelect(
28+
String selectStatement, Prepare.CatalogReader catalogReader, RelOptCluster cluster)
29+
throws SqlParseException {
30+
List<SqlNode> sqlNodes = SubstraitSelectStatementParser.parseSelectStatements(selectStatement);
31+
if (sqlNodes.size() != 1) {
32+
throw new IllegalArgumentException(
33+
String.format("Expected one SELECT statement, found: %d", sqlNodes.size()));
34+
}
35+
List<RelRoot> relRoots = convert(sqlNodes, catalogReader, cluster);
36+
// as there was only 1 select statement, there should only be 1 root
37+
return relRoots.get(0);
38+
}
39+
40+
public static List<RelRoot> convertSelects(
41+
String selectStatements, Prepare.CatalogReader catalogReader) throws SqlParseException {
42+
return convertSelects(selectStatements, catalogReader, createRelOptCluster());
43+
}
44+
45+
public static List<RelRoot> convertSelects(
46+
String selectStatements, Prepare.CatalogReader catalogReader, RelOptCluster cluster)
47+
throws SqlParseException {
48+
List<SqlNode> sqlNodes = SubstraitSelectStatementParser.parseSelectStatements(selectStatements);
49+
return convert(sqlNodes, catalogReader, cluster);
50+
}
51+
52+
static List<RelRoot> convert(
53+
List<SqlNode> selectStatements, Prepare.CatalogReader catalogReader, RelOptCluster cluster) {
54+
RelOptTable.ViewExpander viewExpander = null;
55+
SqlToRelConverter converter =
56+
new SqlToRelConverter(
57+
viewExpander,
58+
new SubstraitSqlValidator(catalogReader),
59+
catalogReader,
60+
cluster,
61+
StandardConvertletTable.INSTANCE,
62+
SqlToRelConverter.CONFIG);
63+
// apply validation
64+
boolean needsValidation = true;
65+
// query is the root of the tree
66+
boolean top = true;
67+
return selectStatements.stream()
68+
.map(sqlNode -> converter.convertQuery(sqlNode, needsValidation, top))
69+
.collect(Collectors.toList());
70+
}
71+
72+
static RelOptCluster createRelOptCluster() {
73+
RexBuilder rexBuilder =
74+
new RexBuilder(new JavaTypeFactoryImpl(SubstraitTypeSystem.TYPE_SYSTEM));
75+
HepProgram program = HepProgram.builder().build();
76+
RelOptPlanner emptyPlanner = new HepPlanner(program);
77+
return RelOptCluster.create(emptyPlanner, rexBuilder);
78+
}
79+
}

isthmus/src/test/java/io/substrait/isthmus/ApplyJoinPlanTest.java

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,35 @@
11
package io.substrait.isthmus;
22

3-
import io.substrait.isthmus.sql.SubstraitSqlValidator;
3+
import io.substrait.isthmus.sql.SubstraitSqlToCalcite;
4+
import java.util.List;
45
import java.util.Map;
56
import org.apache.calcite.adapter.tpcds.TpcdsSchema;
7+
import org.apache.calcite.config.CalciteConnectionConfig;
8+
import org.apache.calcite.config.CalciteConnectionProperty;
9+
import org.apache.calcite.jdbc.CalciteSchema;
10+
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
611
import org.apache.calcite.prepare.CalciteCatalogReader;
712
import org.apache.calcite.rel.RelRoot;
813
import org.apache.calcite.rex.RexFieldAccess;
914
import org.apache.calcite.sql.parser.SqlParseException;
10-
import org.apache.calcite.sql.parser.SqlParser;
11-
import org.apache.calcite.sql2rel.SqlToRelConverter;
1215
import org.junit.jupiter.api.Assertions;
1316
import org.junit.jupiter.api.Test;
1417

15-
public class ApplyJoinPlanTest {
16-
17-
private static RelRoot getCalcitePlan(SqlToSubstrait s, TpcdsSchema schema, String sql)
18-
throws SqlParseException {
19-
CalciteCatalogReader catalogReader = s.registerSchema("tpcds", schema);
20-
SubstraitSqlValidator validator = new SubstraitSqlValidator(catalogReader);
21-
SqlToRelConverter converter = s.createSqlToRelConverter(validator, catalogReader);
22-
SqlParser parser = SqlParser.create(sql, s.parserConfig);
23-
return s.getBestExpRelRoot(converter, parser.parseQuery());
18+
public class ApplyJoinPlanTest extends PlanTestBase {
19+
static CalciteCatalogReader TPCDS_CATALOG;
20+
21+
static {
22+
TpcdsSchema tpcdsSchema = new TpcdsSchema(1.0);
23+
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false);
24+
rootSchema.add("tpcds", tpcdsSchema);
25+
26+
TPCDS_CATALOG =
27+
new CalciteCatalogReader(
28+
rootSchema,
29+
List.of("tpcds"),
30+
new JavaTypeFactoryImpl(SubstraitTypeSystem.TYPE_SYSTEM),
31+
CalciteConnectionConfig.DEFAULT.set(
32+
CalciteConnectionProperty.CASE_SENSITIVE, Boolean.FALSE.toString()));
2433
}
2534

2635
private static void validateOuterRef(
@@ -62,7 +71,7 @@ public void lateralJoinQuery() throws SqlParseException {
6271
*/
6372

6473
// validate outer reference map
65-
RelRoot root = getCalcitePlan(new SqlToSubstrait(), schema, sql);
74+
RelRoot root = SubstraitSqlToCalcite.convertSelect(sql, TPCDS_CATALOG);
6675
Map<RexFieldAccess, Integer> fieldAccessDepthMap = buildOuterFieldRefMap(root);
6776
Assertions.assertEquals(1, fieldAccessDepthMap.size());
6877
validateOuterRef(fieldAccessDepthMap, "$cor0", "SS_ITEM_SK", 1);
@@ -77,26 +86,23 @@ public void lateralJoinQuery() throws SqlParseException {
7786

7887
@Test
7988
public void outerApplyQuery() throws SqlParseException {
80-
TpcdsSchema schema = new TpcdsSchema(1.0);
8189
String sql;
8290
sql =
8391
"""
8492
SELECT ss_sold_date_sk, ss_item_sk, ss_customer_sk
8593
FROM store_sales OUTER APPLY
8694
(select i_item_sk from item where item.i_item_sk = store_sales.ss_item_sk)""";
87-
88-
FeatureBoard featureBoard = ImmutableFeatureBoard.builder().build();
89-
SqlToSubstrait s = new SqlToSubstrait(featureBoard);
90-
RelRoot root = getCalcitePlan(s, schema, sql);
95+
RelRoot root = SubstraitSqlToCalcite.convertSelect(sql, TPCDS_CATALOG);
9196

9297
Map<RexFieldAccess, Integer> fieldAccessDepthMap = buildOuterFieldRefMap(root);
9398
Assertions.assertEquals(1, fieldAccessDepthMap.size());
9499
validateOuterRef(fieldAccessDepthMap, "$cor0", "SS_ITEM_SK", 1);
95100

96101
// TODO validate end to end conversion
102+
SqlToSubstrait s = new SqlToSubstrait();
97103
Assertions.assertThrows(
98104
UnsupportedOperationException.class,
99-
() -> s.execute(sql, "tpcds", schema),
105+
() -> s.execute(sql, TPCDS_CATALOG),
100106
"APPLY is not supported");
101107
}
102108

@@ -127,9 +133,7 @@ public void nestedApplyJoinQuery() throws SqlParseException {
127133
LogicalFilter(condition=[AND(=($4, $cor0.I_ITEM_SK), =($4, $cor2.SS_ITEM_SK))])
128134
LogicalTableScan(table=[[tpcds, PROMOTION]])
129135
*/
130-
FeatureBoard featureBoard = ImmutableFeatureBoard.builder().build();
131-
SqlToSubstrait s = new SqlToSubstrait(featureBoard);
132-
RelRoot root = getCalcitePlan(s, schema, sql);
136+
RelRoot root = SubstraitSqlToCalcite.convertSelect(sql, TPCDS_CATALOG);
133137

134138
Map<RexFieldAccess, Integer> fieldAccessDepthMap = buildOuterFieldRefMap(root);
135139
Assertions.assertEquals(3, fieldAccessDepthMap.size());
@@ -138,29 +142,28 @@ public void nestedApplyJoinQuery() throws SqlParseException {
138142
validateOuterRef(fieldAccessDepthMap, "$cor0", "I_ITEM_SK", 1);
139143

140144
// TODO validate end to end conversion
145+
SqlToSubstrait s = new SqlToSubstrait();
141146
Assertions.assertThrows(
142147
UnsupportedOperationException.class,
143-
() -> s.execute(sql, "tpcds", schema),
148+
() -> s.execute(sql, TPCDS_CATALOG),
144149
"APPLY is not supported");
145150
}
146151

147152
@Test
148-
public void crossApplyQuery() throws SqlParseException {
149-
TpcdsSchema schema = new TpcdsSchema(1.0);
153+
public void crossApplyQuery() {
150154
String sql;
151155
sql =
152156
"""
153157
SELECT ss_sold_date_sk, ss_item_sk, ss_customer_sk
154158
FROM store_sales CROSS APPLY
155159
(select i_item_sk from item where item.i_item_sk = store_sales.ss_item_sk)""";
156160

157-
FeatureBoard featureBoard = ImmutableFeatureBoard.builder().build();
158-
SqlToSubstrait s = new SqlToSubstrait(featureBoard);
161+
SqlToSubstrait s = new SqlToSubstrait();
159162

160163
// TODO validate end to end conversion
161164
Assertions.assertThrows(
162165
UnsupportedOperationException.class,
163-
() -> s.execute(sql, "tpcds", schema),
166+
() -> s.execute(sql, TPCDS_CATALOG),
164167
"APPLY is not supported");
165168
}
166169
}

isthmus/src/test/java/io/substrait/isthmus/NameRoundtripTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.junit.jupiter.api.Assertions.assertEquals;
55

66
import io.substrait.isthmus.sql.SubstraitCreateStatementParser;
7+
import io.substrait.isthmus.sql.SubstraitSqlToCalcite;
78
import io.substrait.plan.Plan;
89
import io.substrait.relation.NamedScan;
910
import java.util.List;
@@ -26,10 +27,8 @@ void preserveNamesFromSql() throws Exception {
2627
""";
2728
List<String> expectedNames = List.of("a", "B");
2829

29-
List<org.apache.calcite.rel.RelRoot> calciteRelRoots = s.sqlToRelNode(query, catalogReader);
30-
assertEquals(1, calciteRelRoots.size());
31-
32-
org.apache.calcite.rel.RelRoot calciteRelRoot1 = calciteRelRoots.get(0);
30+
org.apache.calcite.rel.RelRoot calciteRelRoot1 =
31+
SubstraitSqlToCalcite.convertSelect(query, catalogReader);
3332
assertEquals(expectedNames, calciteRelRoot1.validatedRowType.getFieldNames());
3433

3534
io.substrait.plan.Plan.Root substraitRelRoot =

0 commit comments

Comments
 (0)