Skip to content

Commit 5d3abd8

Browse files
authored
feat: introduce SchemaCollector (#391)
SchemaCollector can be used to produce a CalciteSchema from a Substrait Rel BREAKING CHANGE: LookupCalciteSchema has been removed
1 parent f749cf8 commit 5d3abd8

File tree

9 files changed

+303
-117
lines changed

9 files changed

+303
-117
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package io.substrait.isthmus;
2+
3+
import io.substrait.isthmus.calcite.SubstraitSchema;
4+
import io.substrait.isthmus.calcite.SubstraitTable;
5+
import io.substrait.relation.NamedScan;
6+
import io.substrait.relation.Rel;
7+
import io.substrait.relation.RelCopyOnWriteVisitor;
8+
import io.substrait.type.NamedStruct;
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Optional;
13+
import org.apache.calcite.jdbc.CalciteSchema;
14+
import org.apache.calcite.rel.type.RelDataType;
15+
import org.apache.calcite.rel.type.RelDataTypeFactory;
16+
17+
/** For use in generating {@link CalciteSchema}s from Substrait {@link Rel}s */
18+
public class SchemaCollector {
19+
20+
private static final boolean CASE_SENSITIVE = false;
21+
22+
private final RelDataTypeFactory typeFactory;
23+
private final TypeConverter typeConverter;
24+
25+
public SchemaCollector(RelDataTypeFactory typeFactory, TypeConverter typeConverter) {
26+
this.typeFactory = typeFactory;
27+
this.typeConverter = typeConverter;
28+
}
29+
30+
public CalciteSchema toSchema(Rel rel) {
31+
// Create the root schema under which all tables and schemas will be nested.
32+
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
33+
34+
for (Map.Entry<List<String>, NamedStruct> entry : TableGatherer.gatherTables(rel).entrySet()) {
35+
List<String> names = entry.getKey();
36+
NamedStruct namedStruct = entry.getValue();
37+
38+
// The last name in names is the table name. All others are schema names.
39+
String tableName = names.get(names.size() - 1);
40+
41+
// Traverse all schemas, creating them if they are not present
42+
CalciteSchema schema = rootSchema;
43+
for (String schemaName : names.subList(0, names.size() - 1)) {
44+
CalciteSchema subSchema = schema.getSubSchema(schemaName, CASE_SENSITIVE);
45+
if (subSchema != null) {
46+
schema = subSchema;
47+
} else {
48+
SubstraitSchema newSubSchema = new SubstraitSchema();
49+
schema = schema.add(schemaName, newSubSchema);
50+
}
51+
}
52+
53+
// Create the table if it is not present
54+
CalciteSchema.TableEntry table = schema.getTable(tableName, CASE_SENSITIVE);
55+
if (table == null) {
56+
RelDataType rowType =
57+
typeConverter.toCalcite(typeFactory, namedStruct.struct(), namedStruct.names());
58+
schema.add(tableName, new SubstraitTable(tableName, rowType));
59+
}
60+
}
61+
62+
return rootSchema;
63+
}
64+
65+
static class TableGatherer extends RelCopyOnWriteVisitor<RuntimeException> {
66+
Map<List<String>, NamedStruct> tableMap;
67+
68+
private TableGatherer() {
69+
super();
70+
this.tableMap = new HashMap<>();
71+
}
72+
73+
/**
74+
* Gathers all tables defined in {@link NamedScan}s under the given {@link Rel}
75+
*
76+
* @param rootRel under which to search for {@link NamedScan}s
77+
* @return a map of qualified table names to their associated Substrait schemas
78+
*/
79+
public static Map<List<String>, NamedStruct> gatherTables(Rel rootRel) {
80+
var visitor = new TableGatherer();
81+
rootRel.accept(visitor);
82+
return visitor.tableMap;
83+
}
84+
85+
@Override
86+
public Optional<Rel> visit(NamedScan namedScan) {
87+
super.visit(namedScan);
88+
89+
List<String> tableName = namedScan.getNames();
90+
if (tableMap.containsKey(tableName)) {
91+
NamedStruct existingSchema = tableMap.get(tableName);
92+
if (!existingSchema.equals(namedScan.getInitialSchema())) {
93+
throw new IllegalArgumentException(
94+
String.format(
95+
"NamedScan for %s is present multiple times with different schemas", tableName));
96+
}
97+
}
98+
tableMap.put(tableName, namedScan.getInitialSchema());
99+
100+
return Optional.empty();
101+
}
102+
}
103+
}

isthmus/src/main/java/io/substrait/isthmus/SqlConverterBase.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.substrait.extension.SimpleExtension;
44
import io.substrait.isthmus.calcite.SubstraitOperatorTable;
5+
import io.substrait.isthmus.calcite.SubstraitTable;
56
import java.util.ArrayList;
67
import java.util.List;
78
import org.apache.calcite.config.CalciteConnectionConfig;
@@ -20,7 +21,6 @@
2021
import org.apache.calcite.rel.type.RelDataTypeFactory;
2122
import org.apache.calcite.rex.RexBuilder;
2223
import org.apache.calcite.schema.Schema;
23-
import org.apache.calcite.schema.impl.AbstractTable;
2424
import org.apache.calcite.sql.SqlNode;
2525
import org.apache.calcite.sql.SqlNodeList;
2626
import org.apache.calcite.sql.SqlOperatorTable;
@@ -79,8 +79,8 @@ CalciteCatalogReader registerCreateTables(List<String> tables) throws SqlParseEx
7979
SqlValidator validator = Validator.create(factory, catalogReader, SqlValidator.Config.DEFAULT);
8080
if (tables != null) {
8181
for (String tableDef : tables) {
82-
List<DefinedTable> tList = parseCreateTable(factory, validator, tableDef);
83-
for (DefinedTable t : tList) {
82+
List<SubstraitTable> tList = parseCreateTable(factory, validator, tableDef);
83+
for (SubstraitTable t : tList) {
8484
rootSchema.add(t.getName(), t);
8585
}
8686
}
@@ -97,10 +97,10 @@ CalciteCatalogReader registerSchema(String name, Schema schema) {
9797
return new CalciteCatalogReader(rootSchema, List.of(), factory, config);
9898
}
9999

100-
protected List<DefinedTable> parseCreateTable(
100+
protected List<SubstraitTable> parseCreateTable(
101101
RelDataTypeFactory factory, SqlValidator validator, String sql) throws SqlParseException {
102102
SqlParser parser = SqlParser.create(sql, parserConfig);
103-
List<DefinedTable> definedTableList = new ArrayList<>();
103+
List<SubstraitTable> tableList = new ArrayList<>();
104104

105105
SqlNodeList nodeList = parser.parseStmtList();
106106
for (SqlNode parsed : nodeList) {
@@ -140,12 +140,12 @@ protected List<DefinedTable> parseCreateTable(
140140
columnTypes.add(col.dataType.deriveType(validator));
141141
}
142142

143-
definedTableList.add(
144-
new DefinedTable(
145-
create.name.names.get(0), factory, factory.createStructType(columnTypes, names)));
143+
tableList.add(
144+
new SubstraitTable(
145+
create.name.names.get(0), factory.createStructType(columnTypes, names)));
146146
}
147147

148-
return definedTableList;
148+
return tableList;
149149
}
150150

151151
protected static SqlParseException fail(String text, SqlParserPos pos) {
@@ -173,30 +173,4 @@ public static Validator create(
173173
return new Validator(SubstraitOperatorTable.INSTANCE, validatorCatalog, factory, config);
174174
}
175175
}
176-
177-
/** A fully defined pre-specified table. */
178-
protected static final class DefinedTable extends AbstractTable {
179-
180-
private final String name;
181-
private final RelDataTypeFactory factory;
182-
private final RelDataType type;
183-
184-
public DefinedTable(String name, RelDataTypeFactory factory, RelDataType type) {
185-
this.name = name;
186-
this.factory = factory;
187-
this.type = type;
188-
}
189-
190-
@Override
191-
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
192-
// if (factory != typeFactory) {
193-
// throw new IllegalStateException("Different type factory than previously used.");
194-
// }
195-
return type;
196-
}
197-
198-
public String getName() {
199-
return name;
200-
}
201-
}
202176
}

isthmus/src/main/java/io/substrait/isthmus/SqlExpressionToSubstrait.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.substrait.extendedexpression.ImmutableExpressionReference;
66
import io.substrait.extendedexpression.ImmutableExtendedExpression;
77
import io.substrait.extension.SimpleExtension;
8+
import io.substrait.isthmus.calcite.SubstraitTable;
89
import io.substrait.isthmus.expression.RexExpressionConverter;
910
import io.substrait.isthmus.expression.ScalarFunctionConverter;
1011
import io.substrait.proto.ExtendedExpression;
@@ -145,8 +146,8 @@ private Result registerCreateTablesForExtendedExpression(List<String> tables)
145146
SqlValidator validator = Validator.create(factory, catalogReader, SqlValidator.Config.DEFAULT);
146147
if (tables != null) {
147148
for (String tableDef : tables) {
148-
List<DefinedTable> tList = parseCreateTable(factory, validator, tableDef);
149-
for (DefinedTable t : tList) {
149+
List<SubstraitTable> tList = parseCreateTable(factory, validator, tableDef);
150+
for (SubstraitTable t : tList) {
150151
rootSchema.add(t.getName(), t);
151152
for (RelDataTypeField field : t.getRowType(factory).getFieldList()) {
152153
nameToTypeMap.merge( // to validate the sql expression tree

isthmus/src/main/java/io/substrait/isthmus/SubstraitToCalcite.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,12 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import java.util.Optional;
14-
import java.util.function.Function;
1514
import org.apache.calcite.jdbc.CalciteSchema;
16-
import org.apache.calcite.jdbc.LookupCalciteSchema;
1715
import org.apache.calcite.rel.RelNode;
1816
import org.apache.calcite.rel.RelRoot;
1917
import org.apache.calcite.rel.type.RelDataType;
2018
import org.apache.calcite.rel.type.RelDataTypeFactory;
2119
import org.apache.calcite.rel.type.RelDataTypeField;
22-
import org.apache.calcite.schema.Table;
2320
import org.apache.calcite.sql.SqlKind;
2421
import org.apache.calcite.tools.Frameworks;
2522
import org.apache.calcite.tools.RelBuilder;
@@ -57,19 +54,8 @@ public SubstraitToCalcite(
5754
* <p>Override this method to customize schema extraction.
5855
*/
5956
protected CalciteSchema toSchema(Rel rel) {
60-
Map<List<String>, NamedStruct> tableMap = NamedStructGatherer.gatherTables(rel);
61-
Function<List<String>, Table> lookup =
62-
id -> {
63-
NamedStruct table = tableMap.get(id);
64-
if (table == null) {
65-
return null;
66-
}
67-
return new SqlConverterBase.DefinedTable(
68-
id.get(id.size() - 1),
69-
typeFactory,
70-
typeConverter.toCalcite(typeFactory, table.struct(), table.names()));
71-
};
72-
return LookupCalciteSchema.createRootSchema(lookup);
57+
SchemaCollector schemaCollector = new SchemaCollector(typeFactory, typeConverter);
58+
return schemaCollector.toSchema(rel);
7359
}
7460

7561
/**
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.substrait.isthmus.calcite;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import org.apache.calcite.schema.Schema;
6+
import org.apache.calcite.schema.Table;
7+
import org.apache.calcite.schema.impl.AbstractSchema;
8+
9+
/** Basic {@link AbstractSchema} implementation */
10+
public class SubstraitSchema extends AbstractSchema {
11+
12+
/** Map of table names to their associated tables */
13+
protected final Map<String, Table> tableMap;
14+
15+
/** Map of schema names to their associated schemas */
16+
protected final Map<String, Schema> schemaMap;
17+
18+
public SubstraitSchema() {
19+
this.tableMap = new HashMap<>();
20+
this.schemaMap = new HashMap<>();
21+
}
22+
23+
public SubstraitSchema(Map<String, Table> tableMap) {
24+
this.tableMap = tableMap;
25+
this.schemaMap = new HashMap<>();
26+
}
27+
28+
@Override
29+
public Map<String, Table> getTableMap() {
30+
return tableMap;
31+
}
32+
33+
@Override
34+
protected Map<String, Schema> getSubSchemaMap() {
35+
return schemaMap;
36+
}
37+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.substrait.isthmus.calcite;
2+
3+
import org.apache.calcite.rel.type.RelDataType;
4+
import org.apache.calcite.rel.type.RelDataTypeFactory;
5+
import org.apache.calcite.schema.impl.AbstractTable;
6+
7+
/** Basic {@link AbstractTable} implementation */
8+
public class SubstraitTable extends AbstractTable {
9+
10+
private final RelDataType rowType;
11+
private final String tableName;
12+
13+
public SubstraitTable(String tableName, RelDataType rowType) {
14+
this.tableName = tableName;
15+
this.rowType = rowType;
16+
}
17+
18+
public String getName() {
19+
return tableName;
20+
}
21+
22+
@Override
23+
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
24+
return rowType;
25+
}
26+
}

isthmus/src/main/java/org/apache/calcite/jdbc/LookupCalciteSchema.java

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)