Skip to content

IndexedDB: Add IndexedDB implementation of Event-related functions in EventCacheStore #5414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,34 @@ macro_rules! event_cache_store_integration_tests {
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_remove_room().await;
}

#[async_test]
async fn test_filter_duplicated_events() {
let event_cache_store =
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_filter_duplicated_events().await;
}

#[async_test]
async fn test_find_event() {
let event_cache_store =
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_find_event().await;
}

#[async_test]
async fn test_find_event_relations() {
let event_cache_store =
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_find_event_relations().await;
}

#[async_test]
async fn test_save_event() {
let event_cache_store =
get_event_cache_store().await.unwrap().into_event_cache_store();
event_cache_store.test_save_event().await;
}
}
};
}
86 changes: 68 additions & 18 deletions crates/matrix-sdk-indexeddb/src/event_cache_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ use matrix_sdk_base::{
timer,
};
use ruma::{events::relation::RelationType, EventId, MxcUri, OwnedEventId, RoomId};
use tracing::{instrument, trace};
use tracing::{error, instrument, trace};
use web_sys::IdbTransactionMode;

use crate::event_cache_store::{
migrations::current::keys,
serializer::IndexeddbEventCacheStoreSerializer,
transaction::{IndexeddbEventCacheStoreTransaction, IndexeddbEventCacheStoreTransactionError},
types::{ChunkType, InBandEvent},
types::{ChunkType, InBandEvent, OutOfBandEvent},
};

mod builder;
Expand Down Expand Up @@ -204,7 +204,7 @@ impl_event_cache_store! {

for (i, item) in items.into_iter().enumerate() {
transaction
.add_item(
.put_item(
room_id,
&types::Event::InBand(InBandEvent {
content: item,
Expand Down Expand Up @@ -435,10 +435,24 @@ impl_event_cache_store! {
events: Vec<OwnedEventId>,
) -> Result<Vec<(OwnedEventId, Position)>, IndexeddbEventCacheStoreError> {
let _timer = timer!("method");
self.memory_store
.filter_duplicated_events(linked_chunk_id, events)
.await
.map_err(IndexeddbEventCacheStoreError::MemoryStore)

if events.is_empty() {
return Ok(Vec::new());
}

let linked_chunk_id = linked_chunk_id.to_owned();
let room_id = linked_chunk_id.room_id();
let transaction =
self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
let mut duplicated = Vec::new();
for event_id in events {
if let Some(types::Event::InBand(event)) =
transaction.get_event_by_id(room_id, &event_id).await?
{
duplicated.push((event_id, event.position.into()));
}
}
Ok(duplicated)
}

#[instrument(skip(self, event_id))]
Expand All @@ -448,10 +462,14 @@ impl_event_cache_store! {
event_id: &EventId,
) -> Result<Option<Event>, IndexeddbEventCacheStoreError> {
let _timer = timer!("method");
self.memory_store
.find_event(room_id, event_id)

let transaction =
self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
transaction
.get_event_by_id(room_id, &event_id.to_owned())
.await
.map_err(IndexeddbEventCacheStoreError::MemoryStore)
.map(|ok| ok.map(Into::into))
.map_err(Into::into)
}

#[instrument(skip(self, event_id, filters))]
Expand All @@ -462,10 +480,32 @@ impl_event_cache_store! {
filters: Option<&[RelationType]>,
) -> Result<Vec<(Event, Option<Position>)>, IndexeddbEventCacheStoreError> {
let _timer = timer!("method");
self.memory_store
.find_event_relations(room_id, event_id, filters)
.await
.map_err(IndexeddbEventCacheStoreError::MemoryStore)

let transaction =
self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;

let mut related_events = Vec::new();
match filters {
Some(relation_types) if !relation_types.is_empty() => {
for relation_type in relation_types {
let relation = (event_id.to_owned(), relation_type.clone());
let events = transaction.get_events_by_relation(room_id, &relation).await?;
for event in events {
let position = event.position().map(Into::into);
related_events.push((event.into(), position));
}
}
}
_ => {
for event in
transaction.get_events_by_related_event(room_id, &event_id.to_owned()).await?
{
let position = event.position().map(Into::into);
related_events.push((event.into(), position));
}
}
}
Ok(related_events)
}

#[instrument(skip(self, event))]
Expand All @@ -475,10 +515,20 @@ impl_event_cache_store! {
event: Event,
) -> Result<(), IndexeddbEventCacheStoreError> {
let _timer = timer!("method");
self.memory_store
.save_event(room_id, event)
.await
.map_err(IndexeddbEventCacheStoreError::MemoryStore)

let Some(event_id) = event.event_id() else {
error!(%room_id, "Trying to save an event with no ID");
return Ok(());
};
let transaction =
self.transaction(&[keys::EVENTS], IdbTransactionMode::Readwrite)?;
let event = match transaction.get_event_by_id(room_id, &event_id).await? {
Some(mut inner) => inner.set_content(event),
None => types::Event::OutOfBand(OutOfBandEvent { content: event, position: () }),
};
transaction.put_event(room_id, &event).await?;
transaction.commit().await?;
Ok(())
}

#[instrument(skip_all)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,23 @@ pub type IndexedEventPositionIndex = usize;
#[derive(Debug, Serialize, Deserialize)]
pub struct IndexedEventRelationKey(IndexedRoomId, IndexedEventId, IndexedRelationType);

impl IndexedEventRelationKey {
/// Sets the related event field of this key. This is helpful
/// when searching for all events which are related to the given
/// event.
pub fn set_related_event_id(
&self,
related_event_id: &OwnedEventId,
serializer: &IndexeddbSerializer,
) -> Self {
let room_id = self.0.clone();
let related_event_id =
serializer.encode_key_as_string(keys::EVENTS_RELATION_RELATED_EVENTS, related_event_id);
let relation_type = self.2.clone();
Self(room_id, related_event_id, relation_type)
}
}

impl IndexedKey<Event> for IndexedEventRelationKey {
const INDEX: Option<&'static str> = Some(keys::EVENTS_RELATION);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,17 @@ impl<'a> IndexeddbEventCacheStoreTransaction<'a> {
self.delete_items_in_room::<Chunk, IndexedChunkIdKey>(room_id).await
}

/// Query IndexedDB for events that match the given event id in the given
/// room. If more than one item is found, an error is returned.
pub async fn get_event_by_id(
&self,
room_id: &RoomId,
event_id: &OwnedEventId,
) -> Result<Option<Event>, IndexeddbEventCacheStoreTransactionError> {
let key = self.serializer.encode_key(room_id, event_id);
self.get_item_by_key::<Event, IndexedEventIdKey>(room_id, key).await
}

/// Query IndexedDB for events in the given position range in the given
/// room.
pub async fn get_events_by_position(
Expand Down Expand Up @@ -620,6 +631,32 @@ impl<'a> IndexeddbEventCacheStoreTransaction<'a> {
self.get_events_count_by_position(room_id, range).await
}

/// Query IndexedDB for events that match the given relation range in the
/// given room.
pub async fn get_events_by_relation(
&self,
room_id: &RoomId,
range: impl Into<IndexedKeyRange<&(OwnedEventId, RelationType)>>,
) -> Result<Vec<Event>, IndexeddbEventCacheStoreTransactionError> {
let range = range.into().encoded(room_id, self.serializer.inner());
self.get_items_by_key::<Event, IndexedEventRelationKey>(room_id, range).await
}

/// Query IndexedDB for events that are related to the given event in the
/// given room.
pub async fn get_events_by_related_event(
&self,
room_id: &RoomId,
related_event_id: &OwnedEventId,
) -> Result<Vec<Event>, IndexeddbEventCacheStoreTransactionError> {
let lower = IndexedEventRelationKey::lower_key(room_id, self.serializer.inner())
.set_related_event_id(related_event_id, self.serializer.inner());
let upper = IndexedEventRelationKey::upper_key(room_id, self.serializer.inner())
.set_related_event_id(related_event_id, self.serializer.inner());
let range = IndexedKeyRange::Bound(lower, upper);
self.get_items_by_key::<Event, IndexedEventRelationKey>(room_id, range).await
}

/// Puts an event in the given room. If an event with the same key already
/// exists, it will be overwritten.
pub async fn put_event(
Expand Down
9 changes: 9 additions & 0 deletions crates/matrix-sdk-indexeddb/src/event_cache_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl Event {
Event::OutOfBand(e) => e.relation(),
}
}

/// Sets the content of the underlying [`GenericEvent`]
pub fn set_content(mut self, content: TimelineEvent) -> Self {
match self {
Event::InBand(ref mut i) => i.content = content,
Event::OutOfBand(ref mut o) => o.content = content,
}
self
}
}

/// A generic representation of an
Expand Down
Loading