Skip to content

IndexedDB: Add IndexedDB implementation of EventCacheStore::load_last_chunk #5384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions crates/matrix-sdk-indexeddb/src/event_cache_store/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ impl<T> AsyncErrorDeps for T where T: std::error::Error + SendOutsideWasm + Sync
pub enum IndexeddbEventCacheStoreError {
#[error("DomException {name} ({code}): {message}")]
DomException { name: String, message: String, code: u16 },
#[error("chunks contain disjoint lists")]
ChunksContainDisjointLists,
#[error("chunks contain cycle")]
ChunksContainCycle,
#[error("unable to load chunk")]
UnableToLoadChunk,
#[error("no max chunk id")]
NoMaxChunkId,
#[error("transaction: {0}")]
Transaction(#[from] IndexeddbEventCacheStoreTransactionError),
#[error("media store: {0}")]
Expand All @@ -51,23 +59,16 @@ impl From<web_sys::DomException> for IndexeddbEventCacheStoreError {

impl From<IndexeddbEventCacheStoreError> for EventCacheStoreError {
    fn from(value: IndexeddbEventCacheStoreError) -> Self {
        use IndexeddbEventCacheStoreError::*;

        match value {
            // Data-constraint violations and low-level DOM exceptions carry
            // no structured payload callers can act on, so surface them as
            // `InvalidData` with the rendered message as detail.
            DomException { .. }
            | ChunksContainCycle
            | ChunksContainDisjointLists
            | NoMaxChunkId
            | UnableToLoadChunk => Self::InvalidData { details: value.to_string() },
            // Transaction errors have their own `From` conversion; delegate.
            Transaction(inner) => inner.into(),
            // The in-memory store already yields an `EventCacheStoreError`.
            MemoryStore(inner) => inner,
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,73 @@ pub async fn test_linked_chunk_update_is_a_transaction(store: IndexeddbEventCach
assert!(chunks.is_empty());
}

/// Integration test for the IndexedDB implementation of
/// `EventCacheStore::load_last_chunk`.
///
/// Exercises three scenarios: an empty store, a store holding a single
/// chunk, and a store holding a two-chunk linked list.
pub async fn test_load_last_chunk(store: IndexeddbEventCacheStore) {
    let room_id = &DEFAULT_TEST_ROOM_ID;
    let linked_chunk_id = LinkedChunkId::Room(room_id);
    let make_event = |msg: &str| make_test_event(room_id, msg);

    // Case #1: an empty store yields no last chunk and a fresh identifier
    // generator.
    let (last_chunk, chunk_identifier_generator) =
        store.load_last_chunk(linked_chunk_id).await.unwrap();
    assert!(last_chunk.is_none());
    assert_eq!(chunk_identifier_generator.current(), 0);

    // Case #2: with a single chunk present, that chunk is the last chunk.
    store
        .handle_linked_chunk_updates(
            linked_chunk_id,
            vec![
                Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(42), next: None },
                Update::PushItems {
                    at: Position::new(ChunkIdentifier::new(42), 0),
                    items: vec![make_event("saucisse de morteau"), make_event("comté")],
                },
            ],
        )
        .await
        .unwrap();

    let (last_chunk, chunk_identifier_generator) =
        store.load_last_chunk(linked_chunk_id).await.unwrap();
    assert_matches!(last_chunk, Some(last_chunk) => {
        assert_eq!(last_chunk.identifier, 42);
        assert!(last_chunk.previous.is_none());
        assert!(last_chunk.next.is_none());
        assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
            assert_eq!(items.len(), 2);
            check_test_event(&items[0], "saucisse de morteau");
            check_test_event(&items[1], "comté");
        });
    });
    assert_eq!(chunk_identifier_generator.current(), 42);

    // Case #3: appending a second chunk makes it the new last chunk, while
    // the generator still reports the highest identifier ever used (42).
    store
        .handle_linked_chunk_updates(
            linked_chunk_id,
            vec![
                Update::NewItemsChunk {
                    previous: Some(ChunkIdentifier::new(42)),
                    new: ChunkIdentifier::new(7),
                    next: None,
                },
                Update::PushItems {
                    at: Position::new(ChunkIdentifier::new(7), 0),
                    items: vec![
                        make_event("fondue"),
                        make_event("gruyère"),
                        make_event("mont d'or"),
                    ],
                },
            ],
        )
        .await
        .unwrap();

    let (last_chunk, chunk_identifier_generator) =
        store.load_last_chunk(linked_chunk_id).await.unwrap();
    assert_matches!(last_chunk, Some(last_chunk) => {
        assert_eq!(last_chunk.identifier, 7);
        assert_matches!(last_chunk.previous, Some(previous) => {
            assert_eq!(previous, 42);
        });
        assert!(last_chunk.next.is_none());
        assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
            assert_eq!(items.len(), 3);
            check_test_event(&items[0], "fondue");
            check_test_event(&items[1], "gruyère");
            check_test_event(&items[2], "mont d'or");
        });
    });
    assert_eq!(chunk_identifier_generator.current(), 42);
}

/// Macro for generating tests for IndexedDB implementation of
/// [`EventCacheStore`]
///
Expand Down Expand Up @@ -547,6 +614,13 @@ macro_rules! indexeddb_event_cache_store_integration_tests {
$crate::event_cache_store::integration_tests::test_linked_chunk_update_is_a_transaction(store)
.await
}

#[async_test]
async fn test_load_last_chunk() {
let store = get_event_cache_store().await.expect("Failed to get event cache store");
$crate::event_cache_store::integration_tests::test_load_last_chunk(store)
.await
}
}
};
}
Expand Down
52 changes: 47 additions & 5 deletions crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use web_sys::IdbTransactionMode;
use crate::event_cache_store::{
migrations::current::keys,
serializer::IndexeddbEventCacheStoreSerializer,
transaction::IndexeddbEventCacheStoreTransaction,
transaction::{IndexeddbEventCacheStoreTransaction, IndexeddbEventCacheStoreTransactionError},
types::{ChunkType, InBandEvent},
};

Expand Down Expand Up @@ -300,10 +300,52 @@ impl_event_cache_store! {
(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator),
IndexeddbEventCacheStoreError,
> {
// NOTE(review): the body below replaces the former delegation to
// `self.memory_store.load_last_chunk(..)` with a native IndexedDB query.
let linked_chunk_id = linked_chunk_id.to_owned();
let room_id = linked_chunk_id.room_id();
let transaction = self.transaction(
    &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
    IdbTransactionMode::Readonly,
)?;

// An empty room trivially has no last chunk, and the identifier
// generator starts from scratch.
if transaction.get_chunks_count_in_room(room_id).await? == 0 {
    return Ok((None, ChunkIdentifierGenerator::new_from_scratch()));
}
// Now that we know we have some chunks in the room, we query IndexedDB
// for the last chunk in the room by getting the chunk which does not
// have a next chunk.
match transaction.get_chunk_by_next_chunk_id(room_id, &None).await {
    Err(IndexeddbEventCacheStoreTransactionError::ItemIsNotUnique) => {
        // If there are multiple chunks that do not have a next chunk, that
        // means we have more than one last chunk, which means that we have
        // more than one list in the room.
        Err(IndexeddbEventCacheStoreError::ChunksContainDisjointLists)
    }
    Err(e) => {
        // There was some error querying IndexedDB, but it is not necessarily
        // a violation of our data constraints.
        Err(e.into())
    }
    Ok(None) => {
        // If there is no chunk without a next chunk, that means every chunk
        // points to another chunk, which means that we have a cycle in our list.
        Err(IndexeddbEventCacheStoreError::ChunksContainCycle)
    }
    Ok(Some(last_chunk)) => {
        let last_chunk_identifier = ChunkIdentifier::new(last_chunk.identifier);
        // Hydrate the full chunk (items or gap) from its identifier.
        let last_raw_chunk = transaction
            .load_chunk_by_id(room_id, &last_chunk_identifier)
            .await?
            .ok_or(IndexeddbEventCacheStoreError::UnableToLoadChunk)?;
        // The generator must continue from the highest identifier ever
        // used in the room, which is not necessarily the identifier of
        // the last chunk in the list.
        let max_chunk_id = transaction
            .get_max_chunk_by_id(room_id)
            .await?
            .map(|chunk| ChunkIdentifier::new(chunk.identifier))
            .ok_or(IndexeddbEventCacheStoreError::NoMaxChunkId)?;
        let generator =
            ChunkIdentifierGenerator::new_from_previous_chunk_identifier(max_chunk_id);
        Ok((Some(last_raw_chunk), generator))
    }
}
}

async fn load_previous_chunk(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

use indexed_db_futures::{prelude::IdbTransaction, IdbQuerySource};
use matrix_sdk_base::{
event_cache::{Event as RawEvent, Gap as RawGap},
event_cache::{store::EventCacheStoreError, Event as RawEvent, Gap as RawGap},
linked_chunk::{ChunkContent, ChunkIdentifier, RawChunk},
};
use ruma::{events::relation::RelationType, OwnedEventId, RoomId};
use serde::{de::DeserializeOwned, Serialize};
use serde::{
de::{DeserializeOwned, Error},
Serialize,
};
use thiserror::Error;
use web_sys::IdbCursorDirection;

Expand Down Expand Up @@ -55,9 +58,19 @@ impl From<web_sys::DomException> for IndexeddbEventCacheStoreTransactionError {

impl From<serde_wasm_bindgen::Error> for IndexeddbEventCacheStoreTransactionError {
    fn from(e: serde_wasm_bindgen::Error) -> Self {
        // Re-encode the error as a `serde_json::Error` carrying the same
        // message, so it fits the boxed error of the `Serialization` variant.
        Self::Serialization(Box::new(serde_json::Error::custom(e.to_string())))
    }
}

impl From<IndexeddbEventCacheStoreTransactionError> for EventCacheStoreError {
fn from(value: IndexeddbEventCacheStoreTransactionError) -> Self {
use IndexeddbEventCacheStoreTransactionError::*;

match value {
DomException { .. } => Self::InvalidData { details: value.to_string() },
Serialization(e) => Self::Serialization(serde_json::Error::custom(e.to_string())),
ItemIsNotUnique | ItemNotFound => Self::InvalidData { details: value.to_string() },
}
}
}

Expand Down Expand Up @@ -406,6 +419,18 @@ impl<'a> IndexeddbEventCacheStoreTransaction<'a> {
self.get_item_by_key_components::<Chunk, IndexedChunkIdKey>(room_id, chunk_id).await
}

/// Query IndexedDB for the chunk in the given room whose `next` pointer
/// equals the given chunk identifier.
///
/// Returns `Ok(None)` when no such chunk exists, and an error if more
/// than one matching chunk is found.
pub async fn get_chunk_by_next_chunk_id(
    &self,
    room_id: &RoomId,
    next_chunk_id: &Option<ChunkIdentifier>,
) -> Result<Option<Chunk>, IndexeddbEventCacheStoreTransactionError> {
    let query =
        self.get_item_by_key_components::<Chunk, IndexedNextChunkIdKey>(room_id, next_chunk_id);
    query.await
}

/// Query IndexedDB for all chunks in the given room
pub async fn get_chunks_in_room(
&self,
Expand All @@ -414,6 +439,22 @@ impl<'a> IndexeddbEventCacheStoreTransaction<'a> {
self.get_items_in_room::<Chunk, IndexedChunkIdKey>(room_id).await
}

/// Query IndexedDB for how many chunks are stored in the given room.
pub async fn get_chunks_count_in_room(
    &self,
    room_id: &RoomId,
) -> Result<usize, IndexeddbEventCacheStoreTransactionError> {
    let query = self.get_items_count_in_room::<Chunk, IndexedChunkIdKey>(room_id);
    query.await
}

/// Query IndexedDB for the chunk with the maximum chunk-identifier key in
/// the given room, if any.
pub async fn get_max_chunk_by_id(
    &self,
    room_id: &RoomId,
) -> Result<Option<Chunk>, IndexeddbEventCacheStoreTransactionError> {
    let query = self.get_max_item_by_key::<Chunk, IndexedChunkIdKey>(room_id);
    query.await
}

/// Query IndexedDB for given chunk in given room and additionally query
/// for events or gap, depending on chunk type, in order to construct the
/// full chunk.
Expand Down
Loading