Skip to content

Commit 275e18a

Browse files
committed
Fix btree_scan
1 parent 71931c8 commit 275e18a

File tree

3 files changed

+77
-24
lines changed

3 files changed

+77
-24
lines changed

crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::{
22
datastore::Result,
33
sequence::{Sequence, SequencesState},
44
state_view::{Iter, IterByColRange, ScanIterByColRange, StateView},
5-
tx_state::{DeleteTable, IndexIdMap, TxState},
5+
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
66
};
77
use crate::{
88
db::{
@@ -30,8 +30,8 @@ use spacetimedb_lib::{
3030
address::Address,
3131
db::auth::{StAccess, StTableType},
3232
};
33-
use spacetimedb_primitives::{ColList, ColSet, IndexId, TableId};
34-
use spacetimedb_sats::{AlgebraicValue, ProductValue};
33+
use spacetimedb_primitives::{ColList, ColSet, TableId};
34+
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
3535
use spacetimedb_schema::schema::TableSchema;
3636
use spacetimedb_table::{
3737
blob_store::{BlobStore, HashMapBlobStore},
@@ -469,7 +469,7 @@ impl CommittedState {
469469
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
470470

471471
// Merge index id fast-lookup map changes.
472-
self.merge_index_map(tx_state.index_id_map, &tx_state.index_id_map_removals);
472+
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());
473473

474474
// If the TX will be logged, record its projected tx offset,
475475
// then increment the counter.
@@ -562,8 +562,8 @@ impl CommittedState {
562562
}
563563
}
564564

565-
fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: &[IndexId]) {
566-
for index_id in index_id_map_removals {
565+
fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
566+
for index_id in index_id_map_removals.into_iter().flatten() {
567567
self.index_id_map.remove(index_id);
568568
}
569569
self.index_id_map.extend(index_id_map);
@@ -609,6 +609,13 @@ impl CommittedState {
609609
let blob_store = &mut self.blob_store;
610610
(table, blob_store)
611611
}
612+
613+
/// Returns the table and index associated with the given `table_id` and `col_list`, if any.
614+
pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
615+
let table = self.tables.get(&table_id)?;
616+
let index = table.indexes.get(col_list)?;
617+
Some(&index.key_type)
618+
}
612619
}
613620

614621
pub struct CommittedIndexIter<'a> {

crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,10 @@ impl MutTxId {
488488
}
489489
// Remove the `index_id -> (table_id, col_list)` association.
490490
idx_map.remove(&index_id);
491-
self.tx_state.index_id_map_removals.push(index_id);
491+
self.tx_state
492+
.index_id_map_removals
493+
.get_or_insert_with(Default::default)
494+
.insert(index_id);
492495

493496
log::trace!("INDEX DROPPED: {}", index_id);
494497
Ok(())
@@ -530,34 +533,54 @@ impl MutTxId {
530533
.map_err(IndexError::Decode)?;
531534

532535
// Get an index seek iterator for the tx and committed state.
533-
let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds).unwrap();
536+
let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds);
534537
let commit_iter = self.committed_state_write_lock.index_seek(table_id, col_list, &bounds);
535538

536539
// Chain together the indexed rows in the tx and committed state,
537540
// but don't yield rows deleted in the tx state.
538-
enum Choice<A, B, C> {
541+
use itertools::Either::{Left, Right};
542+
// this is gross, but nested `Either`s don't optimize
543+
enum Choice<A, B, C, D, E, F> {
539544
A(A),
540545
B(B),
541546
C(C),
547+
D(D),
548+
E(E),
549+
F(F),
542550
}
543-
impl<T, A: Iterator<Item = T>, B: Iterator<Item = T>, C: Iterator<Item = T>> Iterator for Choice<A, B, C> {
551+
impl<
552+
T,
553+
A: Iterator<Item = T>,
554+
B: Iterator<Item = T>,
555+
C: Iterator<Item = T>,
556+
D: Iterator<Item = T>,
557+
E: Iterator<Item = T>,
558+
F: Iterator<Item = T>,
559+
> Iterator for Choice<A, B, C, D, E, F>
560+
{
544561
type Item = T;
545562
fn next(&mut self) -> Option<Self::Item> {
546563
match self {
547564
Self::A(i) => i.next(),
548565
Self::B(i) => i.next(),
549566
Self::C(i) => i.next(),
567+
Self::D(i) => i.next(),
568+
Self::E(i) => i.next(),
569+
Self::F(i) => i.next(),
550570
}
551571
}
552572
}
553-
let iter = match commit_iter {
554-
None => Choice::A(tx_iter),
555-
Some(commit_iter) => match self.tx_state.delete_tables.get(&table_id) {
556-
None => Choice::B(tx_iter.chain(commit_iter)),
557-
Some(tx_dels) => {
558-
Choice::C(tx_iter.chain(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))))
559-
}
560-
},
573+
let commit_iter = commit_iter.map(|commit_iter| match self.tx_state.delete_tables.get(&table_id) {
574+
None => Left(commit_iter),
575+
Some(tx_dels) => Right(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))),
576+
});
577+
let iter = match (tx_iter, commit_iter) {
578+
(None, None) => Choice::A(std::iter::empty()),
579+
(Some(tx_iter), None) => Choice::B(tx_iter),
580+
(None, Some(Left(commit_iter))) => Choice::C(commit_iter),
581+
(None, Some(Right(commit_iter))) => Choice::D(commit_iter),
582+
(Some(tx_iter), Some(Left(commit_iter))) => Choice::E(tx_iter.chain(commit_iter)),
583+
(Some(tx_iter), Some(Right(commit_iter))) => Choice::F(tx_iter.chain(commit_iter)),
561584
};
562585
Ok((table_id, iter))
563586
}
@@ -567,16 +590,34 @@ impl MutTxId {
567590
// The order of querying the committed vs. tx state for the translation is not important.
568591
// But it is vastly more likely that it is in the committed state,
569592
// so query that first to avoid two lookups.
570-
let (table_id, col_list) = self
593+
let &(table_id, ref col_list) = self
571594
.committed_state_write_lock
572595
.index_id_map
573596
.get(&index_id)
574597
.or_else(|| self.tx_state.index_id_map.get(&index_id))?;
598+
575599
// The tx state must have the index.
576600
// If the index was e.g., dropped from the tx state but exists physically in the committed state,
577601
// the index does not exist, semantically.
578-
let key_ty = self.tx_state.get_table_and_index_type(*table_id, col_list)?;
579-
Some((*table_id, col_list, key_ty))
602+
// TODO: handle the case where the table has been dropped in this transaction.
603+
let key_ty = if let Some(key_ty) = self
604+
.committed_state_write_lock
605+
.get_table_and_index_type(table_id, col_list)
606+
{
607+
if self
608+
.tx_state
609+
.index_id_map_removals
610+
.as_ref()
611+
.is_some_and(|s| s.contains(&index_id))
612+
{
613+
return None;
614+
}
615+
key_ty
616+
} else {
617+
self.tx_state.get_table_and_index_type(table_id, col_list)?
618+
};
619+
620+
Some((table_id, col_list, key_ty))
580621
}
581622

582623
/// Decode the bounds for a btree scan for an index typed at `key_type`.

crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
use core::ops::RangeBounds;
2-
use spacetimedb_data_structures::map::IntMap;
2+
use spacetimedb_data_structures::map::{IntMap, IntSet};
33
use spacetimedb_primitives::{ColList, IndexId, TableId};
44
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
55
use spacetimedb_table::{
66
blob_store::{BlobStore, HashMapBlobStore},
77
indexes::{RowPointer, SquashedOffset},
8+
static_assert_size,
89
table::{IndexScanIter, RowRef, Table},
910
};
1011
use std::collections::{btree_map, BTreeMap, BTreeSet};
11-
use thin_vec::ThinVec;
1212

1313
pub(super) type DeleteTable = BTreeSet<RowPointer>;
1414

1515
/// A mapping to find the actual index given an `IndexId`.
1616
pub(super) type IndexIdMap = IntMap<IndexId, (TableId, ColList)>;
17+
pub(super) type RemovedIndexIdSet = IntSet<IndexId>;
1718

1819
/// `TxState` tracks all of the modifications made during a particular transaction.
1920
/// Rows inserted during a transaction will be added to insert_tables, and similarly,
@@ -71,9 +72,13 @@ pub(super) struct TxState {
7172
pub(super) index_id_map: IndexIdMap,
7273

7374
/// Lists all the `IndexId` that are to be removed from `CommittedState::index_id_map`.
74-
pub(super) index_id_map_removals: ThinVec<IndexId>,
75+
// This is in an `Option<Box<>>` to reduce the size of `TxState` - it's very uncommon
76+
// that this would be created.
77+
pub(super) index_id_map_removals: Option<Box<RemovedIndexIdSet>>,
7578
}
7679

80+
static_assert_size!(TxState, 120);
81+
7782
impl TxState {
7883
/// Returns the row count in insert tables
7984
/// and the number of rows deleted from committed state.

0 commit comments

Comments
 (0)