@@ -1151,80 +1151,19 @@ impl EventCacheStore for SqliteEventCacheStore {
1151
1151
1152
1152
let event_id = event_id. to_owned ( ) ;
1153
1153
let filters = filters. map ( ToOwned :: to_owned) ;
1154
- let this = self . clone ( ) ;
1154
+ let store = self . clone ( ) ;
1155
1155
1156
1156
self . acquire ( )
1157
1157
. await ?
1158
1158
. with_transaction ( move |txn| -> Result < _ > {
1159
- let get_rows = |row : & rusqlite:: Row < ' _ > | {
1160
- Ok ( (
1161
- row. get :: < _ , Vec < u8 > > ( 0 ) ?,
1162
- row. get :: < _ , Option < u64 > > ( 1 ) ?,
1163
- row. get :: < _ , Option < usize > > ( 2 ) ?,
1164
- ) )
1165
- } ;
1166
-
1167
- // Collect related events.
1168
- let collect_results = |transaction| {
1169
- let mut related = Vec :: new ( ) ;
1170
-
1171
- for result in transaction {
1172
- let ( event_blob, chunk_id, index) : ( Vec < u8 > , Option < u64 > , _ ) = result?;
1173
-
1174
- let event: Event = serde_json:: from_slice ( & this. decode_value ( & event_blob) ?) ?;
1175
-
1176
- // Only build the position if both the chunk_id and position were present; in
1177
- // theory, they should either be present at the same time, or not at all.
1178
- let pos = chunk_id. zip ( index) . map ( |( chunk_id, index) | {
1179
- Position :: new ( ChunkIdentifier :: new ( chunk_id) , index)
1180
- } ) ;
1181
-
1182
- related. push ( ( event, pos) ) ;
1183
- }
1184
-
1185
- Ok ( related)
1186
- } ;
1187
-
1188
- let related = if let Some ( filters) = compute_filters_string ( filters. as_deref ( ) ) {
1189
- let question_marks = repeat_vars ( filters. len ( ) ) ;
1190
- let query = format ! (
1191
- "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1192
- FROM events
1193
- LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1194
- WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1195
- ) ;
1196
-
1197
- let filters: Vec < _ > = filters. iter ( ) . map ( |f| f. to_sql ( ) . unwrap ( ) ) . collect ( ) ;
1198
- let parameters = params_from_iter (
1199
- [
1200
- hashed_linked_chunk_id. to_sql ( ) . expect ( "We should be able to convert a hashed linked chunk ID to a SQLite value" ) ,
1201
- event_id. as_str ( ) . to_sql ( ) . expect ( "We should be able to convert an event ID to a SQLite value" ) ,
1202
- hashed_room_id. to_sql ( ) . expect ( "We should be able to convert a room ID to a SQLite value" )
1203
- ]
1204
- . into_iter ( )
1205
- . chain ( filters)
1206
- ) ;
1207
-
1208
- let mut transaction = txn. prepare ( & query) ?;
1209
- let transaction = transaction. query_map ( parameters, get_rows) ?;
1210
-
1211
- collect_results ( transaction)
1212
-
1213
- } else {
1214
- let query =
1215
- "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1216
- FROM events
1217
- LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1218
- WHERE relates_to = ? AND room_id = ?" ;
1219
- let parameters = ( hashed_linked_chunk_id, event_id. as_str ( ) , hashed_room_id) ;
1220
-
1221
- let mut transaction = txn. prepare ( query) ?;
1222
- let transaction = transaction. query_map ( parameters, get_rows) ?;
1223
-
1224
- collect_results ( transaction)
1225
- } ;
1226
-
1227
- related
1159
+ find_event_relations_transaction (
1160
+ store,
1161
+ hashed_room_id,
1162
+ hashed_linked_chunk_id,
1163
+ event_id,
1164
+ filters,
1165
+ txn,
1166
+ )
1228
1167
} )
1229
1168
. await
1230
1169
}
@@ -1598,6 +1537,91 @@ impl EventCacheStoreMedia for SqliteEventCacheStore {
1598
1537
}
1599
1538
}
1600
1539
1540
+ fn find_event_relations_transaction (
1541
+ store : SqliteEventCacheStore ,
1542
+ hashed_room_id : Key ,
1543
+ hashed_linked_chunk_id : Key ,
1544
+ event_id : OwnedEventId ,
1545
+ filters : Option < Vec < RelationType > > ,
1546
+ txn : & Transaction < ' _ > ,
1547
+ ) -> Result < Vec < ( Event , Option < Position > ) > > {
1548
+ let get_rows = |row : & rusqlite:: Row < ' _ > | {
1549
+ Ok ( (
1550
+ row. get :: < _ , Vec < u8 > > ( 0 ) ?,
1551
+ row. get :: < _ , Option < u64 > > ( 1 ) ?,
1552
+ row. get :: < _ , Option < usize > > ( 2 ) ?,
1553
+ ) )
1554
+ } ;
1555
+
1556
+ // Collect related events.
1557
+ let collect_results = |transaction| {
1558
+ let mut related = Vec :: new ( ) ;
1559
+
1560
+ for result in transaction {
1561
+ let ( event_blob, chunk_id, index) : ( Vec < u8 > , Option < u64 > , _ ) = result?;
1562
+
1563
+ let event: Event = serde_json:: from_slice ( & store. decode_value ( & event_blob) ?) ?;
1564
+
1565
+ // Only build the position if both the chunk_id and position were present; in
1566
+ // theory, they should either be present at the same time, or not at all.
1567
+ let pos = chunk_id
1568
+ . zip ( index)
1569
+ . map ( |( chunk_id, index) | Position :: new ( ChunkIdentifier :: new ( chunk_id) , index) ) ;
1570
+
1571
+ related. push ( ( event, pos) ) ;
1572
+ }
1573
+
1574
+ Ok ( related)
1575
+ } ;
1576
+
1577
+ let related = if let Some ( filters) = compute_filters_string ( filters. as_deref ( ) ) {
1578
+ let question_marks = repeat_vars ( filters. len ( ) ) ;
1579
+ let query = format ! (
1580
+ "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1581
+ FROM events
1582
+ LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1583
+ WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1584
+ ) ;
1585
+
1586
+ let filters: Vec < _ > = filters. iter ( ) . map ( |f| f. to_sql ( ) . unwrap ( ) ) . collect ( ) ;
1587
+ let parameters = params_from_iter (
1588
+ [
1589
+ hashed_linked_chunk_id. to_sql ( ) . expect (
1590
+ "We should be able to convert a hashed linked chunk ID to a SQLite value" ,
1591
+ ) ,
1592
+ event_id
1593
+ . as_str ( )
1594
+ . to_sql ( )
1595
+ . expect ( "We should be able to convert an event ID to a SQLite value" ) ,
1596
+ hashed_room_id
1597
+ . to_sql ( )
1598
+ . expect ( "We should be able to convert a room ID to a SQLite value" ) ,
1599
+ ]
1600
+ . into_iter ( )
1601
+ . chain ( filters) ,
1602
+ ) ;
1603
+
1604
+ let mut transaction = txn. prepare ( & query) ?;
1605
+ let transaction = transaction. query_map ( parameters, get_rows) ?;
1606
+
1607
+ collect_results ( transaction)
1608
+ } else {
1609
+ let query =
1610
+ "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1611
+ FROM events
1612
+ LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1613
+ WHERE relates_to = ? AND room_id = ?" ;
1614
+ let parameters = ( hashed_linked_chunk_id, event_id. as_str ( ) , hashed_room_id) ;
1615
+
1616
+ let mut transaction = txn. prepare ( query) ?;
1617
+ let transaction = transaction. query_map ( parameters, get_rows) ?;
1618
+
1619
+ collect_results ( transaction)
1620
+ } ;
1621
+
1622
+ related
1623
+ }
1624
+
1601
1625
/// Like `deadpool::managed::Object::with_transaction`, but starts the
1602
1626
/// transaction in immediate (write) mode from the beginning, precluding errors
1603
1627
/// of the kind SQLITE_BUSY from happening, for transactions that may involve
0 commit comments