diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
index 72ee4804755..c909d95dd3c 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
@@ -2,7 +2,7 @@ use super::{
datastore::Result,
sequence::{Sequence, SequencesState},
state_view::{Iter, IterByColRange, ScanIterByColRange, StateView},
- tx_state::{DeleteTable, IndexIdMap, TxState},
+ tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
};
use crate::{
db::{
@@ -30,8 +30,8 @@ use spacetimedb_lib::{
address::Address,
db::auth::{StAccess, StTableType},
};
-use spacetimedb_primitives::{ColList, ColSet, IndexId, TableId};
-use spacetimedb_sats::{AlgebraicValue, ProductValue};
+use spacetimedb_primitives::{ColList, ColSet, TableId};
+use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
@@ -469,7 +469,7 @@ impl CommittedState {
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
// Merge index id fast-lookup map changes.
- self.merge_index_map(tx_state.index_id_map, &tx_state.index_id_map_removals);
+ self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());
// If the TX will be logged, record its projected tx offset,
// then increment the counter.
@@ -562,8 +562,8 @@ impl CommittedState {
}
}
- fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: &[IndexId]) {
- for index_id in index_id_map_removals {
+ fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
+ for index_id in index_id_map_removals.into_iter().flatten() {
self.index_id_map.remove(index_id);
}
self.index_id_map.extend(index_id_map);
@@ -609,6 +609,13 @@ impl CommittedState {
let blob_store = &mut self.blob_store;
(table, blob_store)
}
+
+ /// Returns the table and index associated with the given `table_id` and `col_list`, if any.
+ pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
+ let table = self.tables.get(&table_id)?;
+ let index = table.indexes.get(col_list)?;
+ Some(&index.key_type)
+ }
}
pub struct CommittedIndexIter<'a> {
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
index ef16e21cd91..d69b10619a2 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
@@ -488,7 +488,10 @@ impl MutTxId {
}
// Remove the `index_id -> (table_id, col_list)` association.
idx_map.remove(&index_id);
- self.tx_state.index_id_map_removals.push(index_id);
+ self.tx_state
+ .index_id_map_removals
+ .get_or_insert_with(Default::default)
+ .insert(index_id);
log::trace!("INDEX DROPPED: {}", index_id);
Ok(())
@@ -530,34 +533,54 @@ impl MutTxId {
.map_err(IndexError::Decode)?;
// Get an index seek iterator for the tx and committed state.
- let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds).unwrap();
+ let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds);
let commit_iter = self.committed_state_write_lock.index_seek(table_id, col_list, &bounds);
// Chain together the indexed rows in the tx and committed state,
// but don't yield rows deleted in the tx state.
- enum Choice {
+ use itertools::Either::{Left, Right};
+ // this is gross, but nested `Either`s don't optimize
+ enum Choice {
A(A),
B(B),
C(C),
+ D(D),
+ E(E),
+ F(F),
}
- impl, B: Iterator- , C: Iterator
- > Iterator for Choice {
+ impl<
+ T,
+ A: Iterator
- ,
+ B: Iterator
- ,
+ C: Iterator
- ,
+ D: Iterator
- ,
+ E: Iterator
- ,
+ F: Iterator
- ,
+ > Iterator for Choice
+ {
type Item = T;
fn next(&mut self) -> Option {
match self {
Self::A(i) => i.next(),
Self::B(i) => i.next(),
Self::C(i) => i.next(),
+ Self::D(i) => i.next(),
+ Self::E(i) => i.next(),
+ Self::F(i) => i.next(),
}
}
}
- let iter = match commit_iter {
- None => Choice::A(tx_iter),
- Some(commit_iter) => match self.tx_state.delete_tables.get(&table_id) {
- None => Choice::B(tx_iter.chain(commit_iter)),
- Some(tx_dels) => {
- Choice::C(tx_iter.chain(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))))
- }
- },
+ let commit_iter = commit_iter.map(|commit_iter| match self.tx_state.delete_tables.get(&table_id) {
+ None => Left(commit_iter),
+ Some(tx_dels) => Right(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))),
+ });
+ let iter = match (tx_iter, commit_iter) {
+ (None, None) => Choice::A(std::iter::empty()),
+ (Some(tx_iter), None) => Choice::B(tx_iter),
+ (None, Some(Left(commit_iter))) => Choice::C(commit_iter),
+ (None, Some(Right(commit_iter))) => Choice::D(commit_iter),
+ (Some(tx_iter), Some(Left(commit_iter))) => Choice::E(tx_iter.chain(commit_iter)),
+ (Some(tx_iter), Some(Right(commit_iter))) => Choice::F(tx_iter.chain(commit_iter)),
};
Ok((table_id, iter))
}
@@ -567,16 +590,34 @@ impl MutTxId {
// The order of querying the committed vs. tx state for the translation is not important.
// But it is vastly more likely that it is in the committed state,
// so query that first to avoid two lookups.
- let (table_id, col_list) = self
+ let &(table_id, ref col_list) = self
.committed_state_write_lock
.index_id_map
.get(&index_id)
.or_else(|| self.tx_state.index_id_map.get(&index_id))?;
+
// The tx state must have the index.
// If the index was e.g., dropped from the tx state but exists physically in the committed state,
// the index does not exist, semantically.
- let key_ty = self.tx_state.get_table_and_index_type(*table_id, col_list)?;
- Some((*table_id, col_list, key_ty))
+ // TODO: handle the case where the table has been dropped in this transaction.
+ let key_ty = if let Some(key_ty) = self
+ .committed_state_write_lock
+ .get_table_and_index_type(table_id, col_list)
+ {
+ if self
+ .tx_state
+ .index_id_map_removals
+ .as_ref()
+ .is_some_and(|s| s.contains(&index_id))
+ {
+ return None;
+ }
+ key_ty
+ } else {
+ self.tx_state.get_table_and_index_type(table_id, col_list)?
+ };
+
+ Some((table_id, col_list, key_ty))
}
/// Decode the bounds for a btree scan for an index typed at `key_type`.
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
index b993bb3c088..5b04d76eed0 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
@@ -1,19 +1,20 @@
use core::ops::RangeBounds;
-use spacetimedb_data_structures::map::IntMap;
+use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
+ static_assert_size,
table::{IndexScanIter, RowRef, Table},
};
use std::collections::{btree_map, BTreeMap, BTreeSet};
-use thin_vec::ThinVec;
pub(super) type DeleteTable = BTreeSet;
/// A mapping to find the actual index given an `IndexId`.
pub(super) type IndexIdMap = IntMap;
+pub(super) type RemovedIndexIdSet = IntSet;
/// `TxState` tracks all of the modifications made during a particular transaction.
/// Rows inserted during a transaction will be added to insert_tables, and similarly,
@@ -71,9 +72,13 @@ pub(super) struct TxState {
pub(super) index_id_map: IndexIdMap,
/// Lists all the `IndexId` that are to be removed from `CommittedState::index_id_map`.
- pub(super) index_id_map_removals: ThinVec,
+ // This is in an `Option>` to reduce the size of `TxState` - it's very uncommon
+ // that this would be created.
+ pub(super) index_id_map_removals: Option>,
}
+static_assert_size!(TxState, 120);
+
impl TxState {
/// Returns the row count in insert tables
/// and the number of rows deleted from committed state.