46
46
//! [`Subscriber`], which brings all the tooling to get the current value or the
47
47
//! future values with a stream.
48
48
49
+ mod error;
49
50
mod latest_event;
50
51
51
52
use std:: {
@@ -54,6 +55,7 @@ use std::{
54
55
sync:: Arc ,
55
56
} ;
56
57
58
+ pub use error:: LatestEventsError ;
57
59
use eyeball:: { AsyncLock , Subscriber } ;
58
60
use futures_util:: { select, FutureExt } ;
59
61
use latest_event:: LatestEvent ;
@@ -64,7 +66,7 @@ use tokio::sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard};
64
66
use tracing:: error;
65
67
66
68
use crate :: {
67
- event_cache:: { EventCache , RoomEventCacheGenericUpdate } ,
69
+ event_cache:: { EventCache , EventCacheError , RoomEventCacheGenericUpdate } ,
68
70
send_queue:: SendQueue ,
69
71
} ;
70
72
@@ -126,8 +128,8 @@ impl LatestEvents {
126
128
/// Start listening to updates (if not already) for a particular room.
127
129
///
128
130
/// It returns `true` if the room exists, `false` otherwise.
129
- pub async fn listen_to_room ( & self , room_id : & RoomId ) -> bool {
130
- self . state . registered_rooms . for_room ( room_id) . await . is_some ( )
131
+ pub async fn listen_to_room ( & self , room_id : & RoomId ) -> Result < bool , LatestEventsError > {
132
+ Ok ( self . state . registered_rooms . for_room ( room_id) . await ? . is_some ( ) )
131
133
}
132
134
133
135
/// Start listening to updates (if not already) for a particular room, and
@@ -138,18 +140,24 @@ impl LatestEvents {
138
140
pub async fn listen_and_subscribe_to_room (
139
141
& self ,
140
142
room_id : & RoomId ,
141
- ) -> Option < Subscriber < LatestEventValue , AsyncLock > > {
142
- let latest_event = self . state . registered_rooms . for_room ( room_id) . await ?;
143
+ ) -> Result < Option < Subscriber < LatestEventValue , AsyncLock > > , LatestEventsError > {
144
+ let Some ( latest_event) = self . state . registered_rooms . for_room ( room_id) . await ? else {
145
+ return Ok ( None ) ;
146
+ } ;
143
147
144
- Some ( latest_event. subscribe ( ) . await )
148
+ Ok ( Some ( latest_event. subscribe ( ) . await ) )
145
149
}
146
150
147
151
/// Start listening to updates (if not already) for a particular room and a
148
152
/// particular thread in this room.
149
153
///
150
154
/// It returns `true` if the room and the thread exists, `false` otherwise.
151
- pub async fn listen_to_thread ( & self , room_id : & RoomId , thread_id : & EventId ) -> bool {
152
- self . state . registered_rooms . for_thread ( room_id, thread_id) . await . is_some ( )
155
+ pub async fn listen_to_thread (
156
+ & self ,
157
+ room_id : & RoomId ,
158
+ thread_id : & EventId ,
159
+ ) -> Result < bool , LatestEventsError > {
160
+ Ok ( self . state . registered_rooms . for_thread ( room_id, thread_id) . await ?. is_some ( ) )
153
161
}
154
162
155
163
/// Start listening to updates (if not already) for a particular room and a
@@ -161,10 +169,13 @@ impl LatestEvents {
161
169
& self ,
162
170
room_id : & RoomId ,
163
171
thread_id : & EventId ,
164
- ) -> Option < Subscriber < LatestEventValue , AsyncLock > > {
165
- let latest_event = self . state . registered_rooms . for_thread ( room_id, thread_id) . await ?;
172
+ ) -> Result < Option < Subscriber < LatestEventValue , AsyncLock > > , LatestEventsError > {
173
+ let Some ( latest_event) = self . state . registered_rooms . for_thread ( room_id, thread_id) . await ?
174
+ else {
175
+ return Ok ( None ) ;
176
+ } ;
166
177
167
- Some ( latest_event. subscribe ( ) . await )
178
+ Ok ( Some ( latest_event. subscribe ( ) . await ) )
168
179
}
169
180
170
181
/// Forget a room.
@@ -230,8 +241,8 @@ impl RegisteredRooms {
230
241
& self ,
231
242
room_id : & RoomId ,
232
243
thread_id : Option < & EventId > ,
233
- ) -> Option < RwLockReadGuard < ' _ , RoomLatestEvents > > {
234
- match thread_id {
244
+ ) -> Result < Option < RwLockReadGuard < ' _ , RoomLatestEvents > > , LatestEventsError > {
245
+ Ok ( match thread_id {
235
246
// Get the room latest event with the aim of fetching the latest event for a particular
236
247
// thread.
237
248
//
@@ -243,7 +254,9 @@ impl RegisteredRooms {
243
254
// The `RoomLatestEvents` doesn't exist. Let's create and insert it.
244
255
if rooms. contains_key ( room_id) . not ( ) {
245
256
// Insert the room if it's been successfully created.
246
- if let Some ( room_latest_event) = RoomLatestEvents :: new ( room_id) . await {
257
+ if let Some ( room_latest_event) =
258
+ RoomLatestEvents :: new ( room_id, & self . event_cache ) . await ?
259
+ {
247
260
rooms. insert ( room_id. to_owned ( ) , room_latest_event) ;
248
261
249
262
let _ = self
@@ -281,7 +294,9 @@ impl RegisteredRooms {
281
294
282
295
if rooms. contains_key ( room_id) . not ( ) {
283
296
// Insert the room if it's been successfully created.
284
- if let Some ( room_latest_event) = RoomLatestEvents :: new ( room_id) . await {
297
+ if let Some ( room_latest_event) =
298
+ RoomLatestEvents :: new ( room_id, & self . event_cache ) . await ?
299
+ {
285
300
rooms. insert ( room_id. to_owned ( ) , room_latest_event) ;
286
301
287
302
let _ = self
@@ -295,17 +310,20 @@ impl RegisteredRooms {
295
310
}
296
311
}
297
312
}
298
- }
313
+ } )
299
314
}
300
315
301
316
/// Start listening to updates (if not already) for a particular room, and
302
317
/// fetch the [`LatestEvent`] for this room.
303
318
///
304
319
/// It returns `None` if the room doesn't exist.
305
- pub async fn for_room ( & self , room_id : & RoomId ) -> Option < RwLockReadGuard < ' _ , LatestEvent > > {
306
- self . room_latest_event ( room_id, None ) . await . map ( |lock_guard| {
320
+ pub async fn for_room (
321
+ & self ,
322
+ room_id : & RoomId ,
323
+ ) -> Result < Option < RwLockReadGuard < ' _ , LatestEvent > > , LatestEventsError > {
324
+ Ok ( self . room_latest_event ( room_id, None ) . await ?. map ( |lock_guard| {
307
325
RwLockReadGuard :: map ( lock_guard, |room_latest_event| room_latest_event. for_room ( ) )
308
- } )
326
+ } ) )
309
327
}
310
328
311
329
/// Start listening to updates (if not already) for a particular room, and
@@ -316,13 +334,13 @@ impl RegisteredRooms {
316
334
& self ,
317
335
room_id : & RoomId ,
318
336
thread_id : & EventId ,
319
- ) -> Option < RwLockReadGuard < ' _ , LatestEvent > > {
320
- self . room_latest_event ( room_id, Some ( thread_id) ) . await . and_then ( |lock_guard| {
337
+ ) -> Result < Option < RwLockReadGuard < ' _ , LatestEvent > > , LatestEventsError > {
338
+ Ok ( self . room_latest_event ( room_id, Some ( thread_id) ) . await ? . and_then ( |lock_guard| {
321
339
RwLockReadGuard :: try_map ( lock_guard, |room_latest_event| {
322
340
room_latest_event. for_thread ( thread_id)
323
341
} )
324
342
. ok ( )
325
- } )
343
+ } ) )
326
344
}
327
345
328
346
/// Forget a room.
@@ -383,11 +401,24 @@ struct RoomLatestEvents {
383
401
}
384
402
385
403
impl RoomLatestEvents {
386
- async fn new ( room_id : & RoomId ) -> Option < Self > {
387
- Some ( Self {
388
- for_the_room : Self :: create_latest_event_for ( room_id, None ) . await ?,
389
- per_thread : HashMap :: new ( ) ,
390
- } )
404
+ async fn new (
405
+ room_id : & RoomId ,
406
+ event_cache : & EventCache ,
407
+ ) -> Result < Option < Self > , LatestEventsError > {
408
+ let _room_event_cache = match event_cache. for_room ( room_id) . await {
409
+ // It's fine to drop the `EventCacheDropHandles` here as the caller
410
+ // (`LatestEventState`) owns a clone of the `EventCache`.
411
+ Ok ( ( room_event_cache, _drop_handles) ) => room_event_cache,
412
+ Err ( EventCacheError :: RoomNotFound { .. } ) => return Ok ( None ) ,
413
+ Err ( err) => return Err ( LatestEventsError :: EventCache ( err) ) ,
414
+ } ;
415
+
416
+ let latest_event = match Self :: create_latest_event_for ( room_id, None ) . await {
417
+ Some ( latest_event) => latest_event,
418
+ None => return Ok ( None ) ,
419
+ } ;
420
+
421
+ Ok ( Some ( Self { for_the_room : latest_event, per_thread : HashMap :: new ( ) } ) )
391
422
}
392
423
393
424
#[ allow( clippy:: unused_async) ]
@@ -565,8 +596,8 @@ mod tests {
565
596
assert ! ( latest_events. state. registered_rooms. rooms. read( ) . await . is_empty( ) ) ;
566
597
567
598
// Now let's listen to two rooms.
568
- assert ! ( latest_events. listen_to_room( room_id_0) . await ) ;
569
- assert ! ( latest_events. listen_to_room( room_id_1) . await ) ;
599
+ assert ! ( latest_events. listen_to_room( room_id_0) . await . unwrap ( ) ) ;
600
+ assert ! ( latest_events. listen_to_room( room_id_1) . await . unwrap ( ) ) ;
570
601
571
602
{
572
603
let rooms = latest_events. state . registered_rooms . rooms . read ( ) . await ;
@@ -583,8 +614,8 @@ mod tests {
583
614
}
584
615
585
616
// Now let's listen to one thread respectively for two rooms.
586
- assert ! ( latest_events. listen_to_thread( room_id_1, thread_id_1_0) . await ) ;
587
- assert ! ( latest_events. listen_to_thread( room_id_2, thread_id_2_0) . await ) ;
617
+ assert ! ( latest_events. listen_to_thread( room_id_1, thread_id_1_0) . await . unwrap ( ) ) ;
618
+ assert ! ( latest_events. listen_to_thread( room_id_2, thread_id_2_0) . await . unwrap ( ) ) ;
588
619
589
620
{
590
621
let rooms = latest_events. state . registered_rooms . rooms . read ( ) . await ;
@@ -623,7 +654,7 @@ mod tests {
623
654
let latest_events = client. latest_events ( ) . await ;
624
655
625
656
// Now let's fetch one room.
626
- assert ! ( latest_events. listen_to_room( room_id_0) . await ) ;
657
+ assert ! ( latest_events. listen_to_room( room_id_0) . await . unwrap ( ) ) ;
627
658
628
659
{
629
660
let rooms = latest_events. state . registered_rooms . rooms . read ( ) . await ;
@@ -662,7 +693,7 @@ mod tests {
662
693
let latest_events = client. latest_events ( ) . await ;
663
694
664
695
// Now let's fetch one thread .
665
- assert ! ( latest_events. listen_to_thread( room_id_0, thread_id_0_0) . await ) ;
696
+ assert ! ( latest_events. listen_to_thread( room_id_0, thread_id_0_0) . await . unwrap ( ) ) ;
666
697
667
698
{
668
699
let rooms = latest_events. state . registered_rooms . rooms . read ( ) . await ;
0 commit comments