-
Notifications
You must be signed in to change notification settings - Fork 92
feat: introduce SchemaCollector #391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| package io.substrait.isthmus; | ||
|
|
||
| import io.substrait.isthmus.calcite.SubstraitSchema; | ||
| import io.substrait.isthmus.calcite.SubstraitTable; | ||
| import io.substrait.relation.NamedScan; | ||
| import io.substrait.relation.Rel; | ||
| import io.substrait.relation.RelCopyOnWriteVisitor; | ||
| import io.substrait.type.NamedStruct; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import org.apache.calcite.jdbc.CalciteSchema; | ||
| import org.apache.calcite.rel.type.RelDataType; | ||
| import org.apache.calcite.rel.type.RelDataTypeFactory; | ||
|
|
||
| /** For use in generating {@link CalciteSchema}s from Substrait {@link Rel}s */ | ||
| public class SchemaCollector { | ||
|
|
||
| private static final boolean CASE_SENSITIVE = false; | ||
|
|
||
| private final RelDataTypeFactory typeFactory; | ||
| private final TypeConverter typeConverter; | ||
|
|
||
| public SchemaCollector(RelDataTypeFactory typeFactory, TypeConverter typeConverter) { | ||
| this.typeFactory = typeFactory; | ||
| this.typeConverter = typeConverter; | ||
| } | ||
|
|
||
| public CalciteSchema toSchema(Rel rel) { | ||
| // Create the root schema under which all tables and schemas will be nested. | ||
| CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false); | ||
|
|
||
| for (Map.Entry<List<String>, NamedStruct> entry : TableGatherer.gatherTables(rel).entrySet()) { | ||
| List<String> names = entry.getKey(); | ||
| NamedStruct namedStruct = entry.getValue(); | ||
|
|
||
| // The last name in names is the table name. All others are schema names. | ||
| String tableName = names.get(names.size() - 1); | ||
|
|
||
| // Traverse all schemas, creating them if they are not present | ||
| CalciteSchema schema = rootSchema; | ||
| for (String schemaName : names.subList(0, names.size() - 1)) { | ||
| CalciteSchema subSchema = schema.getSubSchema(schemaName, CASE_SENSITIVE); | ||
| if (subSchema != null) { | ||
| schema = subSchema; | ||
| } else { | ||
| SubstraitSchema newSubSchema = new SubstraitSchema(); | ||
| schema = schema.add(schemaName, newSubSchema); | ||
| } | ||
| } | ||
|
|
||
| // Create the table if it is not present | ||
| CalciteSchema.TableEntry table = schema.getTable(tableName, CASE_SENSITIVE); | ||
| if (table == null) { | ||
| RelDataType rowType = | ||
| typeConverter.toCalcite(typeFactory, namedStruct.struct(), namedStruct.names()); | ||
| schema.add(tableName, new SubstraitTable(tableName, rowType)); | ||
| } | ||
| } | ||
|
|
||
| return rootSchema; | ||
| } | ||
|
|
||
| static class TableGatherer extends RelCopyOnWriteVisitor<RuntimeException> { | ||
| Map<List<String>, NamedStruct> tableMap; | ||
|
|
||
| private TableGatherer() { | ||
| super(); | ||
| this.tableMap = new HashMap<>(); | ||
| } | ||
|
|
||
| /** | ||
| * Gathers all tables defined in {@link NamedScan}s under the given {@link Rel} | ||
| * | ||
| * @param rootRel under which to search for {@link NamedScan}s | ||
| * @return a map of qualified table names to their associated Substrait schemas | ||
| */ | ||
| public static Map<List<String>, NamedStruct> gatherTables(Rel rootRel) { | ||
| var visitor = new TableGatherer(); | ||
| rootRel.accept(visitor); | ||
| return visitor.tableMap; | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<Rel> visit(NamedScan namedScan) { | ||
| super.visit(namedScan); | ||
|
|
||
| List<String> tableName = namedScan.getNames(); | ||
| if (tableMap.containsKey(tableName)) { | ||
| NamedStruct existingSchema = tableMap.get(tableName); | ||
| if (!existingSchema.equals(namedScan.getInitialSchema())) { | ||
| throw new IllegalArgumentException( | ||
| String.format( | ||
| "NamedScan for %s is present multiple times with different schemas", tableName)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's helpful to guard against this because it can result in some very weird errors. The same table can appear multiple times in a plan (i.e. through joins), and if an external producer applies field trimming and those tables are trimmed to have different fields, unpleasantness can ensue. |
||
| } | ||
| } | ||
| tableMap.put(tableName, namedScan.getInitialSchema()); | ||
|
|
||
| return Optional.empty(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| package io.substrait.isthmus.calcite; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import org.apache.calcite.schema.Schema; | ||
| import org.apache.calcite.schema.Table; | ||
| import org.apache.calcite.schema.impl.AbstractSchema; | ||
|
|
||
| /** Basic {@link AbstractSchema} implementation */ | ||
| public class SubstraitSchema extends AbstractSchema { | ||
|
|
||
| /** Map of table names to their associated tables */ | ||
| protected final Map<String, Table> tableMap; | ||
|
|
||
| /** Map of schema names to their associated schemas */ | ||
| protected final Map<String, Schema> schemaMap; | ||
|
|
||
| public SubstraitSchema() { | ||
| this.tableMap = new HashMap<>(); | ||
| this.schemaMap = new HashMap<>(); | ||
| } | ||
|
|
||
| public SubstraitSchema(Map<String, Table> tableMap) { | ||
| this.tableMap = tableMap; | ||
| this.schemaMap = new HashMap<>(); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Table> getTableMap() { | ||
| return tableMap; | ||
| } | ||
|
|
||
| @Override | ||
| protected Map<String, Schema> getSubSchemaMap() { | ||
| return schemaMap; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package io.substrait.isthmus.calcite; | ||
|
|
||
| import org.apache.calcite.rel.type.RelDataType; | ||
| import org.apache.calcite.rel.type.RelDataTypeFactory; | ||
| import org.apache.calcite.schema.impl.AbstractTable; | ||
|
|
||
| /** Basic {@link AbstractTable} implementation */ | ||
| public class SubstraitTable extends AbstractTable { | ||
|
|
||
| private final RelDataType rowType; | ||
| private final String tableName; | ||
|
|
||
| public SubstraitTable(String tableName, RelDataType rowType) { | ||
| this.tableName = tableName; | ||
| this.rowType = rowType; | ||
| } | ||
|
|
||
| public String getName() { | ||
| return tableName; | ||
| } | ||
|
|
||
| @Override | ||
| public RelDataType getRowType(RelDataTypeFactory typeFactory) { | ||
| return rowType; | ||
| } | ||
| } |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing LookupCalciteSchema code didn't handle nested schemas. I've added support for it here.