Skip to content

Commit 43eb5dd

Browse files
committed
Lift restriction that multi-col indices only allow = constraints #1317
benches
1 parent 2326db7 commit 43eb5dd

File tree

10 files changed

+816
-504
lines changed

10 files changed

+816
-504
lines changed

crates/core/src/estimation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub fn num_rows(tx: &Tx, expr: &QueryExpr) -> u64 {
1414
pub fn estimate_rows_scanned(tx: &Tx, plan: &PhysicalPlan) -> u64 {
1515
match plan {
1616
PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => row_estimate(tx, plan),
17+
PhysicalPlan::IxScansAnd(plans) => plans.iter().map(|p| estimate_rows_scanned(tx, p)).sum(),
1718
PhysicalPlan::Filter(input, _) => estimate_rows_scanned(tx, input).saturating_add(row_estimate(tx, input)),
1819
PhysicalPlan::NLJoin(lhs, rhs) => estimate_rows_scanned(tx, lhs)
1920
.saturating_add(estimate_rows_scanned(tx, rhs))
@@ -51,6 +52,7 @@ pub fn row_estimate(tx: &Tx, plan: &PhysicalPlan) -> u64 {
5152
// Use a row limit as the estimate if present
5253
PhysicalPlan::TableScan(TableScan { limit: Some(n), .. }, _)
5354
| PhysicalPlan::IxScan(IxScan { limit: Some(n), .. }, _) => *n,
55+
PhysicalPlan::IxScansAnd(idx) => idx.iter().map(|plan| row_estimate(tx, plan)).sum(),
5456
// Table scans return the number of rows in the table
5557
PhysicalPlan::TableScan(
5658
TableScan {

crates/core/src/sql/compiler.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -507,15 +507,14 @@ Seq Scan on test
507507
db.create_table_for_test("test", schema, indexes)?;
508508

509509
let tx = begin_tx(&db);
510-
// TODO: Need support for index range scans.
511510
expect_query(
512511
&tx,
513512
"select * from test where b > 2",
514513
expect![
515514
r#"
516-
Seq Scan on test
517-
Output: test.a, test.b
518-
-> Filter: (test.b > U64(2))"#
515+
Index Scan using Index test_b_idx_btree (test.b) on test
516+
Index Cond: (test.b > U64(2))
517+
Output: test.a, test.b"#
519518
],
520519
);
521520

@@ -532,15 +531,15 @@ Seq Scan on test
532531
db.create_table_for_test("test", schema, indexes)?;
533532

534533
let tx = begin_tx(&db);
535-
//TODO(sql): Need support for index scans for ranges
536534
expect_query(
537535
&tx,
538536
"select * from test where b > 2 and b < 5",
539537
expect![
540538
r#"
541-
Seq Scan on test
539+
Index Scan using Index test_b_idx_btree (test.b) on test
540+
Index Cond: (test.b > U64(2))
542541
Output: test.a, test.b
543-
-> Filter: (test.b > U64(2) AND test.b < U64(5))"#
542+
-> Filter: (test.b < U64(5))"#
544543
],
545544
);
546545

@@ -557,17 +556,20 @@ Seq Scan on test
557556
db.create_table_for_test("test", schema, indexes)?;
558557

559558
let tx = begin_tx(&db);
560-
// Note, order matters - the equality condition occurs first which
561-
// means an index scan will be generated rather than the range condition.
562559
expect_query(
563560
&tx,
564561
"select * from test where a = 3 and b > 2 and b < 5",
565562
expect![
566563
r#"
567-
Index Scan using Index test_a_idx_btree (test.a) on test
568-
Index Cond: (test.a = U64(3))
564+
Union
565+
-> Index Scan using Index test_a_idx_btree (test.a) on test
566+
Index Cond: (test.a = U64(3))
567+
Output: test.a, test.b
568+
-> Index Scan using Index test_b_idx_btree (test.b) on test
569+
Index Cond: (test.b > U64(2))
570+
Output: test.a, test.b
569571
Output: test.a, test.b
570-
-> Filter: (test.b < U64(5) AND test.b > U64(2))"#
572+
-> Filter: (test.b < U64(5) AND test.a = U64(3) AND test.b > U64(2))"#
571573
],
572574
);
573575

@@ -710,9 +712,9 @@ Hash Join: Lhs
710712
Index Cond: (lhs.a = U64(3))
711713
Output: lhs.a, lhs.b
712714
-> Hash Build: rhs.b
713-
-> Seq Scan on rhs
714-
Output: rhs.b, rhs.c
715-
-> Filter: (rhs.c < U64(4))"#
715+
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
716+
Index Cond: (rhs.c < U64(4))
717+
Output: rhs.b, rhs.c"#
716718
],
717719
);
718720

@@ -749,9 +751,10 @@ Index Join: Rhs on lhs
749751
Inner Unique: false
750752
Join Cond: (rhs.b = lhs.b)
751753
Output: lhs.a, lhs.b
752-
-> Seq Scan on rhs
754+
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
755+
Index Cond: (rhs.c > U64(2))
753756
Output: rhs.b, rhs.c, rhs.d
754-
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
757+
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
755758
],
756759
);
757760

crates/core/src/subscription/subscription.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,6 @@ mod tests {
709709
panic!("expected an index join, but got {join:#?}");
710710
};
711711

712-
//TODO(sql): Remove manual checks to just `EXPLAIN` the query.
713712
expect_sub(
714713
&tx,
715714
sql,
@@ -719,9 +718,10 @@ Index Join: Rhs on lhs
719718
Inner Unique: false
720719
Join Cond: (rhs.b = lhs.b)
721720
Output: lhs.a, lhs.b
722-
-> Seq Scan on rhs
721+
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
722+
Index Cond: (rhs.c > U64(2))
723723
Output: rhs.b, rhs.c, rhs.d
724-
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
724+
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
725725
],
726726
);
727727

@@ -805,8 +805,7 @@ Index Join: Rhs on lhs
805805
panic!("expected an index join, but got {join:#?}");
806806
};
807807

808-
//TODO(sql): Remove manual checks to just `EXPLAIN` the query.
809-
// Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
808+
//TODO(sql): Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
810809
expect_sub(
811810
&tx,
812811
sql,
@@ -816,9 +815,10 @@ Index Join: Rhs on lhs
816815
Inner Unique: false
817816
Join Cond: (rhs.b = lhs.b)
818817
Output: lhs.a, lhs.b
819-
-> Seq Scan on rhs
818+
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
819+
Index Cond: (rhs.c > U64(2))
820820
Output: rhs.b, rhs.c, rhs.d
821-
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
821+
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
822822
],
823823
);
824824

@@ -908,8 +908,7 @@ Index Join: Rhs on lhs
908908
"expected an index join, but got {src_join:#?}"
909909
);
910910

911-
//TODO(sql): Remove manual checks to just `EXPLAIN` the query.
912-
// Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
911+
//TODO(sql): Why this generate same plan than the previous test? 'compile_incremental_index_join_index_side'
913912
expect_sub(
914913
&tx,
915914
sql,
@@ -919,9 +918,10 @@ Index Join: Rhs on lhs
919918
Inner Unique: false
920919
Join Cond: (rhs.b = lhs.b)
921920
Output: lhs.a, lhs.b
922-
-> Seq Scan on rhs
921+
-> Index Scan using Index rhs_c_idx_btree (rhs.c) on rhs
922+
Index Cond: (rhs.c > U64(2))
923923
Output: rhs.b, rhs.c, rhs.d
924-
-> Filter: (rhs.c > U64(2) AND rhs.c < U64(4) AND rhs.d = U64(3))"#
924+
-> Filter: (rhs.c < U64(4) AND rhs.d = U64(3))"#
925925
],
926926
);
927927

crates/execution/src/iter.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub enum Iter<'a> {
7575
Row(RowRefIter<'a>),
7676
Join(LeftDeepJoinIter<'a>),
7777
Filter(Filter<'a, Iter<'a>>),
78+
FilterIdx(FilterIdx<'a>),
7879
}
7980

8081
impl<'a> Iterator for Iter<'a> {
@@ -85,6 +86,7 @@ impl<'a> Iterator for Iter<'a> {
8586
Self::Row(iter) => iter.next().map(Tuple::Row),
8687
Self::Join(iter) => iter.next(),
8788
Self::Filter(iter) => iter.next(),
89+
Self::FilterIdx(iter) => iter.next(),
8890
}
8991
}
9092
}
@@ -96,6 +98,12 @@ impl<'a> Iter<'a> {
9698
{
9799
match plan {
98100
PhysicalPlan::TableScan(..) | PhysicalPlan::IxScan(..) => RowRefIter::build(plan, tx).map(Self::Row),
101+
PhysicalPlan::IxScansAnd(input) => {
102+
let input: Vec<_> = input.iter().map(|plan| Iter::build(plan, tx)).collect::<Result<_>>()?;
103+
Ok(Iter::FilterIdx(FilterIdx {
104+
input: input.into_boxed_slice(),
105+
}))
106+
}
99107
PhysicalPlan::Filter(input, expr) => {
100108
// Build a filter iterator
101109
Iter::build(input, tx)
@@ -1050,3 +1058,17 @@ impl<'a> Iterator for Filter<'a, Iter<'a>> {
10501058
self.input.find(|tuple| self.expr.eval_bool(tuple))
10511059
}
10521060
}
1061+
1062+
/// A tuple-at-a-time filter iterator based on check multiple indexes
1063+
pub struct FilterIdx<'a> {
1064+
input: Box<[Iter<'a>]>,
1065+
}
1066+
1067+
impl<'a> Iterator for FilterIdx<'a> {
1068+
type Item = Tuple<'a>;
1069+
1070+
fn next(&mut self) -> Option<Self::Item> {
1071+
// Find the row that matches all the indexes
1072+
self.input.iter_mut().find_map(|iter| iter.next())
1073+
}
1074+
}

crates/execution/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl ProjectField for Row<'_> {
190190
}
191191

192192
/// Each query operator returns a tuple of [RowRef]s
193-
#[derive(Clone)]
193+
#[derive(Clone, PartialEq, Eq, Hash)]
194194
pub enum Tuple<'a> {
195195
/// A pointer to a row in a base table
196196
Row(Row<'a>),

crates/execution/src/pipelined.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl PipelinedProject {
186186
pub enum PipelinedExecutor {
187187
TableScan(PipelinedScan),
188188
IxScan(PipelinedIxScan),
189+
IxScansAnd(PipelinedIxScanAnd),
189190
IxJoin(PipelinedIxJoin),
190191
IxDeltaScan(PipelinedIxDeltaScan),
191192
IxDeltaJoin(PipelinedIxDeltaJoin),
@@ -205,6 +206,12 @@ impl From<PhysicalPlan> for PipelinedExecutor {
205206
}),
206207
PhysicalPlan::IxScan(scan @ IxScan { delta: None, .. }, _) => Self::IxScan(scan.into()),
207208
PhysicalPlan::IxScan(scan, _) => Self::IxDeltaScan(scan.into()),
209+
PhysicalPlan::IxScansAnd(scans) => {
210+
// Apply the filter to each scan to form an intersection
211+
Self::IxScansAnd(PipelinedIxScanAnd {
212+
scans: scans.into_iter().map(PipelinedExecutor::from).collect(),
213+
})
214+
}
208215
PhysicalPlan::IxJoin(
209216
IxJoin {
210217
lhs,
@@ -292,7 +299,7 @@ impl PipelinedExecutor {
292299
lhs.visit(f);
293300
rhs.visit(f);
294301
}
295-
Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) => {}
302+
Self::TableScan(..) | Self::IxScan(..) | Self::IxDeltaScan(..) | Self::IxScansAnd(..) => {}
296303
}
297304
}
298305

@@ -301,6 +308,7 @@ impl PipelinedExecutor {
301308
match self {
302309
Self::TableScan(scan) => scan.is_empty(tx),
303310
Self::IxScan(scan) => scan.is_empty(tx),
311+
Self::IxScansAnd(scan) => scan.is_empty(tx),
304312
Self::IxDeltaScan(scan) => scan.is_empty(tx),
305313
Self::IxJoin(join) => join.is_empty(tx),
306314
Self::IxDeltaJoin(join) => join.is_empty(tx),
@@ -320,6 +328,7 @@ impl PipelinedExecutor {
320328
match self {
321329
Self::TableScan(scan) => scan.execute(tx, metrics, f),
322330
Self::IxScan(scan) => scan.execute(tx, metrics, f),
331+
Self::IxScansAnd(scan) => scan.execute(tx, metrics, f),
323332
Self::IxDeltaScan(scan) => scan.execute(tx, metrics, f),
324333
Self::IxJoin(join) => join.execute(tx, metrics, f),
325334
Self::IxDeltaJoin(join) => join.execute(tx, metrics, f),
@@ -567,6 +576,7 @@ pub struct PipelinedIxScan {
567576
pub table_id: TableId,
568577
/// The index id
569578
pub index_id: IndexId,
579+
pub index_is_multi_column: bool,
570580
pub limit: Option<u64>,
571581
/// An equality prefix for multi-column scans
572582
pub prefix: Vec<AlgebraicValue>,
@@ -578,6 +588,12 @@ pub struct PipelinedIxScan {
578588

579589
impl From<IxScan> for PipelinedIxScan {
580590
fn from(scan: IxScan) -> Self {
591+
let index_is_multi_column = scan
592+
.schema
593+
.indexes
594+
.iter()
595+
.find(|i| i.index_id == scan.index_id)
596+
.is_some_and(|index| index.index_algorithm.columns().iter().count() > 1);
581597
match scan {
582598
IxScan {
583599
schema,
@@ -589,6 +605,7 @@ impl From<IxScan> for PipelinedIxScan {
589605
} => Self {
590606
table_id: schema.table_id,
591607
index_id,
608+
index_is_multi_column,
592609
limit,
593610
prefix: prefix.into_iter().map(|(_, v)| v).collect(),
594611
lower: Bound::Included(v.clone()),
@@ -604,6 +621,7 @@ impl From<IxScan> for PipelinedIxScan {
604621
} => Self {
605622
table_id: schema.table_id,
606623
index_id,
624+
index_is_multi_column,
607625
limit,
608626
prefix: prefix.into_iter().map(|(_, v)| v).collect(),
609627
lower,
@@ -672,7 +690,7 @@ impl PipelinedIxScan {
672690
f(t)
673691
};
674692
match self.prefix.as_slice() {
675-
[] => {
693+
[] if !self.index_is_multi_column => {
676694
for ptr in single_col_limit_scan(self.limit.map(|n| n as usize))?
677695
.map(Row::Ptr)
678696
.map(Tuple::Row)
@@ -695,6 +713,66 @@ impl PipelinedIxScan {
695713
}
696714
}
697715

716+
/// A pipelined executor for scanning multiple indexes
717+
#[derive(Debug)]
718+
pub struct PipelinedIxScanAnd {
719+
scans: Vec<PipelinedExecutor>,
720+
}
721+
722+
impl PipelinedIxScanAnd {
723+
/// Does this operation contain an empty scan?
724+
pub fn is_empty(&self, tx: &impl DeltaStore) -> bool {
725+
self.scans.iter().all(|scan| scan.is_empty(tx))
726+
}
727+
728+
/// Executes the pipelined index scans, producing tuples when all
729+
pub fn execute<'a, Tx: Datastore + DeltaStore>(
730+
&self,
731+
tx: &'a Tx,
732+
metrics: &mut ExecutionMetrics,
733+
f: &mut dyn FnMut(Tuple<'a>) -> Result<()>,
734+
) -> Result<()> {
735+
let scans = match self.scans.as_slice() {
736+
[] => return Ok(()), // No scans to execute
737+
[scan] => {
738+
// Single scan, execute directly
739+
return scan.execute(tx, metrics, f);
740+
}
741+
scans => scans,
742+
};
743+
744+
// Execute first scan and materialize all rows
745+
let mut first_rows = HashSet::new();
746+
scans[0].execute(tx, metrics, &mut |t| {
747+
first_rows.insert(t);
748+
Ok(())
749+
})?;
750+
751+
// Intersect with remaining scans
752+
for scan in &scans[1..] {
753+
let mut current_rows = HashSet::new();
754+
scan.execute(tx, metrics, &mut |t| {
755+
if first_rows.contains(&t) {
756+
current_rows.insert(t);
757+
}
758+
Ok(())
759+
})?;
760+
first_rows = current_rows;
761+
if first_rows.is_empty() {
762+
break;
763+
}
764+
}
765+
766+
// Emit results
767+
for t in first_rows {
768+
metrics.rows_scanned += 1;
769+
f(t)?;
770+
}
771+
772+
Ok(())
773+
}
774+
}
775+
698776
/// A pipelined index join executor
699777
#[derive(Debug)]
700778
pub struct PipelinedIxJoin {

0 commit comments

Comments
 (0)