This branch just illustrates what needs to be removed to get to ~10m inserts per second #772

Draft · wants to merge 3 commits into base: tyler/bench-1m-base
7 changes: 3 additions & 4 deletions crates/bench/src/spacetime_raw.rs
@@ -42,7 +42,7 @@ impl BenchDatabase for SpacetimeRaw
self.db.with_auto_commit(&ExecutionContext::default(), |tx| {
let table_def = TableDef::from_product(&name, T::product_type());
let table_id = self.db.create_table(tx, table_def)?;
self.db.rename_table(tx, table_id, &name)?;
// self.db.rename_table(tx, table_id, &name)?;
match index_strategy {
IndexStrategy::Unique => {
self.db
@@ -105,7 +105,7 @@ impl BenchDatabase for SpacetimeRaw
.db
.iter_mut(&ctx, tx, *table_id)?
.take(row_count as usize)
.map(|row| row.view().clone())
.map(|row| row.to_product_value())
.collect::<Vec<_>>();

assert_eq!(rows.len(), row_count as usize, "not enough rows found for update_bulk!");
@@ -118,8 +118,7 @@
.iter_by_col_eq_mut(&ctx, tx, *table_id, ColId(0), row.elements[0].clone())?
.next()
.expect("failed to find row during update!")
.id()
.clone();
.pointer();

assert_eq!(
self.db.delete(tx, *table_id, [id.into()]),
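
Two per-row costs go away in this benchmark file: `row.view().clone()` becomes `row.to_product_value()`, and the update loop copies a `RowPointer` instead of cloning an owned id. The sketch below shows the shape of that second change with hypothetical stand-in types (the real `RowPointer`/`DataKey` definitions differ):

// Illustrative stand-ins only; not the real SpacetimeDB types.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct RowPointer(u64); // small and Copy: returning it is a register move

#[derive(Clone, PartialEq, Eq, Debug)]
struct DataKey(Vec<u8>); // owned: cloning this stand-in allocates

fn old_id(keys: &[DataKey]) -> DataKey {
    keys[0].clone() // old path: a clone per looked-up row
}

fn new_ptr(ptrs: &[RowPointer]) -> RowPointer {
    ptrs[0] // new path: a bitwise copy, no allocation
}

fn main() {
    assert_eq!(old_id(&[DataKey(vec![1, 2, 3])]), DataKey(vec![1, 2, 3]));
    assert_eq!(new_ptr(&[RowPointer(7)]), RowPointer(7));
}
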
2 changes: 1 addition & 1 deletion crates/core/src/db/commit_log.rs
@@ -423,7 +423,7 @@ impl CommitLogMut
let mut guard = self.odb.lock().unwrap();
for record in &tx_data.records {
if let (DataKey::Hash(_), TxOp::Insert(bytes)) = (&record.key, &record.op) {
guard.add(Vec::clone(bytes));
guard.add(bytes.to_vec());
}
}
}
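
With `TxOp::Insert` now holding an `Arc<[u8]>` (see the traits.rs change below), `Vec::clone(bytes)` no longer type-checks; `bytes.to_vec()` copies the shared slice into the owned `Vec` the object store takes. A standalone sketch of what that call does:

use std::sync::Arc;

fn main() {
    // The record holds a shared, immutable slice...
    let bytes: Arc<[u8]> = Arc::from(&b"row bytes"[..]);
    // ...and handing the object store an owned Vec copies the contents out.
    let owned: Vec<u8> = bytes.to_vec();
    assert_eq!(&owned[..], &bytes[..]);
}
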
25 changes: 14 additions & 11 deletions crates/core/src/db/datastore/mem_arch_datastore/committed_state.rs
@@ -393,6 +393,7 @@ impl CommittedState
// Likely we want to decide dynamically whether to move a page or copy its contents,
// based on the available holes in the committed state
// and the fullness of the page.
let mut bytes = Vec::with_capacity(1024);

for (table_id, mut tx_table) in insert_tables {
self.with_table_and_blob_store_or_create(
@@ -405,18 +406,20 @@

for row_ref in tx_table.scan_rows(&tx_blob_store) {
let pv = row_ref.to_product_value();
commit_table
.insert(commit_blob_store, &pv)
.expect("Failed to insert when merging commit");
let bytes = bsatn::to_vec(&pv).expect("Failed to BSATN-serialize ProductValue");
// commit_table
// .insert(commit_blob_store, &pv)
// .expect("Failed to insert when merging commit");
bsatn::to_writer(&mut bytes, &pv).expect("Failed to BSATN-serialize ProductValue");
// let bytes = bsatn::to_vec(&pv).expect("Failed to BSATN-serialize ProductValue");
let data_key = DataKey::from_data(&bytes);
tx_data.records.push(TxRecord {
op: TxOp::Insert(Arc::new(bytes)),
product_value: pv,
key: data_key,
table_name: commit_table.schema.table_name.clone(),
table_id,
});
// tx_data.records.push(TxRecord {
// op: TxOp::Insert(Arc::from(bytes.clone().into_boxed_slice())),
// product_value: pv,
// key: data_key,
// table_name: commit_table.schema.table_name.clone(),
// table_id,
// });
bytes.clear();
}

for (cols, mut index) in std::mem::take(&mut tx_table.indexes) {
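
The hunk above hoists a single `Vec` out of the per-row loop and reuses it via `bsatn::to_writer` plus `bytes.clear()`, replacing a fresh `bsatn::to_vec` allocation on every row. A minimal self-contained sketch of the buffer-reuse pattern (using `std::io::Write` in place of the real `bsatn` API):

use std::io::Write;

fn checksum(bytes: &[u8]) -> u32 {
    bytes.iter().map(|&b| b as u32).sum()
}

fn process_rows(rows: &[String]) -> Vec<u32> {
    let mut bytes = Vec::with_capacity(1024); // allocated once, outside the loop
    let mut out = Vec::with_capacity(rows.len());
    for row in rows {
        // Serialize into the existing buffer; no new allocation
        // unless the row outgrows the current capacity.
        write!(&mut bytes, "{row}").unwrap();
        out.push(checksum(&bytes));
        bytes.clear(); // drops the contents, keeps the capacity
    }
    out
}

fn main() {
    assert_eq!(process_rows(&["ab".into()]), vec![97 + 98]);
}
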
10 changes: 5 additions & 5 deletions crates/core/src/db/datastore/mem_arch_datastore/mut_tx.rs
@@ -654,11 +654,11 @@ impl MutTxId
// to ensure that `UniqueConstraintViolation` errors have precedence over other errors.
// `tx_table.insert` will later perform the same check on the tx table,
// so this method needs only to check the committed state.
if let Some(commit_table) = commit_table {
commit_table
.check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict))
.map_err(IndexError::from)?;
}
// if let Some(commit_table) = commit_table {
// commit_table
// .check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict))
// .map_err(IndexError::from)?;
// }

// Get the insert table, so we can write the row into it.
let (tx_table, tx_blob_store, delete_table) = self
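
If one wanted this check switchable rather than commented out, a hypothetical Cargo-feature gate over the exact code above could look like the following fragment (a sketch, not part of this branch):

// Hypothetical: benchmark builds compile without the feature and skip
// the committed-state uniqueness check entirely.
#[cfg(feature = "unique-constraint-checks")]
if let Some(commit_table) = commit_table {
    commit_table
        .check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict))
        .map_err(IndexError::from)?;
}
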
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/traits.rs
@@ -15,7 +15,7 @@ use super::{system_tables::StTableRow, Result};
/// Inserts report the byte objects they inserted, to be persisted
/// later in an object store.
pub enum TxOp {
Insert(Arc<Vec<u8>>),
Insert(Arc<[u8]>),
Delete,
}

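
`Arc<Vec<u8>>` costs two pointer hops and two heap blocks (the `Arc` allocation and the `Vec`'s buffer); `Arc<[u8]>` stores the length inline and points straight at the bytes. A small demonstration:

use std::sync::Arc;

fn main() {
    let v: Vec<u8> = vec![1, 2, 3];

    // Two indirections: Arc -> Vec (ptr, len, cap) -> bytes.
    let nested: Arc<Vec<u8>> = Arc::new(v.clone());

    // One indirection: the Arc points directly at the byte buffer.
    let flat: Arc<[u8]> = Arc::from(v); // From<Vec<u8>> for Arc<[u8]>
    assert_eq!(nested.as_slice(), &*flat);
}

Note that `Arc::from(vec)` still copies the bytes once to lay them out next to the reference counts; the savings show up on every subsequent dereference and in having one fewer live allocation.
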
10 changes: 5 additions & 5 deletions crates/core/src/db/relational_db.rs
@@ -620,13 +620,13 @@ impl RelationalDB
self.inner.iter_by_col_range_tx(ctx, tx, table_id.into(), cols, range)
}

#[tracing::instrument(skip(self, tx, row))]
// #[tracing::instrument(skip(self, tx, row))]
pub fn insert(&self, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> Result<ProductValue, DBError> {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_insert_row_time
.with_label_values(&table_id.0)
.start_timer();
// let _guard = DB_METRICS
// .rdb_insert_row_time
// .with_label_values(&table_id.0)
// .start_timer();
self.inner.insert_mut_tx(tx, table_id, row)
}

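
Both the `tracing` span and the Prometheus timer here were being created once per inserted row. A hypothetical way to keep the measurement while amortizing its cost, reusing the names from the diff (`DB_METRICS`, `rdb_insert_row_time`, `insert_mut_tx`) but not part of this branch, is a batch entry point:

// Hypothetical sketch: one span and one timer per batch instead of per row.
#[tracing::instrument(skip(self, tx, rows))]
pub fn insert_batch(&self, tx: &mut MutTx, table_id: TableId, rows: Vec<ProductValue>) -> Result<(), DBError> {
    #[cfg(feature = "metrics")]
    let _guard = DB_METRICS
        .rdb_insert_row_time
        .with_label_values(&table_id.0)
        .start_timer();
    for row in rows {
        self.inner.insert_mut_tx(tx, table_id, row)?;
    }
    Ok(())
}
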
1 change: 0 additions & 1 deletion crates/sats/src/bsatn.rs
@@ -13,7 +13,6 @@ pub use ser::Serializer;
pub use crate::buffer::DecodeError;

/// Serialize `value` into the buffered writer `w` in the BSATN format.
#[tracing::instrument(skip_all)]
pub fn to_writer<W: BufWriter, T: Serialize + ?Sized>(w: &mut W, value: &T) -> Result<(), ser::BsatnError> {
value.serialize(Serializer::new(w))
}
102 changes: 63 additions & 39 deletions crates/table/src/bflatn_to.rs
@@ -168,49 +168,73 @@ impl BflatnSerializedRowBuffer<'_>
let ty_alignment = ty.align();
self.curr_offset = align_to(self.curr_offset, ty_alignment);

match (ty, val) {
Contributor (author) comment on this hunk: This change didn't really make much of a difference. It can probably be ignored. This function is definitely slow though and takes up quite a lot of time.

// For sums, select the type based on the sum tag,
// write the variant data given the variant type,
// and finally write the tag.
(AlgebraicTypeLayout::Sum(ty), AlgebraicValue::Sum(val)) => self.write_sum(ty, val)?,
// For products, write every element in order.
(AlgebraicTypeLayout::Product(ty), AlgebraicValue::Product(val)) => self.write_product(ty, val)?,

match val {
// For primitive types, write their contents by LE-encoding.
(&AlgebraicTypeLayout::Bool, AlgebraicValue::Bool(val)) => self.write_bool(*val),
// Integer types:
(&AlgebraicTypeLayout::I8, AlgebraicValue::I8(val)) => self.write_i8(*val),
(&AlgebraicTypeLayout::U8, AlgebraicValue::U8(val)) => self.write_u8(*val),
(&AlgebraicTypeLayout::I16, AlgebraicValue::I16(val)) => self.write_i16(*val),
(&AlgebraicTypeLayout::U16, AlgebraicValue::U16(val)) => self.write_u16(*val),
(&AlgebraicTypeLayout::I32, AlgebraicValue::I32(val)) => self.write_i32(*val),
(&AlgebraicTypeLayout::U32, AlgebraicValue::U32(val)) => self.write_u32(*val),
(&AlgebraicTypeLayout::I64, AlgebraicValue::I64(val)) => self.write_i64(*val),
(&AlgebraicTypeLayout::U64, AlgebraicValue::U64(val)) => self.write_u64(*val),
(&AlgebraicTypeLayout::I128, AlgebraicValue::I128(val)) => self.write_i128(*val),
(&AlgebraicTypeLayout::U128, AlgebraicValue::U128(val)) => self.write_u128(*val),
// Float types:
(&AlgebraicTypeLayout::F32, AlgebraicValue::F32(val)) => self.write_f32((*val).into()),
(&AlgebraicTypeLayout::F64, AlgebraicValue::F64(val)) => self.write_f64((*val).into()),

// For strings, we reserve space for a `VarLenRef`
// and push the bytes as a var-len object.
(&AlgebraicTypeLayout::String, AlgebraicValue::String(val)) => self.write_string(val)?,

// For array and maps, we reserve space for a `VarLenRef`
// and push the bytes, after BSATN encoding, as a var-len object.
(AlgebraicTypeLayout::VarLen(VarLenType::Array(_)), val @ AlgebraicValue::Array(_))
| (AlgebraicTypeLayout::VarLen(VarLenType::Map(_)), val @ AlgebraicValue::Map(_)) => {
self.write_av_bsatn(val)?
}

// TODO(error-handling): return type error
(ty, val) => panic!(
"AlgebraicValue is not valid instance of AlgebraicTypeLayout: {:?} should be of type {:?}",
val, ty,
),
AlgebraicValue::Bool(val) => self.write_bool(*val),

AlgebraicValue::I8(val) => self.write_i8(*val),
AlgebraicValue::U8(val) => self.write_u8(*val),
AlgebraicValue::I16(val) => self.write_i16(*val),
AlgebraicValue::U16(val) => self.write_u16(*val),
AlgebraicValue::I32(val) => self.write_i32(*val),
AlgebraicValue::U32(val) => self.write_u32(*val),
AlgebraicValue::I64(val) => self.write_i64(*val),
AlgebraicValue::U64(val) => self.write_u64(*val),
AlgebraicValue::I128(val) => self.write_i128(*val),
AlgebraicValue::U128(val) => self.write_u128(*val),

AlgebraicValue::F32(val) => self.write_f32((*val).into()),
AlgebraicValue::F64(val) => self.write_f64((*val).into()),
AlgebraicValue::String(val) => self.write_string(val)?,
AlgebraicValue::Product(val) => self.write_product(ty.as_product().unwrap(), val)?,
val @ AlgebraicValue::Array(_) => self.write_av_bsatn(val)?,
_ => panic!("at the disco {:?}!", ty)
}

// match (ty, val) {
// // For sums, select the type based on the sum tag,
// // write the variant data given the variant type,
// // and finally write the tag.
// (AlgebraicTypeLayout::Sum(ty), AlgebraicValue::Sum(val)) => self.write_sum(ty, val)?,
// // For products, write every element in order.
// (AlgebraicTypeLayout::Product(ty), AlgebraicValue::Product(val)) => self.write_product(ty, val)?,

// // For primitive types, write their contents by LE-encoding.
// (&AlgebraicTypeLayout::Bool, AlgebraicValue::Bool(val)) => self.write_bool(*val),
// // Integer types:
// (&AlgebraicTypeLayout::I8, AlgebraicValue::I8(val)) => self.write_i8(*val),
// (&AlgebraicTypeLayout::U8, AlgebraicValue::U8(val)) => self.write_u8(*val),
// (&AlgebraicTypeLayout::I16, AlgebraicValue::I16(val)) => self.write_i16(*val),
// (&AlgebraicTypeLayout::U16, AlgebraicValue::U16(val)) => self.write_u16(*val),
// (&AlgebraicTypeLayout::I32, AlgebraicValue::I32(val)) => self.write_i32(*val),
// (&AlgebraicTypeLayout::U32, AlgebraicValue::U32(val)) => self.write_u32(*val),
// (&AlgebraicTypeLayout::I64, AlgebraicValue::I64(val)) => self.write_i64(*val),
// (&AlgebraicTypeLayout::U64, AlgebraicValue::U64(val)) => self.write_u64(*val),
// (&AlgebraicTypeLayout::I128, AlgebraicValue::I128(val)) => self.write_i128(*val),
// (&AlgebraicTypeLayout::U128, AlgebraicValue::U128(val)) => self.write_u128(*val),
// // Float types:
// (&AlgebraicTypeLayout::F32, AlgebraicValue::F32(val)) => self.write_f32((*val).into()),
// (&AlgebraicTypeLayout::F64, AlgebraicValue::F64(val)) => self.write_f64((*val).into()),

// // For strings, we reserve space for a `VarLenRef`
// // and push the bytes as a var-len object.
// (&AlgebraicTypeLayout::String, AlgebraicValue::String(val)) => self.write_string(val)?,

// // For array and maps, we reserve space for a `VarLenRef`
// // and push the bytes, after BSATN encoding, as a var-len object.
// (AlgebraicTypeLayout::VarLen(VarLenType::Array(_)), val @ AlgebraicValue::Array(_))
// | (AlgebraicTypeLayout::VarLen(VarLenType::Map(_)), val @ AlgebraicValue::Map(_)) => {
// self.write_av_bsatn(val)?
// }

// // TODO(error-handling): return type error
// (ty, val) => panic!(
// "AlgebraicValue is not valid instance of AlgebraicTypeLayout: {:?} should be of type {:?}",
// val, ty,
// ),
// }

self.curr_offset = align_to(self.curr_offset, ty_alignment);

Ok(())
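
Consistent with the author's note that this change barely moved the needle: both forms ultimately branch on the value's discriminant, and the extra load of `ty`'s discriminant in the tuple match is cheap and often folded by the optimizer. A reduced illustration of the two shapes:

// Reduced stand-ins, not the real AlgebraicTypeLayout/AlgebraicValue.
enum Ty { Bool, U32 }
enum Val { Bool(bool), U32(u32) }

// Old shape: match on (type, value) pairs.
fn write_typed(ty: &Ty, val: &Val) -> u32 {
    match (ty, val) {
        (Ty::Bool, Val::Bool(b)) => *b as u32,
        (Ty::U32, Val::U32(x)) => *x,
        _ => panic!("type/value mismatch"),
    }
}

// New shape: match on the value alone and trust the layout.
fn write_untyped(val: &Val) -> u32 {
    match val {
        Val::Bool(b) => *b as u32,
        Val::U32(x) => *x,
    }
}
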
12 changes: 11 additions & 1 deletion crates/table/src/row_hash.rs
@@ -28,11 +28,21 @@ use spacetimedb_sats::{F32, F64};
pub unsafe fn hash_row_in_page(hasher: &mut impl Hasher, page: &Page, fixed_offset: PageOffset, ty: &RowTypeLayout) {
let fixed_bytes = page.get_row_data(fixed_offset, ty.size());

let u8_slice: &[u8] = unsafe {
// Convert &[MaybeUninit<u8>] to &[u8]
std::slice::from_raw_parts(
fixed_bytes.as_ptr() as *const u8,
fixed_bytes.len(),
)
};

hasher.write(u8_slice);

// SAFETY:
// - Per 1. and 2., `fixed_bytes` points at a row in `page` valid for `ty`.
// - Per 3., for any `vlr: VarLenRef` stored in `fixed_bytes`,
// `vlr.first_offset` is either `NULL` or points to a valid granule in `page`.
unsafe { hash_product(hasher, fixed_bytes, page, &mut 0, ty.product()) };
// unsafe { hash_product(hasher, fixed_bytes, page, &mut 0, ty.product()) };
}

/// Hashes every product field in `value = &bytes[range_move(0..ty.size(), *curr_offset)]`
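
The new fast path feeds the row's fixed bytes to the hasher in one `Hasher::write` call instead of walking the product layout field by field via `hash_product`. The `MaybeUninit` cast used above can be factored into a reusable helper; a sketch, with the safety contract left to the caller:

use std::mem::MaybeUninit;

/// Reinterpret possibly-uninitialized bytes as initialized.
///
/// SAFETY: the caller must guarantee that every byte in `bytes`
/// (including any padding) has actually been written.
unsafe fn assume_init_bytes(bytes: &[MaybeUninit<u8>]) -> &[u8] {
    std::slice::from_raw_parts(bytes.as_ptr() as *const u8, bytes.len())
}
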
48 changes: 24 additions & 24 deletions crates/table/src/table.rs
@@ -119,12 +119,12 @@ impl Table
) -> Result<(RowHash, RowPointer), InsertError> {
// Check unique constraints.
// This error should take precedence over any other potential failures.
self.check_unique_constraints(
row,
// No need to worry about the committed vs tx state dichotomy here;
// just treat all rows in the table as live.
|_| false,
)?;
// self.check_unique_constraints(
// row,
// // No need to worry about the committed vs tx state dichotomy here;
// // just treat all rows in the table as live.
// |_| false,
// )?;

// Optimistically insert the `row` before checking for set-semantic collisions,
// under the assumption that set-semantic collisions are rare.
Expand All @@ -133,31 +133,31 @@ impl Table {
// Ensure row isn't already there.
// SAFETY: We just inserted `ptr`, so we know it's valid.
let hash = unsafe { self.row_hash_for(ptr) };
// Safety:
// We just inserted `ptr` and computed `hash`, so they're valid.
// `self` trivially has the same `row_layout` as `self`.
let existing_row = unsafe { Self::find_same_row(self, self, ptr, hash) };

if let Some(existing_row) = existing_row {
// If an equal row was already present,
// roll back our optimistic insert to avoid violating set semantics.

// SAFETY: we just inserted `ptr`, so it must be valid.
unsafe {
self.pages
.delete_row(&self.visitor_prog, self.row_size(), ptr, blob_store)
};
return Err(InsertError::Duplicate(existing_row));
}
// // Safety:
// // We just inserted `ptr` and computed `hash`, so they're valid.
// // `self` trivially has the same `row_layout` as `self`.
// let existing_row = unsafe { Self::find_same_row(self, self, ptr, hash) };

// if let Some(existing_row) = existing_row {
// // If an equal row was already present,
// // roll back our optimistic insert to avoid violating set semantics.

// // SAFETY: we just inserted `ptr`, so it must be valid.
// unsafe {
// self.pages
// .delete_row(&self.visitor_prog, self.row_size(), ptr, blob_store)
// };
// return Err(InsertError::Duplicate(existing_row));
// }

// If the optimistic insertion was correct,
// i.e. this is not a set-semantic duplicate,
// add it to the `pointer_map`.
self.pointer_map.insert(hash, ptr);
// self.pointer_map.insert(hash, ptr);

// Insert row into indices.
for (cols, index) in self.indexes.iter_mut() {
index.insert(cols, row, ptr).unwrap();
// index.insert(cols, row, ptr).unwrap();
}

Ok((hash, ptr))
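
With unique-constraint checking, set-semantic duplicate detection, pointer-map bookkeeping, and index maintenance all disabled, the insert path above reduces to roughly the following. This is a sketch: `insert_internal` is an assumption about the elided call that writes the row, and the other names follow the surrounding code.

fn insert_fast(
    &mut self,
    blob_store: &mut dyn BlobStore,
    row: &ProductValue,
) -> Result<(RowHash, RowPointer), InsertError> {
    // Write the row into the pages; nothing is validated or indexed.
    let ptr = self.insert_internal(blob_store, row)?;
    // SAFETY: `ptr` was just inserted, so it is valid for `self`.
    let hash = unsafe { self.row_hash_for(ptr) };
    Ok((hash, ptr))
}
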