Skip to content

Commit 4e51ad5

Browse files
authored
First part of Comet fixes (#179)
1 parent bb9750e commit 4e51ad5

File tree

6 files changed

+51
-13
lines changed

6 files changed

+51
-13
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.biodatageeks.sequila.apps
2+
3+
import org.apache.spark.sql.types.StructType
4+
5+
object ITApp extends App with SequilaApp {
6+
override def main(args: Array[String]): Unit = {
7+
8+
val ss = createSequilaSession()
9+
ss.sqlContext.setConf("spark.biodatageeks.rangejoin.useJoinOrder", "true")
10+
11+
val chainRn4_chr1 = ss
12+
.read
13+
.option("delimiter", "\t")
14+
.option("header", true)
15+
.schema(StructType.fromDDL("column0 string, column1 int, column2 int"))
16+
.csv("/Users/mwiewior/CLionProjects/sequila-native/sandbox/chainRn4_chr1.csv")
17+
18+
chainRn4_chr1.createOrReplaceTempView("chainRn4_chr1")
19+
val chainVicPac2 = ss
20+
.read
21+
.option("delimiter", "\t")
22+
.option("header", true)
23+
.schema(StructType.fromDDL("column0 string, column1 int, column2 int"))
24+
.csv("/Users/mwiewior/CLionProjects/sequila-native/sandbox/chainVicPac2_chr1.csv")
25+
chainVicPac2.createOrReplaceTempView("chainVicPac2_chr1")
26+
ss.time{
27+
ss
28+
.sql("select count(*) from chainRn4_chr1 a, chainVicPac2_chr1 b where (a.column0=b.column0 and a.column2>=b.column1 and a.column1<=b.column2);")
29+
.show()
30+
}
31+
ss.stop()
32+
}
33+
}

src/main/scala/org/biodatageeks/sequila/rangejoins/common/ExtractRangeJoinKeysWithEquality.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ import org.apache.spark.sql.catalyst.plans.Inner
1111
*/
1212

1313
object ExtractRangeJoinKeysWithEquality extends Logging with PredicateHelper {
14+
15+
//joinType, rangeJoinKeys, left, right, condition
16+
//(JoinType, Seq[Expression], LogicalPlan, LogicalPlan)
1417
type ReturnType =
15-
(JoinType, Seq[Expression], LogicalPlan, LogicalPlan)
18+
(JoinType, Seq[Expression], LogicalPlan, LogicalPlan, Option[Expression])
1619
def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
1720
case join @ Join(left, right, joinType, condition, hint) =>
1821
logDebug(s"Considering join on: $condition")
@@ -24,21 +27,20 @@ object ExtractRangeJoinKeysWithEquality extends Logging with PredicateHelper {
2427
if (condition.size!=0 && joinType == Inner) {
2528
condition.head match {
2629
case And(And(EqualTo(l3, r3), LessThanOrEqual(l1, g1)), LessThanOrEqual(l2, g2)) =>
27-
Some((joinType,
28-
getKeys(l1, l2, g1, g2, l3, r3, left, right),
29-
left, right))
30+
val rangeJoinKeys = getKeys(l1, l2, g1, g2, l3, r3, left, right)
31+
Some((joinType, rangeJoinKeys, left, right, condition))
3032
case And(And(EqualTo(l3, r3), GreaterThanOrEqual(g1, l1)), LessThanOrEqual(l2, g2)) =>
3133
Some((joinType,
3234
getKeys(l1, l2, g1, g2, l3, r3, left, right),
33-
left, right))
35+
left, right,condition))
3436
case And(And(EqualTo(l3, r3), LessThanOrEqual(l1, g1)), GreaterThanOrEqual(g2, l2)) =>
3537
Some((joinType,
3638
getKeys(l1, l2, g1, g2, l3, r3, left, right),
37-
left, right))
39+
left, right, condition))
3840
case And(And(EqualTo(l3, r3), GreaterThanOrEqual(g1, l1)), GreaterThanOrEqual(g2, l2)) =>
3941
Some((joinType,
4042
getKeys(l1, l2, g1, g2, l3, r3, left, right),
41-
left, right))
43+
left, right, condition))
4244
case _ => None
4345
}
4446
} else {

src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinOptimChromosome.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSession
2323
import org.apache.spark.sql.catalyst.InternalRow
2424
import org.apache.spark.sql.catalyst.expressions._
2525
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
26+
import org.apache.spark.sql.catalyst.plans.JoinType
2627
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2728
import org.apache.spark.sql.execution.{SparkPlan, _}
2829
import org.apache.spark.sql.internal.SQLConf
@@ -36,7 +37,9 @@ case class IntervalTreeJoinOptimChromosome(left: SparkPlan,
3637
context: SparkSession,
3738
minOverlap: Int, maxGap: Int,
3839
useJoinOrder: Boolean,
39-
intervalHolderClassName: String
40+
intervalHolderClassName: String,
41+
conditionExact: Option[Expression],
42+
joinType: JoinType
4043
) extends BinaryExecNode with Serializable {
4144
@transient lazy val output = left.output ++ right.output
4245

src/main/scala/org/biodatageeks/sequila/rangejoins/methods/IntervalTree/IntervalTreeJoinStrategyOptim.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S
2121
plan match {
2222
case ExtractRangeJoinKeys(joinType, rangeJoinKeys, left, right) =>
2323
IntervalTreeJoinOptim(planLater(left), planLater(right), rangeJoinKeys, spark,left,right,intervalHolderClassName) :: Nil
24-
case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right) => {
24+
case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right, condition) => {
2525
val minOverlap = spark.sqlContext.getConf(InternalParams.minOverlap,"1")
2626
val maxGap = spark.sqlContext.getConf(InternalParams.maxGap,"0")
2727
val useJoinOrder = spark.sqlContext.getConf(InternalParams.useJoinOrder,"false")
@@ -40,7 +40,7 @@ class IntervalTreeJoinStrategyOptim(spark: SparkSession) extends Strategy with S
4040
minOverlap.toInt,
4141
maxGap.toInt,
4242
useJoinOrder.toBoolean,
43-
intervalHolderClassName) :: Nil
43+
intervalHolderClassName, condition, joinType) :: Nil
4444
}
4545
case _ =>
4646
Nil

src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinChromosome.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ case class
1414
IntervalTreeJoinChromosome(left: SparkPlan,
1515
right: SparkPlan,
1616
condition: Seq[Expression],
17-
context: SparkSession) extends BinaryExecNode {
17+
context: SparkSession, conditionExact: Option[Expression]) extends BinaryExecNode {
1818
def output = left.output ++ right.output
1919

2020
lazy val (buildPlan, streamedPlan) = (left, right)

src/main/scala/org/biodatageeks/sequila/rangejoins/methods/genApp/IntervalTreeJoinStrategy.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ class IntervalTreeJoinStrategy(spark: SparkSession) extends Strategy with Serial
1111
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
1212
case ExtractRangeJoinKeys(joinType, rangeJoinKeys, left, right) =>
1313
IntervalTreeJoin(planLater(left), planLater(right), rangeJoinKeys, spark) :: Nil
14-
case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right) =>
15-
IntervalTreeJoinChromosome(planLater(left), planLater(right), rangeJoinKeys, spark) :: Nil
14+
case ExtractRangeJoinKeysWithEquality(joinType, rangeJoinKeys, left, right,condition) =>
15+
IntervalTreeJoinChromosome(planLater(left), planLater(right), rangeJoinKeys, spark, condition) :: Nil
1616
case _ =>
1717
Nil
1818
}

0 commit comments

Comments
 (0)