diff --git a/dingo-calcite/build.gradle b/dingo-calcite/build.gradle index 347a5a752d..a50b446799 100644 --- a/dingo-calcite/build.gradle +++ b/dingo-calcite/build.gradle @@ -40,6 +40,8 @@ dependencies { implementation group: 'io.dingodb.expr', name: 'dingo-expr-coding', version: 'dingo-expr'.v() implementation group: 'io.dingodb.expr', name: 'dingo-expr-common', version: 'dingo-expr'.v() implementation group: 'io.dropwizard.metrics', name: 'metrics-core', version: 'metrics-core'.v() + implementation group: 'org.codehaus.janino', name:'janino', version: '3.1.8' + implementation group: 'org.codehaus.janino', name:'commons-compiler', version: '3.1.8' api group: 'io.dingodb.expr', name: 'dingo-expr-parser', version: 'dingo-expr'.v() api group: 'io.dingodb.expr', name: 'dingo-expr-rel', version: 'dingo-expr'.v() diff --git a/dingo-calcite/src/main/codegen/templates/Parser.jj b/dingo-calcite/src/main/codegen/templates/Parser.jj index 96de23e850..002560b9a8 100644 --- a/dingo-calcite/src/main/codegen/templates/Parser.jj +++ b/dingo-calcite/src/main/codegen/templates/Parser.jj @@ -3815,14 +3815,15 @@ SqlNodeList WithList() : { final Span s; final List list = new ArrayList(); + boolean recursive = false; } { - { s = span(); } - AddWithItem(list) ( AddWithItem(list) )* + [ { recursive = true; } ] { s = span(); } + AddWithItem(list, SqlLiteral.createBoolean(recursive, getPos())) ( AddWithItem(list, SqlLiteral.createBoolean(recursive, getPos())) )* { return new SqlNodeList(list, s.end(this)); } } -void AddWithItem(List list) : +void AddWithItem(List list, SqlLiteral recursive) : { final SqlIdentifier id; final SqlNodeList columnList; @@ -3833,7 +3834,7 @@ void AddWithItem(List list) : ( columnList = ParenthesizedSimpleIdentifierList() | { columnList = null; } ) definition = ParenthesizedExpression(ExprContext.ACCEPT_QUERY) - { list.add(new SqlWithItem(id.getParserPosition(), id, columnList, definition)); } + { list.add(new SqlWithItem(id.getParserPosition(), id, columnList, definition, recursive)); } } /** diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java b/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java index 5675371e44..02ced489f6 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/DingoParser.java @@ -414,6 +414,11 @@ public RelNode optimize(RelNode relNode) { List rules = DingoRules.rules(); ImmutableList.Builder builder = ImmutableList.builder(); builder.addAll(rules); + + builder.addAll(DingoRules.ABSTRACT_RELATIONAL_RULES); + builder.addAll(DingoRules.ABSTRACT_RULES); + builder.addAll(DingoRules.BASE_RULES); + if (!context.getConfig().topDownOpt()) { // This is needed for `IterativeRuleDriver`. builder.add(AbstractConverter.ExpandConversionRule.INSTANCE); diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/meta/DingoCostModelV1.java b/dingo-calcite/src/main/java/io/dingodb/calcite/meta/DingoCostModelV1.java index f015c00313..a6526643a9 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/meta/DingoCostModelV1.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/meta/DingoCostModelV1.java @@ -61,7 +61,9 @@ public static synchronized DingoCostModelV1 getCostModel() { public static double getScanAvgRowSize(LogicalDingoTableScan tableScan) { DingoTable dingoTable = tableScan.getTable().unwrap(DingoTable.class); - assert dingoTable != null; + if (dingoTable == null) { + return 0; + } String schemaName = dingoTable.getNames().get(1); //List selectionCdList = getSelectionCdList(tableScan, dingoTable); return getAvgRowSize( diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRel.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRel.java index cb6029374f..641be74376 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRel.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRel.java @@ -20,6 +20,7 @@ import io.dingodb.calcite.traits.DingoRelStreamingDef; import io.dingodb.calcite.visitor.DingoRelVisitor; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.PhysicalNode; import org.apache.calcite.rel.RelNode; import org.apache.calcite.util.Pair; @@ -31,6 +32,10 @@ public interface DingoRel extends PhysicalNode { static DingoRel dingo(RelNode rel) { + if (rel instanceof RelSubset) { + RelSubset relSubset = (RelSubset) rel; + return (DingoRel) relSubset.getBest(); + } return (DingoRel) rel; } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRepeatUnion.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRepeatUnion.java new file mode 100644 index 0000000000..94c5efbbab --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoRepeatUnion.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rel; + +import io.dingodb.calcite.visitor.DingoRelVisitor; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.RepeatUnion; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +public class DingoRepeatUnion extends RepeatUnion implements DingoRel { + public DingoRepeatUnion( + RelOptCluster cluster, RelTraitSet traitSet, RelNode seed, RelNode iterative, boolean all, + int iterationLimit, @Nullable RelOptTable transientTable) { + super(cluster, traitSet, seed, iterative, all, iterationLimit, transientTable); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new DingoRepeatUnion(getCluster(), traitSet, + getSeedRel(), getIterativeRel(), all, iterationLimit, transientTable); + } + + @Override + public T accept(@NonNull DingoRelVisitor visitor) { + return visitor.visit(this); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoTableSpool.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoTableSpool.java new file mode 100644 index 0000000000..d6ff592375 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoTableSpool.java @@ -0,0 +1,69 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rel; + +import io.dingodb.calcite.traits.DingoConvention; +import io.dingodb.calcite.visitor.DingoRelVisitor; +import io.dingodb.calcite.visitor.function.DingoTableSpoolVisitFun; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Spool; +import org.apache.calcite.rel.core.TableSpool; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.schema.impl.ListTransientTable; +import org.checkerframework.checker.nullness.qual.NonNull; + +public class DingoTableSpool extends TableSpool implements DingoRel { + @Getter + private ListTransientTable listTransientTable; + + public DingoTableSpool( + RelOptCluster cluster, RelTraitSet traitSet, RelNode input, Type readType, + Type writeType, RelOptTable table, ListTransientTable listTransientTable + ) { + super(cluster, traitSet, input, readType, writeType, table); + this.listTransientTable = listTransientTable; + } + + public static DingoTableSpool create(RelNode input, Type readType, + Type writeType, RelOptTable table) { + RelOptCluster cluster = input.getCluster(); + RelMetadataQuery mq = cluster.getMetadataQuery(); + RelTraitSet traitSet = cluster.traitSetOf(DingoConvention.INSTANCE) + .replaceIfs(RelCollationTraitDef.INSTANCE, + () -> mq.collations(input)) + .replaceIf(RelDistributionTraitDef.INSTANCE, + () -> mq.distribution(input)); + ListTransientTable tmp = table.unwrap(ListTransientTable.class); + return new DingoTableSpool(cluster, traitSet, input, readType, writeType, table, tmp); + } + + @Override + public T accept(@NonNull DingoRelVisitor visitor) { + return visitor.visit(this); + } + + @Override + protected Spool copy(RelTraitSet relTraitSet, RelNode relNode, Type type, Type type1) { + return null; + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoTransientTableScan.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoTransientTableScan.java new file mode 100644 index 0000000000..16dede58db --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoTransientTableScan.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rel; + +import io.dingodb.calcite.visitor.DingoRelVisitor; +import io.dingodb.common.type.TupleMapping; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.impl.ListTransientTable; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +public class DingoTransientTableScan extends LogicalDingoTableScan implements DingoRel { + @Getter + ListTransientTable transientTable; + + public DingoTransientTableScan( + RelOptCluster cluster, RelTraitSet traitSet, List hints, RelOptTable table, + @Nullable RexNode filter, @Nullable TupleMapping selection, ListTransientTable transientTable + ) { + super(cluster, traitSet, hints, table, filter, selection); + this.transientTable = transientTable; + } + + @Override + public T accept(@NonNull DingoRelVisitor visitor) { + return visitor.visit(this); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoWindow.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoWindow.java new file mode 100644 index 0000000000..888499c5f3 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/DingoWindow.java @@ -0,0 +1,80 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rel; + +import io.dingodb.calcite.visitor.DingoRelVisitor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.RexToLixTranslator; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.lang.reflect.Type; +import java.util.List; + +public class DingoWindow extends Window implements DingoRel { + + public DingoWindow( + RelOptCluster cluster, RelTraitSet traitSet, List hints, RelNode input, + List constants, RelDataType rowType, List groups + ) { + super(cluster, traitSet, hints, input, constants, rowType, groups); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new DingoWindow(this.getCluster(), traitSet, this.hints, sole(inputs), constants, rowType, groups); + } + + public static class WindowRelInputGetter + implements RexToLixTranslator.InputGetter { + private final Expression row; + private final PhysType rowPhysType; + private final int actualInputFieldCount; + private final List constants; + + public WindowRelInputGetter(Expression row, + PhysType rowPhysType, int actualInputFieldCount, + List constants) { + this.row = row; + this.rowPhysType = rowPhysType; + this.actualInputFieldCount = actualInputFieldCount; + this.constants = constants; + } + + @Override public Expression field(BlockBuilder list, int index, Type storageType) { + if (index < actualInputFieldCount) { + Expression current = list.append("current", row); + return rowPhysType.fieldReference(current, index, storageType); + } + return constants.get(index - actualInputFieldCount); + } + } + + + @Override + public T accept(@NonNull DingoRelVisitor visitor) { + return visitor.visit(this); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalDingoTableScan.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalDingoTableScan.java index e5b062e0c6..5950a6598c 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalDingoTableScan.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/LogicalDingoTableScan.java @@ -124,11 +124,10 @@ public LogicalDingoTableScan( this.groupSets = groupSets; this.pushDown = pushDown; DingoTable dingoTable = table.unwrap(DingoTable.class); - assert dingoTable != null; this.realSelection = selection; this.forDml = forDml; // If the columns of the table contain hide and delete, the data shows that they need to be deleted - if (selection != null) { + if (selection != null && dingoTable != null) { int fieldCount = dingoTable.getTable().getColumns().size(); if (forDml) { int[] mappingTmp = selection.getMappings(); @@ -149,7 +148,7 @@ public LogicalDingoTableScan( } this.selection = TupleMapping.of(mappingList); } - } else { + } else if (dingoTable != null) { List mapping = dingoTable.getTable() .getColumns() .stream() diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoStreamingConverter.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoStreamingConverter.java index 9987d24c18..e015f459fb 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoStreamingConverter.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rel/dingo/DingoStreamingConverter.java @@ -57,8 +57,9 @@ public double estimateRowCount(@NonNull RelMetadataQuery mq) { Set partitions = getStreaming().getPartitions(); Set inputPartitions = inputStreaming.getPartitions(); this.rowCount = rowCount; - assert partitions != null && inputPartitions != null; - if (partitions.size() > inputPartitions.size()) { + if (partitions == null || inputPartitions == null) { + return rowCount; + } else if (partitions.size() > inputPartitions.size()) { for (int i = 0; i < partitions.size() - inputPartitions.size(); ++i) { rowCount /= 1.000001d; } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java index b97aaf731b..e5bf2bcb4b 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoRules.java @@ -18,9 +18,15 @@ import com.google.common.collect.ImmutableList; import io.dingodb.calcite.rule.dingo.DingoPhysicalRules; +import io.dingodb.calcite.rule.dingo.DingoRepeatUnionRule; +import io.dingodb.calcite.rule.dingo.DingoWindowRule; import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.volcano.AbstractConverter; import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.rules.CoreRules; +import org.apache.calcite.rel.rules.DateRangeRules; +import org.apache.calcite.rel.rules.JoinPushThroughJoinRule; +import org.apache.calcite.rel.rules.PruneEmptyRules; import java.util.List; @@ -193,6 +199,83 @@ public final class DingoRules { public static final DingoFilterReduceExpressionsRule FILTER_REDUCE_EXPRESSIONS_RULE = DingoFilterReduceExpressionsRule.Config.DEFAULT.toRule(); + public static final DingoWindowRule DINGO_WINDOW_RULE = + DingoWindowRule.DEFAULT.toRule(DingoWindowRule.class); + + public static final DingoRepeatUnionRule DINGO_REPEAT_UNION_RULE = + DingoRepeatUnionRule.DEFAULT.toRule(DingoRepeatUnionRule.class); + + public static final DingoTableSpoolRule DINGO_TABLE_SPOOL_RULE = DingoTableSpoolRule.DEFAULT_CONFIG + .toRule(DingoTableSpoolRule.class); + + public static final DingoTableScanForSpoolRule DINGO_TABLE_SCAN_FOR_SPOOL_RULE + = DingoTableScanForSpoolRule.DEFAULT.toRule(DingoTableScanForSpoolRule.class); + + public static final List BASE_RULES = ImmutableList.of( + CoreRules.AGGREGATE_STAR_TABLE, + CoreRules.AGGREGATE_PROJECT_STAR_TABLE, + CoreRules.FILTER_SCAN, + CoreRules.PROJECT_FILTER_TRANSPOSE, + CoreRules.FILTER_PROJECT_TRANSPOSE, + CoreRules.FILTER_INTO_JOIN, + CoreRules.JOIN_PUSH_EXPRESSIONS, + CoreRules.AGGREGATE_EXPAND_WITHIN_DISTINCT, + CoreRules.AGGREGATE_CASE_TO_FILTER, + CoreRules.AGGREGATE_REDUCE_FUNCTIONS, + CoreRules.FILTER_AGGREGATE_TRANSPOSE, + CoreRules.PROJECT_WINDOW_TRANSPOSE, + CoreRules.MATCH, + CoreRules.JOIN_COMMUTE, + JoinPushThroughJoinRule.RIGHT, + JoinPushThroughJoinRule.LEFT, + //CoreRules.SORT_PROJECT_TRANSPOSE, + //CoreRules.SORT_JOIN_TRANSPOSE, + //CoreRules.SORT_REMOVE_CONSTANT_KEYS, + //CoreRules.SORT_UNION_TRANSPOSE, + CoreRules.EXCHANGE_REMOVE_CONSTANT_KEYS, + CoreRules.SORT_EXCHANGE_REMOVE_CONSTANT_KEYS); + + public static final List ABSTRACT_RELATIONAL_RULES = ImmutableList.of( + CoreRules.FILTER_INTO_JOIN, + CoreRules.JOIN_CONDITION_PUSH, + AbstractConverter.ExpandConversionRule.INSTANCE, + CoreRules.JOIN_COMMUTE, + //CoreRules.PROJECT_TO_SEMI_JOIN, + //CoreRules.JOIN_ON_UNIQUE_TO_SEMI_JOIN, + //CoreRules.JOIN_TO_SEMI_JOIN, + //CoreRules.AGGREGATE_REMOVE, + CoreRules.UNION_TO_DISTINCT, + //CoreRules.PROJECT_REMOVE, + CoreRules.PROJECT_AGGREGATE_MERGE, + CoreRules.AGGREGATE_JOIN_TRANSPOSE, + CoreRules.AGGREGATE_MERGE, + //CoreRules.AGGREGATE_PROJECT_MERGE, + CoreRules.CALC_REMOVE + //CoreRules.SORT_REMOVE + ); + + public static final List ABSTRACT_RULES = ImmutableList.of( + CoreRules.AGGREGATE_ANY_PULL_UP_CONSTANTS, + CoreRules.UNION_PULL_UP_CONSTANTS, + PruneEmptyRules.UNION_INSTANCE, + PruneEmptyRules.INTERSECT_INSTANCE, + PruneEmptyRules.MINUS_INSTANCE, + PruneEmptyRules.PROJECT_INSTANCE, + PruneEmptyRules.FILTER_INSTANCE, + PruneEmptyRules.SORT_INSTANCE, + PruneEmptyRules.AGGREGATE_INSTANCE, + PruneEmptyRules.JOIN_LEFT_INSTANCE, + PruneEmptyRules.JOIN_RIGHT_INSTANCE, + PruneEmptyRules.SORT_FETCH_ZERO_INSTANCE, + PruneEmptyRules.EMPTY_TABLE_INSTANCE, + CoreRules.UNION_MERGE, + CoreRules.INTERSECT_MERGE, + CoreRules.MINUS_MERGE, + CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW, + CoreRules.FILTER_MERGE, + DateRangeRules.FILTER_INSTANCE, + CoreRules.INTERSECT_TO_DISTINCT); + private static final List rules = ImmutableList.of( CoreRules.AGGREGATE_EXPAND_DISTINCT_AGGREGATES, @@ -258,8 +341,11 @@ public final class DingoRules { DINGO_DOCUMENT_JOIN_RULE, DINGO_DOCUMENT_PROJECT_RULE, DINGO_DOCUMENT_FILTER_RULE, - DOCUMENT_INDEX_RANGE_SCAN_RULE - + DOCUMENT_INDEX_RANGE_SCAN_RULE, + DINGO_WINDOW_RULE, + DINGO_REPEAT_UNION_RULE, + DINGO_TABLE_SPOOL_RULE, + DINGO_TABLE_SCAN_FOR_SPOOL_RULE ); private DingoRules() { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanForSpoolRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanForSpoolRule.java new file mode 100644 index 0000000000..ab7f0316de --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanForSpoolRule.java @@ -0,0 +1,88 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rule; + +import io.dingodb.calcite.DingoTable; +import io.dingodb.calcite.rel.DingoInfoSchemaScan; +import io.dingodb.calcite.rel.DingoTableScan; +import io.dingodb.calcite.rel.DingoTransientTableScan; +import io.dingodb.calcite.traits.DingoConvention; +import io.dingodb.calcite.traits.DingoRelStreaming; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.schema.impl.ListTransientTable; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +public class DingoTableScanForSpoolRule extends ConverterRule { + public static final Config DEFAULT = Config.INSTANCE + .withConversion( + LogicalTableScan.class, + Convention.NONE, + DingoConvention.INSTANCE, + "DingoTableScanForSpoolRule" + ) + .withRuleFactory(DingoTableScanForSpoolRule::new); + + public DingoTableScanForSpoolRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode relNode) { + LogicalTableScan scan = (LogicalTableScan) relNode; + RelTraitSet traits = scan.getTraitSet() + .replace(DingoConvention.INSTANCE) + .replace(DingoRelStreaming.of(scan.getTable())); + List fullNameList = scan.getTable().getQualifiedName(); + if (fullNameList.size() >= 2 && DingoTableScanRule.metaSchemaSet.contains(fullNameList.get(1))) { + DingoTable dingoTable = scan.getTable().unwrap(DingoTable.class); + if (dingoTable != null && "SYSTEM VIEW".equals(dingoTable.getTable().getTableType())) { + return new DingoInfoSchemaScan( + scan.getCluster(), + traits, + scan.getHints(), + scan.getTable(), + null, + null + ); + } + } + ListTransientTable transientTable = scan.getTable().unwrap(ListTransientTable.class); + if (transientTable != null) { + return new DingoTransientTableScan(scan.getCluster(), traits, scan.getHints(), + scan.getTable(), null, null, transientTable); + } + return new DingoTableScan( + scan.getCluster(), + traits, + scan.getHints(), + scan.getTable(), + null, + null, // selection + null, + null, + null, + false, + false + ); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanRule.java index c76ebd57a9..2b5462a9b6 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanRule.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableScanRule.java @@ -19,6 +19,7 @@ import io.dingodb.calcite.DingoTable; import io.dingodb.calcite.rel.DingoInfoSchemaScan; import io.dingodb.calcite.rel.DingoTableScan; +import io.dingodb.calcite.rel.DingoTransientTableScan; import io.dingodb.calcite.rel.LogicalDingoTableScan; import io.dingodb.calcite.rel.logical.LogicalDocumentScanFilter; import io.dingodb.calcite.rel.logical.LogicalIndexFullScan; @@ -29,6 +30,7 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.schema.impl.ListTransientTable; import java.util.List; import java.util.Set; @@ -85,6 +87,11 @@ public RelNode convert(RelNode rel) { ); } } + ListTransientTable transientTable = scan.getTable().unwrap(ListTransientTable.class); + if (transientTable != null) { + return new DingoTransientTableScan(scan.getCluster(), traits, scan.getHints(), + scan.getTable(), scan.getFilter(), scan.getRealSelection(), transientTable); + } return new DingoTableScan( scan.getCluster(), traits, diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableSpoolRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableSpoolRule.java new file mode 100644 index 0000000000..0a3eab9e76 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/DingoTableSpoolRule.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rule; + +import io.dingodb.calcite.rel.DingoTableSpool; +import io.dingodb.calcite.traits.DingoConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableSpool; +import org.apache.calcite.rel.logical.LogicalTableSpool; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class DingoTableSpoolRule extends ConverterRule { + + public static final Config DEFAULT_CONFIG = Config.INSTANCE + .withConversion(LogicalTableSpool.class, Convention.NONE, + DingoConvention.INSTANCE, "TableSpoolRule") + .withRuleFactory(DingoTableSpoolRule::new); + + public DingoTableSpoolRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode relNode) { + TableSpool spool = (TableSpool) relNode; + return DingoTableSpool.create( + convert(spool.getInput(), + spool.getInput().getTraitSet().replace(DingoConvention.INSTANCE)), + spool.readType, + spool.writeType, + spool.getTable() + ); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/dingo/DingoRepeatUnionRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/dingo/DingoRepeatUnionRule.java new file mode 100644 index 0000000000..baca9eaf26 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/dingo/DingoRepeatUnionRule.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rule.dingo; + +import io.dingodb.calcite.rel.DingoRepeatUnion; +import io.dingodb.calcite.rel.DingoWindow; +import io.dingodb.calcite.traits.DingoConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalRepeatUnion; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class DingoRepeatUnionRule extends ConverterRule { + public static final Config DEFAULT = Config.INSTANCE + .withConversion( + LogicalRepeatUnion.class, + Convention.NONE, + DingoConvention.INSTANCE, + "DingoRepeatUnionRule" + ) + .withRuleFactory(DingoRepeatUnionRule::new); + + public DingoRepeatUnionRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode rel) { + final LogicalRepeatUnion union = (LogicalRepeatUnion) rel; + + final RelTraitSet traitSet = + union.getTraitSet().replace(DingoConvention.INSTANCE); + RelNode seedRel = union.getSeedRel(); + RelNode iterativeRel = union.getIterativeRel(); + return new DingoRepeatUnion( + rel.getCluster(), traitSet, + convert(seedRel, seedRel.getTraitSet().replace(DingoConvention.INSTANCE)), + convert(iterativeRel, iterativeRel.getTraitSet().replace(DingoConvention.INSTANCE)), + union.all, union.iterationLimit, + union.getTransientTable()); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/rule/dingo/DingoWindowRule.java b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/dingo/DingoWindowRule.java new file mode 100644 index 0000000000..93d9a581c8 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/rule/dingo/DingoWindowRule.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.rule.dingo; + +import io.dingodb.calcite.rel.DingoWindow; +import io.dingodb.calcite.rel.logical.LogicalScanWithRelOp; +import io.dingodb.calcite.traits.DingoConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class DingoWindowRule extends ConverterRule { + public static final Config DEFAULT = Config.INSTANCE + .withConversion( + LogicalWindow.class, + Convention.NONE, + DingoConvention.INSTANCE, + "DingoWindowRule" + ) + .withRuleFactory(DingoWindowRule::new); + + protected DingoWindowRule(Config config) { + super(config); + } + + @Override + public @Nullable RelNode convert(RelNode rel) { + final Window winAgg = (Window) rel; + final RelTraitSet traitSet = + winAgg.getTraitSet().replace(DingoConvention.INSTANCE); + final RelNode child = winAgg.getInput(); + final RelNode convertedChild = + convert(child, + child.getTraitSet().replace(DingoConvention.INSTANCE)); + return new DingoWindow(rel.getCluster(), traitSet, winAgg.getHints(), convertedChild, + winAgg.getConstants(), winAgg.getRowType(), winAgg.groups); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsCache.java b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsCache.java index 66090f6bbf..4a25da372d 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsCache.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/stats/StatsCache.java @@ -48,7 +48,9 @@ public static double getTableRowCount(String key) { public static double getTableRowCount(RelOptTable relOptTable) { DingoTable dingoTable = relOptTable.unwrap(DingoTable.class); - assert dingoTable != null; + if (dingoTable == null) { + return 1; + } if (dingoTable.getNames().size() > 2) { return getTableRowCount(dingoTable.getNames().get(1), dingoTable.getNames().get(2)); } else { diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/utils/DingoEnumUtils.java b/dingo-calcite/src/main/java/io/dingodb/calcite/utils/DingoEnumUtils.java new file mode 100644 index 0000000000..a46b378c18 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/utils/DingoEnumUtils.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.utils; + +import org.apache.calcite.adapter.enumerable.EnumUtils; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Type; +import java.util.AbstractList; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class DingoEnumUtils { + public static List fieldRowTypes( + final RelDataType inputRowType, + final @Nullable List extraInputs, + final List argList) { + final List inputFields = inputRowType.getFieldList(); + return new AbstractList() { + @Override public RelDataType get(int index) { + final int arg = argList.get(index); + return arg < inputFields.size() + ? inputFields.get(arg).getType() + : requireNonNull(extraInputs, "extraInputs") + .get(arg - inputFields.size()).getType(); + } + @Override public int size() { + return argList.size(); + } + }; + } + + public static Type javaClass( + JavaTypeFactory typeFactory, RelDataType type) { + final Type clazz = typeFactory.getJavaClass(type); + return clazz instanceof Class ? clazz : Object[].class; + } + + public static List fieldTypes( + final JavaTypeFactory typeFactory, + final List inputTypes) { + return new AbstractList() { + @Override public Type get(int index) { + return javaClass(typeFactory, inputTypes.get(index)); + } + @Override public int size() { + return inputTypes.size(); + } + }; + } + +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/utils/DingoRelResult.java b/dingo-calcite/src/main/java/io/dingodb/calcite/utils/DingoRelResult.java new file mode 100644 index 0000000000..a4cf7a27fa --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/utils/DingoRelResult.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.utils; + +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.linq4j.tree.BlockStatement; + +public class DingoRelResult { + public final BlockStatement block; + + /** + * Describes the Java type returned by this relational expression, and the + * mapping between it and the fields of the logical row type. + */ + public final PhysType physType; + public final JavaRowFormat format; + + public DingoRelResult(BlockStatement block, PhysType physType, + JavaRowFormat format) { + this.block = block; + this.physType = physType; + this.format = format; + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/utils/WindowGenerate.java b/dingo-calcite/src/main/java/io/dingodb/calcite/utils/WindowGenerate.java new file mode 100644 index 0000000000..344a791af9 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/utils/WindowGenerate.java @@ -0,0 +1,270 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.utils; + +import com.google.common.collect.ImmutableList; +import io.dingodb.calcite.rel.DingoWindow; +import io.dingodb.common.util.Pair; +import org.apache.calcite.adapter.enumerable.EnumUtils; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.DeclarationStatement; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.MemberDeclaration; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.linq4j.tree.PseudoField; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.runtime.SortedMultiMap; +import org.apache.calcite.runtime.Utilities; +import org.apache.calcite.util.BuiltInMethod; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.calcite.adapter.enumerable.EnumUtils.generateCollatorExpression; + +public class WindowGenerate { + public static Expression generateComparator(RelCollation collation, RelDataType rowType) { + BlockBuilder body = new BlockBuilder(); + final ParameterExpression parameterV0 = + Expressions.parameter(Object[].class, "v0"); + final ParameterExpression parameterV1 = + Expressions.parameter(Object[].class, "v1"); + final ParameterExpression parameterC = + Expressions.parameter(int.class, "c"); + final int mod = + collation.getFieldCollations().size() == 1 ? Modifier.FINAL : 0; + body.add(Expressions.declare(mod, parameterC, null)); + for (RelFieldCollation fieldCollation : collation.getFieldCollations()) { + final int index = fieldCollation.getFieldIndex(); + final RelDataType fieldType = rowType.getFieldList().get(index).getType(); + final Expression fieldComparator = generateCollatorExpression(fieldType.getCollation()); + + Expression ix = Expressions.constant(index); + Expression arg0 = Expressions.arrayIndex(parameterV0, ix); + Expression arg1 = Expressions.arrayIndex(parameterV1, ix); + + arg0 = EnumUtils.convert(arg0, Comparable.class); + arg1 = EnumUtils.convert(arg1, Comparable.class); + //switch (Primitive.flavor(fieldClass(index))) { + // case OBJECT: + // arg0 = EnumUtils.convert(arg0, Comparable.class); + // arg1 = EnumUtils.convert(arg1, Comparable.class); + // break; + // default: + // break; + //} + final boolean nullsFirst = + fieldCollation.nullDirection + == RelFieldCollation.NullDirection.FIRST; + final boolean descending = + fieldCollation.getDirection() + == RelFieldCollation.Direction.DESCENDING; + body.add( + Expressions.statement( + Expressions.assign( + parameterC, + Expressions.call( + Utilities.class, + fieldNullable(rowType, index) + ? (nullsFirst != descending + ? "compareNullsFirst" + : "compareNullsLast") + : "compare", + Expressions.list( + arg0, + arg1) + .appendIfNotNull(fieldComparator))))); + body.add( + Expressions.ifThen( + Expressions.notEqual( + parameterC, Expressions.constant(0)), + Expressions.return_( + null, + descending + ? Expressions.negate(parameterC) + : parameterC))); + } + body.add( + Expressions.return_(null, Expressions.constant(0))); + + final List memberDeclarations = + Expressions.list( + Expressions.methodDecl( + Modifier.PUBLIC, + int.class, + "compare", + ImmutableList.of(parameterV0, parameterV1), + body.toBlock())); + final ParameterExpression parameterO0 = + Expressions.parameter(Object.class, "o0"); + final ParameterExpression parameterO1 = + Expressions.parameter(Object.class, "o1"); + BlockBuilder bridgeBody = new BlockBuilder(); + bridgeBody.add( + Expressions.return_( + null, + Expressions.call( + Expressions.parameter( + Comparable.class, "this"), + BuiltInMethod.COMPARATOR_COMPARE.method, + Expressions.convert_( + parameterO0, + Object[].class), + Expressions.convert_( + parameterO1, + Object[].class)))); + memberDeclarations.add( + EnumUtils.overridingMethodDecl( + BuiltInMethod.COMPARATOR_COMPARE.method, + ImmutableList.of(parameterO0, parameterO1), + bridgeBody.toBlock())); + return Expressions.new_( + Comparator.class, + ImmutableList.of(), + memberDeclarations); + } + + public static Pair getPartitionIterator( + BlockBuilder builder, Expression source, Window.Group group, Expression comparator, PhysType inputPhysType + ) { + if (group.keys.isEmpty()) { + // dec list + final Expression tempList_ = builder.append("tempList", Expressions.new_( + ArrayList.class)); + BlockBuilder builder2 = new BlockBuilder(); + ParameterExpression rows_ = + (ParameterExpression) builder2.append( + "rows", + Expressions.convert_( + Expressions.call( + source, BuiltInMethod.ITERATOR_NEXT.method), + Object[].class), + false); + builder2.add( + Expressions.statement( + Expressions.call( + tempList_, + BuiltInMethod.COLLECTION_ADD.method, + rows_))); + builder.add(Expressions.while_( + Expressions.call( + source, + BuiltInMethod.ITERATOR_HAS_NEXT.method), + builder2.toBlock())); + + // while source.hasNext -> list.add(source.next) + return Pair.of(tempList_, + builder.append( + "iterator", + Expressions.call( + null, + BuiltInMethod.SORTED_MULTI_MAP_SINGLETON.method, + comparator, + tempList_))); + } + Expression multiMap_ = + builder.append( + "multiMap", Expressions.new_(SortedMultiMap.class)); + final BlockBuilder builder2 = new BlockBuilder(); + + ParameterExpression key; + + final ParameterExpression rows_ = + (ParameterExpression) builder2.append( + "rows", + Expressions.convert_( + Expressions.call( + source, BuiltInMethod.ITERATOR_NEXT.method), + Object[].class), + false); + + org.apache.calcite.util.Pair> selector = + inputPhysType.selector(rows_, group.keys.asList(), JavaRowFormat.CUSTOM); + if (selector.left instanceof Types.RecordType) { + Types.RecordType keyJavaType = (Types.RecordType) selector.left; + List initExpressions = selector.right; + key = Expressions.parameter(keyJavaType, "key"); + builder2.add(Expressions.declare(0, key, null)); + builder2.add( + Expressions.statement( + Expressions.assign(key, Expressions.new_(keyJavaType)))); + List fieldList = keyJavaType.getRecordFields(); + for (int i = 0; i < initExpressions.size(); i++) { + Expression right = initExpressions.get(i); + builder2.add( + Expressions.statement( + Expressions.assign( + Expressions.field(key, fieldList.get(i)), right))); + } + } else { + DeclarationStatement declare = + Expressions.declare(0, "key", selector.right.get(0)); + builder2.add(declare); + key = declare.parameter; + } + + // todo start + //int keyIndex = 0; + //if (group.keys.asList().size() == 1) { + // keyIndex = group.keys.asList().get(0); + //} + //Expression ix = Expressions.constant(keyIndex); + //Expression tmp = Expressions.arrayIndex(rows_, ix); + //key = builder2.append("key", EnumUtils.convert(tmp, Integer.class)); + // todo end + + builder2.add( + Expressions.statement( + Expressions.call( + multiMap_, + BuiltInMethod.SORTED_MULTI_MAP_PUT_MULTI.method, + key, + rows_))); + + builder.add(Expressions.while_( + Expressions.call( + source, + BuiltInMethod.ITERATOR_HAS_NEXT.method), + builder2.toBlock())); + + return Pair.of(multiMap_, + builder.append( + "iterator", + Expressions.call( + multiMap_, + BuiltInMethod.SORTED_MULTI_MAP_ARRAYS.method, + comparator))); + } + + public static boolean fieldNullable(RelDataType rowType, int field) { + return rowType.getFieldList().get(field).getType().isNullable(); + } + +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java index a3786ce4e5..93748068ad 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoExplainVisitor.java @@ -41,11 +41,13 @@ import io.dingodb.calcite.rel.DingoProject; import io.dingodb.calcite.rel.DingoReduce; import io.dingodb.calcite.rel.DingoRel; +import io.dingodb.calcite.rel.DingoRepeatUnion; import io.dingodb.calcite.rel.DingoTableModify; import io.dingodb.calcite.rel.DingoTableScan; import io.dingodb.calcite.rel.DingoUnion; import io.dingodb.calcite.rel.DingoValues; import io.dingodb.calcite.rel.DingoVector; +import io.dingodb.calcite.rel.DingoWindow; import io.dingodb.calcite.rel.DocumentStreamConvertor; import io.dingodb.calcite.rel.VectorStreamConvertor; import io.dingodb.calcite.rel.dingo.DingoDocumentScanFilter; @@ -632,4 +634,16 @@ public Explain visit(@NonNull DingoDocumentScanFilter indexRangeScan) { ); } + @Override + public Explain visit(DingoWindow dingoWindow) { + Explain explain = dingo(dingoWindow.getInput()).accept(this); + Explain explain1 = new Explain("dingoWindow", dingoWindow.getRowCount(), "root", "", ""); + explain1.getChildren().add(explain); + return explain1; + } + + @Override + public Explain visit(DingoRepeatUnion dingoRepeatUnion) { + return null; + } } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java index 5be91abafa..45185bd261 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoJobVisitor.java @@ -39,11 +39,16 @@ import io.dingodb.calcite.rel.DingoPartRangeDelete; import io.dingodb.calcite.rel.DingoProject; import io.dingodb.calcite.rel.DingoReduce; +import io.dingodb.calcite.rel.DingoRel; +import io.dingodb.calcite.rel.DingoRepeatUnion; import io.dingodb.calcite.rel.DingoTableModify; import io.dingodb.calcite.rel.DingoTableScan; +import io.dingodb.calcite.rel.DingoTableSpool; +import io.dingodb.calcite.rel.DingoTransientTableScan; import io.dingodb.calcite.rel.DingoUnion; import io.dingodb.calcite.rel.DingoValues; import io.dingodb.calcite.rel.DingoVector; +import io.dingodb.calcite.rel.DingoWindow; import io.dingodb.calcite.rel.DocumentStreamConvertor; import io.dingodb.calcite.rel.VectorStreamConvertor; import io.dingodb.calcite.rel.dingo.DingoDocumentScanFilter; @@ -57,6 +62,7 @@ import io.dingodb.calcite.rel.dingo.DingoStreamingConverter; import io.dingodb.calcite.rel.dingo.IndexFullScan; import io.dingodb.calcite.rel.dingo.IndexRangeScan; +import io.dingodb.calcite.rule.DingoTableScanForSpoolRule; import io.dingodb.calcite.visitor.function.DingoAggregateVisitFun; import io.dingodb.calcite.visitor.function.DingoCountDeleteVisitFun; import io.dingodb.calcite.visitor.function.DingoDiskAnnBuildVisitFun; @@ -87,16 +93,20 @@ import io.dingodb.calcite.visitor.function.DingoReduceAggregateVisitFun; import io.dingodb.calcite.visitor.function.DingoReduceVisitFun; import io.dingodb.calcite.visitor.function.DingoRelOpVisitFun; +import io.dingodb.calcite.visitor.function.DingoRepeatUnionVisitFun; import io.dingodb.calcite.visitor.function.DingoRootVisitFun; import io.dingodb.calcite.visitor.function.DingoScanWithRelOpVisitFun; import io.dingodb.calcite.visitor.function.DingoSortVisitFun; import io.dingodb.calcite.visitor.function.DingoStreamingConverterVisitFun; import io.dingodb.calcite.visitor.function.DingoTableModifyVisitFun; import io.dingodb.calcite.visitor.function.DingoTableScanVisitFun; +import io.dingodb.calcite.visitor.function.DingoTableSpoolVisitFun; +import io.dingodb.calcite.visitor.function.DingoTransientTableScanVisitFun; import io.dingodb.calcite.visitor.function.DingoUnionVisitFun; import io.dingodb.calcite.visitor.function.DingoValuesVisitFun; import io.dingodb.calcite.visitor.function.DingoVectorStreamingVisitFun; import io.dingodb.calcite.visitor.function.DingoVectorVisitFun; +import io.dingodb.calcite.visitor.function.DingoWindowVisitFun; import io.dingodb.common.ExecuteVariables; import io.dingodb.common.Location; import io.dingodb.common.log.LogUtils; @@ -105,10 +115,12 @@ import io.dingodb.exec.base.JobManager; import io.dingodb.exec.dag.Vertex; import io.dingodb.exec.impl.IdGeneratorImpl; +import io.dingodb.exec.operator.params.TableSpoolParam; import io.dingodb.exec.transaction.base.ITransaction; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.sql.SqlKind; import org.checkerframework.checker.nullness.qual.NonNull; @@ -419,4 +431,20 @@ public Collection visit(@NonNull DingoDocumentScanFilter dingoDocumentSc return DingoDocumentScanFilterVisitFun.visit(job, idGenerator, currentLocation, this, transaction, dingoDocumentScanFilter); } + public Collection visit(@NonNull DingoWindow dingoWindow) { + return DingoWindowVisitFun.visit(job, idGenerator, currentLocation, this, transaction, dingoWindow); + } + + public Collection visit(@NonNull DingoRepeatUnion dingoRepeatUnion) { + return DingoRepeatUnionVisitFun.visit(job, idGenerator, currentLocation, this, transaction, dingoRepeatUnion); + } + + public Collection visit(@NonNull DingoTransientTableScan dingoTransientTableScan) { + return DingoTransientTableScanVisitFun.visit(job, idGenerator, currentLocation, this, transaction, dingoTransientTableScan); + } + + public Collection visit(DingoTableSpool dingoTableSpool) { + return DingoTableSpoolVisitFun.visit(job, idGenerator, currentLocation, this, transaction, dingoTableSpool); + } + } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java index 682ac8d8d8..c2a903c507 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/DingoRelVisitor.java @@ -39,11 +39,15 @@ import io.dingodb.calcite.rel.DingoPartRangeDelete; import io.dingodb.calcite.rel.DingoProject; import io.dingodb.calcite.rel.DingoReduce; +import io.dingodb.calcite.rel.DingoRepeatUnion; import io.dingodb.calcite.rel.DingoTableModify; import io.dingodb.calcite.rel.DingoTableScan; +import io.dingodb.calcite.rel.DingoTableSpool; +import io.dingodb.calcite.rel.DingoTransientTableScan; import io.dingodb.calcite.rel.DingoUnion; import io.dingodb.calcite.rel.DingoValues; import io.dingodb.calcite.rel.DingoVector; +import io.dingodb.calcite.rel.DingoWindow; import io.dingodb.calcite.rel.DocumentStreamConvertor; import io.dingodb.calcite.rel.VectorStreamConvertor; import io.dingodb.calcite.rel.dingo.DingoDocumentScanFilter; @@ -142,4 +146,16 @@ public interface DingoRelVisitor { T visit(DingoDocumentScanFilter documentIndexRangeScan); + T visit(DingoWindow dingoWindow); + + T visit(DingoRepeatUnion dingoRepeatUnion); + + default T visit(DingoTransientTableScan dingoTransientTableScan) { + return null; + } + + default T visit(DingoTableSpool dingoTableSpool) { + return null; + } + } diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRepeatUnionVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRepeatUnionVisitFun.java new file mode 100644 index 0000000000..d7d2c448fb --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoRepeatUnionVisitFun.java @@ -0,0 +1,101 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.visitor.function; + +import io.dingodb.calcite.rel.DingoRepeatUnion; +import io.dingodb.calcite.rel.dingo.DingoRoot; +import io.dingodb.calcite.type.converter.DefinitionMapper; +import io.dingodb.calcite.visitor.DingoJobVisitor; +import io.dingodb.common.CommonId; +import io.dingodb.common.Location; +import io.dingodb.common.log.LogUtils; +import io.dingodb.common.type.TupleMapping; +import io.dingodb.exec.base.IdGenerator; +import io.dingodb.exec.base.Job; +import io.dingodb.exec.base.JobManager; +import io.dingodb.exec.base.Task; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.impl.JobManagerImpl; +import io.dingodb.exec.operator.params.RepeatUnionParam; +import io.dingodb.exec.transaction.base.ITransaction; +import io.dingodb.exec.transaction.impl.TransactionManager; +import io.dingodb.meta.MetaService; +import io.dingodb.tso.TsoService; +import lombok.extern.slf4j.Slf4j; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelRecordType; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import static io.dingodb.exec.utils.OperatorCodeUtils.REPEAT_UNION; + +@Slf4j +public class DingoRepeatUnionVisitFun { + public static @NonNull Collection visit( + Job job, + @NonNull IdGenerator idGenerator, + Location currentLocation, + DingoJobVisitor visitor, + ITransaction transaction, + @NonNull DingoRepeatUnion rel + ) { + Job seedJob = getSpoolJob(transaction.getStartTs(), rel.getSeedRel(), transaction); + Job iterationJob = getSpoolJob(transaction.getStartTs(), rel.getIterativeRel(), transaction); + RepeatUnionParam repeatUnionParam = new RepeatUnionParam(seedJob, iterationJob, rel.all, rel.iterationLimit); + Task task = job.getOrCreate(currentLocation, idGenerator); + Vertex vertex = new Vertex(REPEAT_UNION, repeatUnionParam); + vertex.setId(idGenerator.getOperatorId(task.getId())); + task.putVertex(vertex); + + List outputs = new ArrayList<>(); + outputs.add(vertex); + return outputs; + } + + public static Job getSpoolJob(long startTs, RelNode relNode, ITransaction transaction) { + RelNode relInput; + if (relNode instanceof RelSubset) { + RelSubset relSubset = (RelSubset) relNode; + relNode = relSubset.getBest(); + + List selection = new ArrayList<>(); + selection.add(0); + relInput = new DingoRoot(relNode.getCluster(), relNode.getTraitSet(), relNode, TupleMapping.of(selection)); + } else { + relInput = relNode; + } + JobManager jobManager = JobManagerImpl.INSTANCE; + long jobSeqId = TsoService.getDefault().cacheTso(); + CommonId txnId = new CommonId(CommonId.CommonType.TRANSACTION, + TransactionManager.getServerId().seq, TransactionManager.getStartTs()); + RelDataType parasType = new RelRecordType(new ArrayList<>()); + Job job = jobManager.createJob(startTs, jobSeqId, txnId, DefinitionMapper.mapToDingoType(parasType)); + Location currentLocation = MetaService.root().currentLocation(); + DingoJobVisitor.renderJob( + jobManager, job, relInput, currentLocation, false, + transaction, null, null, 0, + false, false, false, 1, "root", "%" + ); + return job; + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoStreamingConverterVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoStreamingConverterVisitFun.java index cdac014aec..a9b0633760 100644 --- a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoStreamingConverterVisitFun.java +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoStreamingConverterVisitFun.java @@ -103,6 +103,9 @@ public static Collection visit( assert dstPartitions != null && srcPartitions != null; final DingoRelPartition dstDistribution = dstStreaming.getDistribution(); final DingoRelPartition srcDistribution = srcStreaming.getDistribution(); + if (dstPartitions == null || srcPartitions == null) { + return inputs; + } DingoRelStreaming media = dstStreaming.withPartitions(srcPartitions); assert media.getPartitions() != null; Collection outputs = inputs; diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableSpoolVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableSpoolVisitFun.java new file mode 100644 index 0000000000..cdd0c224b9 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTableSpoolVisitFun.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.visitor.function; + +import com.google.common.collect.ImmutableList; +import io.dingodb.calcite.rel.DingoTableSpool; +import io.dingodb.calcite.visitor.DingoJobVisitor; +import io.dingodb.common.Location; +import io.dingodb.exec.base.IdGenerator; +import io.dingodb.exec.base.Job; +import io.dingodb.exec.base.Task; +import io.dingodb.exec.dag.Edge; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.operator.params.TableSpoolParam; +import io.dingodb.exec.transaction.base.ITransaction; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.Collection; + +import static io.dingodb.calcite.rel.DingoRel.dingo; +import static io.dingodb.common.util.Utils.sole; +import static io.dingodb.exec.utils.OperatorCodeUtils.TABLE_SPOOL; + +public class DingoTableSpoolVisitFun { + @NonNull + public static Collection visit( + Job job, + IdGenerator idGenerator, + Location currentLocation, + DingoJobVisitor dingoJobVisitor, + ITransaction transaction, + @NonNull DingoTableSpool rel + ) { + Collection inputs = dingo(rel.getInput()).accept(dingoJobVisitor); + + TableSpoolParam tableSpoolParam = new TableSpoolParam(rel.getListTransientTable().getModifiableCollection()); + Vertex vertex = new Vertex(TABLE_SPOOL, tableSpoolParam); + Vertex input = sole(inputs); + Task task = input.getTask(); + vertex.setId(idGenerator.getOperatorId(task.getId())); + task.putVertex(vertex); + input.setPin(0); + Edge edge = new Edge(input, vertex); + input.addEdge(edge); + vertex.addIn(edge); + return ImmutableList.of(vertex); + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTransientTableScanVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTransientTableScanVisitFun.java new file mode 100644 index 0000000000..8588e31f32 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoTransientTableScanVisitFun.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.visitor.function; + +import io.dingodb.calcite.rel.DingoTransientTableScan; +import io.dingodb.calcite.visitor.DingoJobVisitor; +import io.dingodb.common.Location; +import io.dingodb.exec.base.IdGenerator; +import io.dingodb.exec.base.Job; +import io.dingodb.exec.base.Task; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.operator.params.ListTransientScanParam; +import io.dingodb.exec.transaction.base.ITransaction; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static io.dingodb.exec.utils.OperatorCodeUtils.LIST_TRANSIENT_SCAN; + + +public class DingoTransientTableScanVisitFun { + public static @NonNull Collection visit( + Job job, + @NonNull IdGenerator idGenerator, + Location currentLocation, + DingoJobVisitor visitor, + ITransaction transaction, + @NonNull DingoTransientTableScan rel + ) { + ListTransientScanParam listTransientScanParam = new ListTransientScanParam(rel.getTransientTable()); + Task task = job.getOrCreate(currentLocation, idGenerator); + Vertex vertex = new Vertex(LIST_TRANSIENT_SCAN, listTransientScanParam); + vertex.setId(idGenerator.getOperatorId(task.getId())); + task.putVertex(vertex); + + List outputs = new ArrayList<>(); + outputs.add(vertex); + return outputs; + } +} diff --git a/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoWindowVisitFun.java b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoWindowVisitFun.java new file mode 100644 index 0000000000..93237cd4e7 --- /dev/null +++ b/dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoWindowVisitFun.java @@ -0,0 +1,924 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.calcite.visitor.function; + +import com.google.common.collect.ImmutableList; +import io.dingodb.calcite.rel.DingoWindow; +import io.dingodb.calcite.type.DingoSqlTypeFactory; +import io.dingodb.calcite.utils.DingoEnumUtils; +import io.dingodb.calcite.utils.DingoRelResult; +import io.dingodb.calcite.utils.WindowGenerate; +import io.dingodb.calcite.visitor.DingoJobVisitor; +import io.dingodb.common.Location; +import io.dingodb.common.util.Pair; +import io.dingodb.exec.base.IdGenerator; +import io.dingodb.exec.base.Job; +import io.dingodb.exec.base.Task; +import io.dingodb.exec.dag.Edge; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.operator.params.WindowFunctionParam; +import io.dingodb.exec.transaction.base.ITransaction; +import io.dingodb.tool.api.WindowService; +import org.apache.calcite.adapter.enumerable.AggImpState; +import org.apache.calcite.adapter.enumerable.EnumUtils; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.adapter.enumerable.RexToLixTranslator; +import org.apache.calcite.adapter.enumerable.WinAggAddContext; +import org.apache.calcite.adapter.enumerable.WinAggContext; +import org.apache.calcite.adapter.enumerable.WinAggFrameResultContext; +import org.apache.calcite.adapter.enumerable.WinAggImplementor; +import org.apache.calcite.adapter.enumerable.impl.WinAggAddContextImpl; +import org.apache.calcite.adapter.enumerable.impl.WinAggResetContextImpl; +import org.apache.calcite.adapter.enumerable.impl.WinAggResultContextImpl; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteSystemProperty; +import org.apache.calcite.linq4j.tree.BinaryExpression; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.BlockStatement; +import org.apache.calcite.linq4j.tree.ClassDeclaration; +import org.apache.calcite.linq4j.tree.DeclarationStatement; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.MemberDeclaration; +import org.apache.calcite.linq4j.tree.ParameterExpression; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.linq4j.tree.Statement; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.codehaus.commons.compiler.CompileException; +import org.codehaus.commons.compiler.CompilerFactoryFactory; +import org.codehaus.commons.compiler.ICompilerFactory; +import org.codehaus.commons.compiler.ISimpleCompiler; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; + +import static io.dingodb.calcite.rel.DingoRel.dingo; +import static io.dingodb.common.util.Utils.sole; +import static io.dingodb.exec.utils.OperatorCodeUtils.WINDOW_FUNCTION; + +public final class DingoWindowVisitFun { + + private DingoWindowVisitFun() { + } + + public static @NonNull Collection visit( + Job job, + @NonNull IdGenerator idGenerator, + Location currentLocation, + DingoJobVisitor visitor, + ITransaction transaction, + @NonNull DingoWindow rel + ) { + Collection inputs = dingo(rel.getInput()).accept(visitor); + + WindowService windowService = generateCode(rel); + WindowFunctionParam windowFunctionParam = new WindowFunctionParam(windowService); + Vertex vertex = new Vertex(WINDOW_FUNCTION, windowFunctionParam); + Vertex input = sole(inputs); + Task task = input.getTask(); + vertex.setId(idGenerator.getOperatorId(task.getId())); + task.putVertex(vertex); + input.setPin(0); + Edge edge = new Edge(input, vertex); + input.addEdge(edge); + vertex.addIn(edge); + return ImmutableList.of(vertex); + } + + public static WindowService generateCode(DingoWindow rel) { + BlockBuilder builder = new BlockBuilder(); + ParameterExpression prevStart = + Expressions.parameter(int.class, builder.newName("prevStart")); + ParameterExpression prevEnd = + Expressions.parameter(int.class, builder.newName("prevEnd")); + Expression source = + Expressions.parameter(0, Iterator.class, "paramSource"); + + PhysType inputPhysType = PhysTypeImpl.of(DingoSqlTypeFactory.INSTANCE, rel.getInput().getRowType(), JavaRowFormat.ARRAY); + + DingoRelResult result = new DingoRelResult(null, inputPhysType, inputPhysType.getFormat()); + + final List translatedConstants = + new ArrayList<>(rel.constants.size()); + + builder.add(Expressions.declare(0, prevStart, null)); + builder.add(Expressions.declare(0, prevEnd, null)); + for (int windowIdx = 0; windowIdx < rel.groups.size(); windowIdx++) { + Window.Group group = rel.groups.get(windowIdx); + final Expression comparator_ = + builder.append( + "comparator", + WindowGenerate.generateComparator(group.collation(), rel.getInput().getRowType())); + Pair partitionIterator = + WindowGenerate.getPartitionIterator(builder, source, group, comparator_, inputPhysType); + final Expression collectionExpr = partitionIterator.getKey(); + final Expression iterator_ = partitionIterator.getValue(); + + List aggs = new ArrayList<>(); + List aggregateCalls = group.getAggregateCalls(rel); + for (int aggIdx = 0; aggIdx < aggregateCalls.size(); aggIdx++) { + AggregateCall call = aggregateCalls.get(aggIdx); + if (call.ignoreNulls()) { + throw new UnsupportedOperationException("IGNORE NULLS not supported"); + } + aggs.add(new AggImpState(aggIdx, call, true)); + } + + // The output from this stage is the input plus the aggregate functions. + JavaTypeFactory typeFactory = DingoSqlTypeFactory.INSTANCE; + final RelDataTypeFactory.Builder typeBuilder = typeFactory.builder(); + typeBuilder.addAll(rel.getInput().getRowType().getFieldList()); + for (AggImpState agg : aggs) { + // CALCITE-4326 + String name = Objects.requireNonNull(agg.call.name, + () -> "agg.call.name for " + agg.call); + typeBuilder.add(name, agg.call.type); + } + RelDataType outputRowType = typeBuilder.build(); + final PhysType outputPhysType = + PhysTypeImpl.of( + typeFactory, outputRowType, JavaRowFormat.ARRAY); + + final Expression list_ = + builder.append( + "list", + Expressions.new_( + ArrayList.class, + Expressions.call( + collectionExpr, BuiltInMethod.COLLECTION_SIZE.method)), + false); + + Pair collationKey = + getRowCollationKey(builder, inputPhysType, group, windowIdx); + Expression keySelector = collationKey.getKey(); + Expression keyComparator = collationKey.getValue(); + final BlockBuilder builder3 = new BlockBuilder(); + final Expression rows_ = + builder3.append( + "rows", + Expressions.convert_( + Expressions.call( + iterator_, BuiltInMethod.ITERATOR_NEXT.method), + Object[].class), + false); + + builder3.add( + Expressions.statement( + Expressions.assign(prevStart, Expressions.constant(-1)))); + builder3.add( + Expressions.statement( + Expressions.assign(prevEnd, + Expressions.constant(Integer.MAX_VALUE)))); + + final BlockBuilder builder4 = new BlockBuilder(); + + final ParameterExpression i_ = + Expressions.parameter(int.class, builder4.newName("i")); + + //Expression ix = Expressions.constant(0); + //final Expression tmp = Expressions.arrayIndex(rows_, ix); + final Expression row_ = + builder4.append( + "row", + EnumUtils.convert( + Expressions.arrayIndex(rows_, i_), + Object[].class)); + + final RexToLixTranslator.InputGetter inputGetter = + new DingoWindow.WindowRelInputGetter(row_, inputPhysType, + result.physType.getRowType().getFieldCount(), + translatedConstants); + + final RexToLixTranslator translator = + RexToLixTranslator.forAggregation(typeFactory, builder4, + inputGetter, SqlConformanceEnum.DEFAULT); + + final List outputRow = new ArrayList<>(); + int fieldCountWithAggResults = + inputPhysType.getRowType().getFieldCount(); + for (int i = 0; i < fieldCountWithAggResults; i++) { + outputRow.add( + inputPhysType.fieldReference( + row_, i, + outputPhysType.getJavaFieldType(i))); + } + + declareAndResetState(typeFactory, builder, result, windowIdx, aggs, + outputPhysType, outputRow, rel); + + // There are assumptions that minX==0. If ever change this, look for + // frameRowCount, bounds checking, etc + final Expression minX = Expressions.constant(0); + final Expression partitionRowCount = + builder3.append("partRows", Expressions.field(rows_, "length")); + final Expression maxX = builder3.append("maxX", + Expressions.subtract( + partitionRowCount, Expressions.constant(1))); + + final Expression startUnchecked = builder4.append("start", + translateBound(translator, i_, row_, minX, maxX, rows_, + group, true, inputPhysType, keySelector, keyComparator)); + final Expression endUnchecked = builder4.append("end", + translateBound(translator, i_, row_, minX, maxX, rows_, + group, false, inputPhysType, keySelector, keyComparator)); + + final Expression startX; + final Expression endX; + final Expression hasRows; + if (group.isAlwaysNonEmpty()) { + startX = startUnchecked; + endX = endUnchecked; + hasRows = Expressions.constant(true); + } else { + Expression startTmp = + group.lowerBound.isUnbounded() || startUnchecked == i_ + ? startUnchecked + : builder4.append("startTmp", + Expressions.call(null, BuiltInMethod.MATH_MAX.method, + startUnchecked, minX)); + Expression endTmp = + group.upperBound.isUnbounded() || endUnchecked == i_ + ? endUnchecked + : builder4.append("endTmp", + Expressions.call(null, BuiltInMethod.MATH_MIN.method, + endUnchecked, maxX)); + + ParameterExpression startPe = Expressions.parameter(0, int.class, + builder4.newName("startChecked")); + ParameterExpression endPe = Expressions.parameter(0, int.class, + builder4.newName("endChecked")); + builder4.add(Expressions.declare(Modifier.FINAL, startPe, null)); + builder4.add(Expressions.declare(Modifier.FINAL, endPe, null)); + + hasRows = builder4.append("hasRows", + Expressions.lessThanOrEqual(startTmp, endTmp)); + builder4.add( + Expressions.ifThenElse(hasRows, + Expressions.block( + Expressions.statement( + Expressions.assign(startPe, startTmp)), + Expressions.statement( + Expressions.assign(endPe, endTmp))), + Expressions.block( + Expressions.statement( + Expressions.assign(startPe, Expressions.constant(-1))), + Expressions.statement( + Expressions.assign(endPe, Expressions.constant(-1)))))); + startX = startPe; + endX = endPe; + } + + final BlockBuilder builder5 = new BlockBuilder(true, builder4); + + BinaryExpression rowCountWhenNonEmpty = Expressions.add( + startX == minX ? endX : Expressions.subtract(endX, startX), + Expressions.constant(1)); + + final Expression frameRowCount; + + if (hasRows.equals(Expressions.constant(true))) { + frameRowCount = + builder4.append("totalRows", rowCountWhenNonEmpty); + } else { + frameRowCount = + builder4.append("totalRows", + Expressions.condition(hasRows, rowCountWhenNonEmpty, + Expressions.constant(0))); + } + + ParameterExpression actualStart = Expressions.parameter( + 0, int.class, builder5.newName("actualStart")); + + final BlockBuilder builder6 = new BlockBuilder(true, builder5); + builder6.add( + Expressions.statement(Expressions.assign(actualStart, startX))); + + for (final AggImpState agg : aggs) { + List aggState = Objects.requireNonNull(agg.state, "agg.state"); + agg.implementor.implementReset(Objects.requireNonNull(agg.context, "agg.context"), + new WinAggResetContextImpl(builder6, aggState, i_, startX, endX, + hasRows, frameRowCount, partitionRowCount)); + } + + Expression lowerBoundCanChange = + group.lowerBound.isUnbounded() && group.lowerBound.isPreceding() + ? Expressions.constant(false) + : Expressions.notEqual(startX, prevStart); + Expression needRecomputeWindow = Expressions.orElse( + lowerBoundCanChange, + Expressions.lessThan(endX, prevEnd)); + + BlockStatement resetWindowState = builder6.toBlock(); + if (resetWindowState.statements.size() == 1) { + builder5.add( + Expressions.declare(0, actualStart, + Expressions.condition(needRecomputeWindow, startX, + Expressions.add(prevEnd, Expressions.constant(1))))); + } else { + builder5.add( + Expressions.declare(0, actualStart, null)); + builder5.add( + Expressions.ifThenElse(needRecomputeWindow, + resetWindowState, + Expressions.statement( + Expressions.assign(actualStart, + Expressions.add(prevEnd, Expressions.constant(1)))))); + } + + if (lowerBoundCanChange instanceof BinaryExpression) { + builder5.add( + Expressions.statement(Expressions.assign(prevStart, startX))); + } + builder5.add( + Expressions.statement(Expressions.assign(prevEnd, endX))); + + final BlockBuilder builder7 = new BlockBuilder(true, builder5); + final DeclarationStatement jDecl = + Expressions.declare(0, "j", actualStart); + + final PhysType inputPhysTypeFinal = inputPhysType; + final Function + resultContextBuilder = + getBlockBuilderWinAggFrameResultContextFunction(typeFactory, + SqlConformanceEnum.DEFAULT, result, translatedConstants, + comparator_, rows_, i_, startX, endX, minX, maxX, + hasRows, frameRowCount, partitionRowCount, + jDecl, inputPhysTypeFinal); + + final Function> rexArguments = agg -> { + List argList = agg.call.getArgList(); + List inputTypes = + DingoEnumUtils.fieldRowTypes( + result.physType.getRowType(), + rel.constants, + argList); + List args = new ArrayList<>(inputTypes.size()); + for (int i = 0; i < argList.size(); i++) { + Integer idx = argList.get(i); + args.add(new RexInputRef(idx, inputTypes.get(i))); + } + return args; + }; + + implementAdd(aggs, builder7, resultContextBuilder, rexArguments, jDecl); + + BlockStatement forBlock = builder7.toBlock(); + if (!forBlock.statements.isEmpty()) { + // For instance, row_number does not use for loop to compute the value + Statement forAggLoop = Expressions.for_( + Arrays.asList(jDecl), + Expressions.lessThanOrEqual(jDecl.parameter, endX), + Expressions.preIncrementAssign(jDecl.parameter), + forBlock); + if (!hasRows.equals(Expressions.constant(true))) { + forAggLoop = Expressions.ifThen(hasRows, forAggLoop); + } + builder5.add(forAggLoop); + } + + if (implementResult(aggs, builder5, resultContextBuilder, rexArguments, + true)) { + builder4.add( + Expressions.ifThen( + Expressions.orElse(lowerBoundCanChange, + Expressions.notEqual(endX, prevEnd)), + builder5.toBlock())); + } + + implementResult(aggs, builder4, resultContextBuilder, rexArguments, + false); + + // list.add(new Object[]{xx,xx,xx}); + builder4.add( + Expressions.statement( + Expressions.call( + list_, + BuiltInMethod.COLLECTION_ADD.method, + outputPhysType.record(outputRow)))); + + // for (int i = 0; i < rows.length; i++) { + // build4.body + // } + builder3.add( + Expressions.for_( + Expressions.declare(0, i_, Expressions.constant(0)), + Expressions.lessThan( + i_, + Expressions.field(rows_, "length")), + Expressions.preIncrementAssign(i_), + builder4.toBlock())); + // while(iterator.hasNext()) { + // builder3.body + // } + builder.add( + Expressions.while_( + Expressions.call( + iterator_, + BuiltInMethod.ITERATOR_HAS_NEXT.method), + builder3.toBlock())); + builder.add( + Expressions.statement( + Expressions.call( + collectionExpr, + BuiltInMethod.MAP_CLEAR.method))); + + // We're not assigning to "source". For each group, create a new + // final variable called "source" or "sourceN". + try { + Method method = List.class.getDeclaredMethod("iterator"); + source = + builder.append( + "source", + Expressions.call(list_, + method)); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + + inputPhysType = outputPhysType; + } + builder.add(Expressions.return_(null, source)); + try { + return generateInterface(builder.toBlock()); + } catch (InvocationTargetException | ClassNotFoundException + | InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + public static WindowService generateInterface(BlockStatement methodBody) + throws ClassNotFoundException, InvocationTargetException, + InstantiationException, IllegalAccessException { + List memberDeclarations = new ArrayList<>(); + ParameterExpression source = + Expressions.parameter(Iterator.class, "paramSource"); + memberDeclarations.add(Expressions.methodDecl( + Modifier.PUBLIC, + Iterator.class, + "transform", + Expressions.list(source), + methodBody)); + ClassDeclaration classDeclaration = Expressions.classDecl(Modifier.PUBLIC, + "DingoWindowService", + null, + Collections.singletonList(WindowService.class), + memberDeclarations); + String body = Expressions.toString(classDeclaration.memberDeclarations, "\n", false); + return generateCompileInterface(body); + } + + + public static WindowService generateCompileInterface(String classBody) + throws ClassNotFoundException, InvocationTargetException, + InstantiationException, IllegalAccessException { + ICompilerFactory compilerFactory; + ClassLoader classLoader = + Objects.requireNonNull(DingoWindowVisitFun.class.getClassLoader(), "classLoader"); + try { + compilerFactory = CompilerFactoryFactory.getDefaultCompilerFactory(classLoader); + } catch (Exception e) { + throw new IllegalStateException( + "Unable to instantiate java compiler", e); + } + final ISimpleCompiler compiler = compilerFactory.newSimpleCompiler(); + compiler.setParentClassLoader(classLoader); + String name = "WindowFunction"; + final String s = "public final class " + name + " implements " + + WindowService.class.getCanonicalName() + + " {\n" + + classBody + + "\n" + + "}"; + try { + compiler.cook(s); + } catch (CompileException e) { + throw new RuntimeException(e); + } + return (WindowService) compiler.getClassLoader().loadClass(name) + .getDeclaredConstructors()[0].newInstance(); + } + + private static Pair getRowCollationKey( + BlockBuilder builder, PhysType inputPhysType, + Window.Group group, int windowIdx) { + if (!(group.isRows + || (group.upperBound.isUnbounded() && group.lowerBound.isUnbounded()))) { + org.apache.calcite.util.Pair pair = + inputPhysType.generateCollationKey( + group.collation().getFieldCollations()); + // optimize=false to prevent inlining of object create into for-loops + return Pair.of( + builder.append("keySelector" + windowIdx, pair.getKey(), false), + builder.append("keyComparator" + windowIdx, pair.getValue(), false)); + } else { + return Pair.of(null, null); + } + } + + private static Expression translateBound(RexToLixTranslator translator, + ParameterExpression i_, Expression row_, Expression min_, Expression max_, + Expression rows_, Window.Group group, boolean lower, PhysType physType, + Expression keySelector, Expression keyComparator) { + // for example + // final int end = org.apache.calcite.runtime.BinarySearch + // .upperBound(_rows, row.empid, i, _rows.length - 1, _keySelector0, _keyComparator0); + RexWindowBound bound = lower ? group.lowerBound : group.upperBound; + if (bound.isUnbounded()) { + return bound.isPreceding() ? min_ : max_; + } + if (group.isRows) { + if (bound.isCurrentRow()) { + return i_; + } + RexNode node = bound.getOffset(); + Expression offs; + try { + Method translateMethod1 = RexToLixTranslator.class.getDeclaredMethod("translate", RexNode.class); + offs = (Expression) translateMethod1.invoke(translator, node); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // Floating offset does not make sense since we refer to array index. + // Nulls do not make sense as well. + offs = EnumUtils.convert(offs, int.class); + + Expression b = i_; + if (bound.isFollowing()) { + b = Expressions.add(b, offs); + } else { + b = Expressions.subtract(b, offs); + } + return b; + } + Expression searchLower = min_; + Expression searchUpper = max_; + if (bound.isCurrentRow()) { + if (lower) { + searchUpper = i_; + } else { + searchLower = i_; + } + } + + List fieldCollations = + group.collation().getFieldCollations(); + if (bound.isCurrentRow() && fieldCollations.size() != 1) { + return Expressions.call( + (lower + ? BuiltInMethod.BINARY_SEARCH5_LOWER + : BuiltInMethod.BINARY_SEARCH5_UPPER).method, + rows_, row_, searchLower, searchUpper, + Objects.requireNonNull(keySelector, "keySelector"), + Objects.requireNonNull(keyComparator, "keyComparator")); + } + assert fieldCollations.size() == 1 + : "When using range window specification, ORDER BY should have" + + " exactly one expression." + + " Actual collation is " + group.collation(); + // isRange + int orderKey = + fieldCollations.get(0).getFieldIndex(); + RelDataType keyType = + physType.getRowType().getFieldList().get(orderKey).getType(); + Type desiredKeyType = DingoSqlTypeFactory.INSTANCE.getJavaClass(keyType); + if (bound.getOffset() == null) { + desiredKeyType = Primitive.box(desiredKeyType); + } + Expression val; + + try { + Method translateMethod2 = RexToLixTranslator.class.getDeclaredMethod("translate", RexNode.class, Type.class); + translateMethod2.setAccessible(true); + val = (Expression) translateMethod2.invoke(translator, new RexInputRef(orderKey, keyType), desiredKeyType); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!bound.isCurrentRow()) { + RexNode node = bound.getOffset(); + Expression offs; + + try { + Method translateMethod1 = RexToLixTranslator.class.getDeclaredMethod("translate", RexNode.class); + translateMethod1.setAccessible(true); + offs = (Expression) translateMethod1.invoke(translator, node); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // TODO: support date + interval somehow + if (bound.isFollowing()) { + val = Expressions.add(val, offs); + } else { + val = Expressions.subtract(val, offs); + } + } + return Expressions.call( + (lower + ? BuiltInMethod.BINARY_SEARCH6_LOWER + : BuiltInMethod.BINARY_SEARCH6_UPPER).method, + rows_, val, searchLower, searchUpper, + Objects.requireNonNull(keySelector, "keySelector"), + Objects.requireNonNull(keyComparator, "keyComparator")); + } + + private static boolean implementResult(List aggs, + final BlockBuilder builder, + final Function frame, + final Function> rexArguments, + boolean cachedBlock) { + boolean nonEmpty = false; + for (final AggImpState agg : aggs) { + boolean needCache = true; + if (agg.implementor instanceof WinAggImplementor) { + WinAggImplementor imp = (WinAggImplementor) agg.implementor; + needCache = imp.needCacheWhenFrameIntact(); + } + if (needCache ^ cachedBlock) { + // Regular aggregates do not change when the windowing frame keeps + // the same. Ths + continue; + } + nonEmpty = true; + Expression res = agg.implementor.implementResult(Objects.requireNonNull(agg.context, "agg.context"), + new WinAggResultContextImpl(builder, Objects.requireNonNull(agg.state, "agg.state"), frame) { + @Override public List rexArguments() { + return rexArguments.apply(agg); + } + }); + // Several count(a) and count(b) might share the result + Expression result = Objects.requireNonNull(agg.result, + () -> "agg.result for " + agg.call); + Expression aggRes = builder.append("a" + agg.aggIdx + "res", + EnumUtils.convert(res, result.getType())); + builder.add( + Expressions.statement(Expressions.assign(result, aggRes))); + } + return nonEmpty; + } + + private static void implementAdd(List aggs, + final BlockBuilder builder7, + final Function frame, + final Function> rexArguments, + final DeclarationStatement jDecl) { + for (final AggImpState agg : aggs) { + final WinAggAddContext addContext = + new WinAggAddContextImpl(builder7, Objects.requireNonNull(agg.state, "agg.state"), frame) { + @Override public Expression currentPosition() { + return jDecl.parameter; + } + + @Override public List rexArguments() { + return rexArguments.apply(agg); + } + + @Override public RexNode rexFilterArgument() { + return null; // REVIEW + } + }; + agg.implementor.implementAdd(Objects.requireNonNull(agg.context, "agg.context"), addContext); + } + } + + private static Function + getBlockBuilderWinAggFrameResultContextFunction( + final JavaTypeFactory typeFactory, final SqlConformance conformance, + final DingoRelResult result, final List translatedConstants, + final Expression comparator_, + final Expression rows_, final ParameterExpression i_, + final Expression startX, final Expression endX, + final Expression minX, final Expression maxX, + final Expression hasRows, final Expression frameRowCount, + final Expression partitionRowCount, + final DeclarationStatement jDecl, + final PhysType inputPhysType) { + return block -> new WinAggFrameResultContext() { + @Override public RexToLixTranslator rowTranslator(Expression rowIndex) { + Expression row = + getRow(rowIndex); + final RexToLixTranslator.InputGetter inputGetter = + new DingoWindow.WindowRelInputGetter(row, inputPhysType, + result.physType.getRowType().getFieldCount(), + translatedConstants); + + return RexToLixTranslator.forAggregation(typeFactory, + block, inputGetter, conformance); + } + + @Override public Expression computeIndex(Expression offset, + WinAggImplementor.SeekType seekType) { + Expression index; + if (seekType == WinAggImplementor.SeekType.AGG_INDEX) { + index = jDecl.parameter; + } else if (seekType == WinAggImplementor.SeekType.SET) { + index = i_; + } else if (seekType == WinAggImplementor.SeekType.START) { + index = startX; + } else if (seekType == WinAggImplementor.SeekType.END) { + index = endX; + } else { + throw new IllegalArgumentException("SeekSet " + seekType + + " is not supported"); + } + if (!Expressions.constant(0).equals(offset)) { + index = block.append("idx", Expressions.add(index, offset)); + } + return index; + } + + private Expression checkBounds(Expression rowIndex, + Expression minIndex, Expression maxIndex) { + if (rowIndex == i_ || rowIndex == startX || rowIndex == endX) { + // No additional bounds check required + return hasRows; + } + + //noinspection UnnecessaryLocalVariable + Expression res = block.append("rowInFrame", + Expressions.foldAnd( + ImmutableList.of(hasRows, + Expressions.greaterThanOrEqual(rowIndex, minIndex), + Expressions.lessThanOrEqual(rowIndex, maxIndex)))); + + return res; + } + + @Override public Expression rowInFrame(Expression rowIndex) { + return checkBounds(rowIndex, startX, endX); + } + + @Override public Expression rowInPartition(Expression rowIndex) { + return checkBounds(rowIndex, minX, maxX); + } + + @Override public Expression compareRows(Expression a, Expression b) { + return Expressions.call(comparator_, + BuiltInMethod.COMPARATOR_COMPARE.method, + getRow(a), getRow(b)); + } + + public Expression getRow(Expression rowIndex) { + return block.append( + "jRow", + EnumUtils.convert( + Expressions.arrayIndex(rows_, rowIndex), + inputPhysType.getJavaRowType())); + } + + @Override public Expression index() { + return i_; + } + + @Override public Expression startIndex() { + return startX; + } + + @Override public Expression endIndex() { + return endX; + } + + @Override public Expression hasRows() { + return hasRows; + } + + @Override public Expression getFrameRowCount() { + return frameRowCount; + } + + @Override public Expression getPartitionRowCount() { + return partitionRowCount; + } + }; + } + + private static void declareAndResetState(final JavaTypeFactory typeFactory, + BlockBuilder builder, final DingoRelResult result, int windowIdx, + List aggs, PhysType outputPhysType, + List outputRow, DingoWindow rel) { + for (final AggImpState agg : aggs) { + agg.context = + new WinAggContext() { + @Override public SqlAggFunction aggregation() { + return agg.call.getAggregation(); + } + + @Override public RelDataType returnRelType() { + return agg.call.type; + } + + @Override public Type returnType() { + return DingoEnumUtils.javaClass(typeFactory, returnRelType()); + } + + @Override public List parameterTypes() { + return DingoEnumUtils.fieldTypes(typeFactory, + parameterRelTypes()); + } + + @Override public List parameterRelTypes() { + return DingoEnumUtils.fieldRowTypes(result.physType.getRowType(), + rel.constants, agg.call.getArgList()); + } + + @Override public List groupSets() { + throw new UnsupportedOperationException(); + } + + @Override public List keyOrdinals() { + throw new UnsupportedOperationException(); + } + + @Override public List keyRelTypes() { + throw new UnsupportedOperationException(); + } + + @Override public List keyTypes() { + throw new UnsupportedOperationException(); + } + }; + String aggName = "a" + agg.aggIdx; + if (CalciteSystemProperty.DEBUG.value()) { + aggName = Util.toJavaId(agg.call.getAggregation().getName(), 0) + .substring("ID$0$".length()) + aggName; + } + List state = agg.implementor.getStateType(agg.context); + final List decls = new ArrayList<>(state.size()); + for (int i = 0; i < state.size(); i++) { + Type type = state.get(i); + ParameterExpression pe = + Expressions.parameter(type, + builder.newName(aggName + + "s" + i + "w" + windowIdx)); + builder.add(Expressions.declare(0, pe, null)); + decls.add(pe); + } + agg.state = decls; + Type aggHolderType = agg.context.returnType(); + Type aggStorageType = + outputPhysType.getJavaFieldType(outputRow.size()); + if (Primitive.is(aggHolderType) && !Primitive.is(aggStorageType)) { + aggHolderType = Primitive.box(aggHolderType); + } + ParameterExpression aggRes = Expressions.parameter(0, + aggHolderType, + builder.newName(aggName + "w" + windowIdx)); + + builder.add( + Expressions.declare(0, aggRes, + Expressions.constant( + Optional.ofNullable(Primitive.of(aggRes.getType())) + .map(x -> x.defaultValue) + .orElse(null), + aggRes.getType()))); + agg.result = aggRes; + outputRow.add(aggRes); + agg.implementor.implementReset(agg.context, + new WinAggResetContextImpl(builder, agg.state, + null, null, null, null, + null, null)); + } + } +} diff --git a/dingo-calcite/src/test/java/io/dingodb/calcite/TestJoin.java b/dingo-calcite/src/test/java/io/dingodb/calcite/TestJoin.java index 704c8b5a3f..a88e76e5a6 100644 --- a/dingo-calcite/src/test/java/io/dingodb/calcite/TestJoin.java +++ b/dingo-calcite/src/test/java/io/dingodb/calcite/TestJoin.java @@ -140,7 +140,7 @@ public void testJoinFilter() throws SqlParseException { Assert.relNode(optimized) .isA(DingoRoot.class).streaming(DingoRelStreaming.ROOT) .soleInput().isA(DingoStreamingConverter.class).streaming(DingoRelStreaming.ROOT) - .soleInput().isA(DingoRelOp.class) + //.soleInput().isA(DingoRelOp.class) .soleInput().isA(DingoRelOp.class) .soleInput().isA(DingoHashJoin.class).prop("joinType", JoinRelType.INNER) .inputNum(2); diff --git a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java index 314c28d4ef..bc0f65ddda 100644 --- a/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java +++ b/dingo-driver/host/src/main/java/io/dingodb/driver/DingoDriverParser.java @@ -106,6 +106,7 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOrderBy; import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.SqlWith; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.type.BasicSqlType; import org.apache.calcite.sql.type.SqlTypeName; @@ -944,6 +945,9 @@ private static boolean checkEngine(SqlNode sqlNode, ITransaction transaction, PlanProfile planProfile, boolean isNewTxn) { + if (sqlNode instanceof SqlWith) { + return true; + } boolean isTxn = false; boolean isNotTransactionTable = false; // for UT test @@ -958,12 +962,14 @@ private static boolean checkEngine(SqlNode sqlNode, Table tableTarget; if (table instanceof RelOptTableImpl) { RelOptTableImpl relOptTable = (RelOptTableImpl) table; - tableTarget = ((DingoTable) relOptTable.table()).getTable(); - engine = tableTarget.getEngine(); - List fullName = relOptTable.getQualifiedName(); - if (fullName.size() == 3) { - name = fullName.get(1) + "." + fullName.get(2); - tableList.add(name); + if (relOptTable.table() instanceof DingoTable) { + tableTarget = ((DingoTable) relOptTable.table()).getTable(); + engine = tableTarget.getEngine(); + List fullName = relOptTable.getQualifiedName(); + if (fullName.size() == 3) { + name = fullName.get(1) + "." + fullName.get(2); + tableList.add(name); + } } } else if (table instanceof DingoRelOptTable) { DingoRelOptTable dingoRelOptTable = (DingoRelOptTable) table; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java b/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java index 58ab783f6c..7b50d785bc 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/OperatorFactory.java @@ -38,6 +38,7 @@ import io.dingodb.exec.operator.IndexMergeOperator; import io.dingodb.exec.operator.InfoSchemaScanOperator; import io.dingodb.exec.operator.LikeScanOperator; +import io.dingodb.exec.operator.ListTransientScanOperator; import io.dingodb.exec.operator.NewCalcDistributionOperator; import io.dingodb.exec.operator.PartCountOperator; import io.dingodb.exec.operator.PartDeleteOperator; @@ -58,6 +59,7 @@ import io.dingodb.exec.operator.ReduceOperator; import io.dingodb.exec.operator.ReduceRelOpOperator; import io.dingodb.exec.operator.RemovePartOperator; +import io.dingodb.exec.operator.RepeatUnionOperator; import io.dingodb.exec.operator.RootOperator; import io.dingodb.exec.operator.ScanOperator; import io.dingodb.exec.operator.ScanWithCacheOpOperator; @@ -65,6 +67,7 @@ import io.dingodb.exec.operator.SendOperator; import io.dingodb.exec.operator.SortOperator; import io.dingodb.exec.operator.SumUpOperator; +import io.dingodb.exec.operator.TableSpoolOperator; import io.dingodb.exec.operator.TxnDiskAnnBuildOperator; import io.dingodb.exec.operator.TxnDiskAnnCountMemoryOperator; import io.dingodb.exec.operator.TxnDiskAnnLoadOperator; @@ -88,6 +91,7 @@ import io.dingodb.exec.operator.ValuesOperator; import io.dingodb.exec.operator.VectorPartitionOperator; import io.dingodb.exec.operator.VectorPointDistanceOperator; +import io.dingodb.exec.operator.WindowFunctionOperator; import io.dingodb.exec.transaction.operator.CleanCacheOperator; import io.dingodb.exec.transaction.operator.CleanExtraDataCacheOperator; import io.dingodb.exec.transaction.operator.CommitOperator; @@ -123,6 +127,7 @@ import static io.dingodb.exec.utils.OperatorCodeUtils.INDEX_MERGE; import static io.dingodb.exec.utils.OperatorCodeUtils.INFO_SCHEMA_SCAN; import static io.dingodb.exec.utils.OperatorCodeUtils.LIKE_SCAN; +import static io.dingodb.exec.utils.OperatorCodeUtils.LIST_TRANSIENT_SCAN; import static io.dingodb.exec.utils.OperatorCodeUtils.OPTIMISTIC_ROLL_BACK; import static io.dingodb.exec.utils.OperatorCodeUtils.PARTITION; import static io.dingodb.exec.utils.OperatorCodeUtils.PART_COUNT; @@ -146,6 +151,7 @@ import static io.dingodb.exec.utils.OperatorCodeUtils.REDUCE; import static io.dingodb.exec.utils.OperatorCodeUtils.REDUCE_REL_OP; import static io.dingodb.exec.utils.OperatorCodeUtils.REMOVE_PART; +import static io.dingodb.exec.utils.OperatorCodeUtils.REPEAT_UNION; import static io.dingodb.exec.utils.OperatorCodeUtils.ROLL_BACK; import static io.dingodb.exec.utils.OperatorCodeUtils.ROOT; import static io.dingodb.exec.utils.OperatorCodeUtils.SCAN_CACHE; @@ -155,6 +161,7 @@ import static io.dingodb.exec.utils.OperatorCodeUtils.SEND; import static io.dingodb.exec.utils.OperatorCodeUtils.SORT; import static io.dingodb.exec.utils.OperatorCodeUtils.SUM_UP; +import static io.dingodb.exec.utils.OperatorCodeUtils.TABLE_SPOOL; import static io.dingodb.exec.utils.OperatorCodeUtils.TXN_CLEAN_CACHE; import static io.dingodb.exec.utils.OperatorCodeUtils.TXN_CLEAN_EXTRA_DATA_CACHE; import static io.dingodb.exec.utils.OperatorCodeUtils.TXN_DISK_ANN_BUILD; @@ -180,6 +187,7 @@ import static io.dingodb.exec.utils.OperatorCodeUtils.VECTOR_PARTITION; import static io.dingodb.exec.utils.OperatorCodeUtils.VECTOR_POINT_DISTANCE; import static io.dingodb.exec.utils.OperatorCodeUtils.DOCUMENT_PRE_FILTER; +import static io.dingodb.exec.utils.OperatorCodeUtils.WINDOW_FUNCTION; public final class OperatorFactory { @@ -265,6 +273,10 @@ public final class OperatorFactory { OPERATORS.put(TXN_DISK_ANN_LOAD, TxnDiskAnnLoadOperator.INSTANCE); OPERATORS.put(FOR_UPDATE, ForUpdateOperator.INSTANCE); OPERATORS.put(DOCUMENT_SCAN_FILTER, TxnDocumentScanOperator.INSTANCE); + OPERATORS.put(WINDOW_FUNCTION, WindowFunctionOperator.INSTANCE); + OPERATORS.put(REPEAT_UNION, RepeatUnionOperator.INSTANCE); + OPERATORS.put(TABLE_SPOOL, TableSpoolOperator.INSTANCE); + OPERATORS.put(LIST_TRANSIENT_SCAN, ListTransientScanOperator.INSTANCE); } private OperatorFactory() { diff --git a/dingo-exec/src/main/java/io/dingodb/exec/fun/DateAddFun.java b/dingo-exec/src/main/java/io/dingodb/exec/fun/DateAddFun.java index 8085bfb391..b8a0cfb155 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/fun/DateAddFun.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/fun/DateAddFun.java @@ -44,7 +44,7 @@ public OpKey keyOf(@NonNull Type type0, @NonNull Type type1) { @Override public Object evalValue(@NonNull Object value0, @NonNull Object value1, ExprConfig config) { int delta = (int) value1; - delta = Math.negateExact(delta); + //delta = Math.negateExact(delta); if (value0 instanceof Date) { Date date = (Date) value0; Calendar calendar = Calendar.getInstance(); diff --git a/dingo-exec/src/main/java/io/dingodb/exec/impl/SnapshotIterator.java b/dingo-exec/src/main/java/io/dingodb/exec/impl/SnapshotIterator.java new file mode 100644 index 0000000000..bb9190eca1 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/impl/SnapshotIterator.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.impl; + +import io.dingodb.exec.base.Job; +import io.dingodb.exec.base.JobIterator; + +public class SnapshotIterator extends JobIterator { + + protected SnapshotIterator(Job job) { + super(job); + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Object[] next() { + return new Object[0]; + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/DingoEnumerable.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/DingoEnumerable.java new file mode 100644 index 0000000000..5f32312ca2 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/DingoEnumerable.java @@ -0,0 +1,119 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator; + +import io.dingodb.exec.base.Job; +import io.dingodb.exec.impl.JobManagerImpl; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +public class DingoEnumerable implements Iterator { + private static final Object DUMMY = new Object(); + private Object current = (Object) DUMMY; + private boolean seedProcessed = false; + private int currentIteration = 0; + int iterationLimit; + boolean all; + private final Iterator seedEnumerator; + private Iterator iterativeEnumerator = null; + + private Job iterationJob; + + public DingoEnumerable(Iterator seed, Job iterationJob, boolean all, int iterationLimit) { + this.seedEnumerator = seed; + this.iterationJob = iterationJob; + this.all = all; + this.iterationLimit = iterationLimit; + } + + @Override + public boolean hasNext() { + // if we are not done with the seed moveNext on it + while (!seedProcessed) { + if (seedEnumerator.hasNext()) { + Object value = seedEnumerator.next(); + if (checkValue(value)) { + current = value; + return true; + } + } else { + seedProcessed = true; + } + } + + // if we are done with the seed, moveNext on the iterative part + while (true) { + if (iterationLimit >= 0 && currentIteration == iterationLimit) { + // max number of iterations reached, we are done + current = DUMMY; + return false; + } + + Iterator iterativeEnumerator = this.iterativeEnumerator; + if (iterativeEnumerator == null) { + this.iterativeEnumerator = iterativeEnumerator = iterator(iterationJob); + } + + while (iterativeEnumerator.hasNext()) { + Object value = iterativeEnumerator.next(); + if (checkValue(value)) { + current = value; + return true; + } + } + + if (current == DUMMY) { + // current iteration did not return any value, we are done + return false; + } + + // current iteration level (which returned some values) is finished, go to next one + current = DUMMY; + //iterativeEnumerator.close(); + this.iterativeEnumerator = null; + currentIteration++; + } + } + + private boolean checkValue(Object value) { + if (all) { + return true; // no need to check duplicates + } + + // check duplicates +// final Wrapped wrapped = wrapper.apply(value); +// if (!processed.contains(wrapped)) { +// processed.add(wrapped); +// return true; +// } + + return false; + } + + @Override + public Object next() { + if (current == DUMMY) { + throw new NoSuchElementException(); + } + return current; + } + + public Iterator iterator(Job job) { + return JobManagerImpl.INSTANCE.createIterator(job, null); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/ListTransientScanOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/ListTransientScanOperator.java new file mode 100644 index 0000000000..4fdc8d6d90 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/ListTransientScanOperator.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator; + +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.operator.params.ListTransientScanParam; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class ListTransientScanOperator extends FilterProjectSourceOperator { + public static ListTransientScanOperator INSTANCE = new ListTransientScanOperator(); + + @Override + protected @NonNull Iterator createSourceIterator(Vertex vertex) { + ListTransientScanParam listTransientScanParam = vertex.getParam(); + return listTransientScanParam.getList().iterator(); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/RepeatUnionOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/RepeatUnionOperator.java new file mode 100644 index 0000000000..e5a3e202e4 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/RepeatUnionOperator.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator; + +import io.dingodb.exec.base.Job; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.fin.Fin; +import io.dingodb.exec.impl.JobManagerImpl; +import io.dingodb.exec.operator.params.RepeatUnionParam; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.Iterator; + +@Slf4j +public class RepeatUnionOperator extends FilterProjectSourceOperator { + + public static RepeatUnionOperator INSTANCE = new RepeatUnionOperator(); + + @Override + protected @NonNull Iterator createSourceIterator(Vertex vertex) { + RepeatUnionParam repeatUnionParam = vertex.getParam(); + Job seedJob = repeatUnionParam.seed; + Job iterationJob = repeatUnionParam.iteration; + + Iterator seedIterator = JobManagerImpl.INSTANCE.createIterator(seedJob, null); + + // seedIterator + // iteration + return new DingoEnumerable(seedIterator, iterationJob, repeatUnionParam.all, repeatUnionParam.iterationLimit); + } + + @Override + public void fin(int pin, Fin fin, Vertex vertex) { + super.fin(pin, fin, vertex); + RepeatUnionParam repeatUnionParam = vertex.getParam(); + Job seedJob = repeatUnionParam.seed; + Job iterationJob = repeatUnionParam.iteration; + JobManagerImpl.INSTANCE.removeJob(seedJob.getJobId()); + JobManagerImpl.INSTANCE.removeJob(iterationJob.getJobId()); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/ScanOperatorBase.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/ScanOperatorBase.java index e42f8e1316..dc9d8adf5b 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/operator/ScanOperatorBase.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/ScanOperatorBase.java @@ -41,7 +41,7 @@ public abstract class ScanOperatorBase extends SoleOutOperator { @Override public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { - if(vertex.getTask().getStatus() == Status.CANCEL) { + if (vertex.getTask().getStatus() == Status.CANCEL) { throw new TaskCancelException("task is cancel"); } else if (vertex.getTask().getStatus() == Status.STOPPED) { return false; diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/TableSpoolOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/TableSpoolOperator.java new file mode 100644 index 0000000000..3bd95c2daa --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/TableSpoolOperator.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator; + +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.fin.Fin; +import io.dingodb.exec.operator.data.Context; +import io.dingodb.exec.operator.params.TableSpoolParam; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class TableSpoolOperator extends SoleOutOperator { + public static TableSpoolOperator INSTANCE = new TableSpoolOperator(); + + @Override + public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { + TableSpoolParam tableSpoolParam = vertex.getParam(); + tableSpoolParam.tempCollection.add(tuple); + vertex.getSoleEdge().transformToNext(tuple); + return true; + } + + @Override + public void fin(int pin, @Nullable Fin fin, Vertex vertex) { + TableSpoolParam tableSpoolParam = vertex.getParam(); + tableSpoolParam.list.clear(); + tableSpoolParam.list.addAll(tableSpoolParam.tempCollection); + tableSpoolParam.tempCollection.clear(); + vertex.getSoleEdge().fin(fin); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/WindowFunctionOperator.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/WindowFunctionOperator.java new file mode 100644 index 0000000000..a255cc61dd --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/WindowFunctionOperator.java @@ -0,0 +1,70 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator; + +import io.dingodb.common.log.LogUtils; +import io.dingodb.exec.dag.Vertex; +import io.dingodb.exec.fin.Fin; +import io.dingodb.exec.fin.FinWithException; +import io.dingodb.exec.fin.TaskStatus; +import io.dingodb.exec.operator.data.Context; +import io.dingodb.exec.operator.params.WindowFunctionParam; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Iterator; +import java.util.List; + +@Slf4j +public class WindowFunctionOperator extends SoleOutOperator { + public static final WindowFunctionOperator INSTANCE = new WindowFunctionOperator(); + + private WindowFunctionOperator() { + + } + + @Override + public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) { + WindowFunctionParam param = vertex.getParam(); + param.getList().add(tuple); + return true; + } + + @Override + public void fin(int pin, @Nullable Fin fin, Vertex vertex) { + // push next + WindowFunctionParam param = vertex.getParam(); + + try { + Iterator response = param.getWindowService().transform(param.getList().iterator()); + while (response.hasNext()) { + Object[] tuple1 = (Object[]) response.next(); + vertex.getSoleEdge().transformToNext(tuple1); + } + } catch (Exception e) { + LogUtils.error(log, e.getMessage(), e); + TaskStatus taskStatus = new TaskStatus(); + taskStatus.setStatus(false); + taskStatus.setTaskId(vertex.getTask().getId().toString()); + taskStatus.setErrorMsg(e.getMessage()); + vertex.getSoleEdge().fin(FinWithException.of(taskStatus)); + return; + } + // push fin + vertex.getSoleEdge().fin(fin); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ListTransientScanParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ListTransientScanParam.java new file mode 100644 index 0000000000..78959028ed --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/ListTransientScanParam.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator.params; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; +import lombok.Getter; +import org.apache.calcite.schema.impl.ListTransientTable; + +import java.util.Collection; + +@Getter +@JsonTypeName("listTransientScanParam") +@JsonPropertyOrder({"listTransientTable"}) +public class ListTransientScanParam extends FilterProjectSourceParam { + + @JsonProperty("listTransientTable") + Collection list; + + public ListTransientScanParam(ListTransientTable listTransientTable) { + super(null, null, null, 0, null, null, null, 0); + this.list = listTransientTable.getModifiableCollection(); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/RepeatUnionParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/RepeatUnionParam.java new file mode 100644 index 0000000000..7db50c5dab --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/RepeatUnionParam.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator.params; + +import io.dingodb.common.CommonId; +import io.dingodb.common.type.DingoType; +import io.dingodb.common.type.TupleMapping; +import io.dingodb.exec.base.Job; +import io.dingodb.exec.expr.SqlExpr; + +public class RepeatUnionParam extends FilterProjectSourceParam { + public Job seed; + public Job iteration; + public boolean all; + public int iterationLimit; + + public RepeatUnionParam(Job seedRelNode, Job iterationRelNode, boolean all, int iterationLimit) { + this(null, null, null, 1, null, null, null, 2, seedRelNode, iterationRelNode, all, iterationLimit); + } + + + public RepeatUnionParam(CommonId tableId, + CommonId partId, DingoType schema, int schemaVersion, SqlExpr filter, + TupleMapping selection, TupleMapping keyMapping, int codecVersion, Job seedJob, Job iterationJob, + boolean all, int iterationLimit + ) { + super(tableId, partId, schema, schemaVersion, filter, selection, keyMapping, codecVersion); + this.seed = seedJob; + this.iteration = iterationJob; + this.all = all; + this.iterationLimit = iterationLimit; + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TableSpoolParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TableSpoolParam.java new file mode 100644 index 0000000000..8da91499e8 --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/TableSpoolParam.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator.params; + +import io.dingodb.exec.dag.Vertex; + +import java.util.ArrayList; +import java.util.Collection; + +public class TableSpoolParam extends AbstractParams { + public Collection list; + public Collection tempCollection; + + public TableSpoolParam(Collection list) { + this.list = list; + this.tempCollection = new ArrayList(); + } + + @Override + public void init(Vertex vertex) { + super.init(vertex); + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/operator/params/WindowFunctionParam.java b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/WindowFunctionParam.java new file mode 100644 index 0000000000..63c28b239c --- /dev/null +++ b/dingo-exec/src/main/java/io/dingodb/exec/operator/params/WindowFunctionParam.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.exec.operator.params; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.dingodb.tool.api.WindowService; +import lombok.Getter; + +import java.util.ArrayList; +import java.util.List; + +@JsonTypeName("window") +@JsonPropertyOrder({"funName"}) +public class WindowFunctionParam extends AbstractParams { + + @JsonProperty("funName") + String funName; + + @Getter + List list = new ArrayList<>(); + + @Getter + WindowService windowService; + + public WindowFunctionParam(WindowService windowService) { + this.windowService = windowService; + } +} diff --git a/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java b/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java index 0830eb3e47..270a391839 100644 --- a/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java +++ b/dingo-exec/src/main/java/io/dingodb/exec/utils/OperatorCodeUtils.java @@ -43,6 +43,7 @@ public final class OperatorCodeUtils { public static final CommonId CALC_DISTRIBUTION_1 = new CommonId(CommonId.CommonType.OP, SOURCE, 15); public static final CommonId TXN_PART_DOCUMENT = new CommonId(CommonId.CommonType.OP, SOURCE, 16); public static final CommonId DOCUMENT_SCAN_FILTER = new CommonId(CommonId.CommonType.OP, SOURCE, 17); + public static final CommonId LIST_TRANSIENT_SCAN = new CommonId(CommonId.CommonType.OP, SOURCE, 18); // op @@ -121,6 +122,10 @@ public final class OperatorCodeUtils { public static final CommonId TXN_DISK_ANN_BUILD = new CommonId(CommonId.CommonType.OP, SOURCE, 95); public static final CommonId TXN_DISK_ANN_LOAD = new CommonId(CommonId.CommonType.OP, SOURCE, 96); + public static final CommonId WINDOW_FUNCTION = new CommonId(CommonId.CommonType.OP, OP, 97); + public static final CommonId REPEAT_UNION = new CommonId(CommonId.CommonType.OP, SOURCE, 98); + public static final CommonId TABLE_SPOOL = new CommonId(CommonId.CommonType.OP, OP, 99); + private OperatorCodeUtils() { } } diff --git a/dingo-tool-api/src/main/java/io/dingodb/tool/api/WindowService.java b/dingo-tool-api/src/main/java/io/dingodb/tool/api/WindowService.java new file mode 100644 index 0000000000..48456057d4 --- /dev/null +++ b/dingo-tool-api/src/main/java/io/dingodb/tool/api/WindowService.java @@ -0,0 +1,23 @@ +/* + * Copyright 2021 DataCanvas + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dingodb.tool.api; + +import java.util.Iterator; + +public interface WindowService { + Iterator transform(Iterator input); +}