14
14
15
15
use std:: {
16
16
collections:: HashMap ,
17
- pin:: Pin ,
18
- sync:: { Arc , RwLock , Weak } ,
19
- task:: { Context , Poll , Waker } ,
17
+ sync:: { Arc , RwLock } ,
18
+ task:: Waker ,
20
19
} ;
21
20
22
- use futures_core:: Stream ;
23
-
24
21
use super :: { ChunkIdentifier , Position } ;
25
22
26
23
/// Represent the updates that have happened inside a [`LinkedChunk`].
@@ -321,36 +318,42 @@ impl<Item, Gap> UpdatesInner<Item, Gap> {
321
318
322
319
/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
323
320
/// a [`Stream`].
321
+ #[ cfg( test) ]
324
322
pub ( super ) struct UpdatesSubscriber < Item , Gap > {
325
323
/// Weak reference to [`UpdatesInner`].
326
324
///
327
325
/// Using a weak reference allows [`ObservableUpdates`] to be dropped
328
326
/// freely even if a subscriber exists.
329
- updates : Weak < RwLock < UpdatesInner < Item , Gap > > > ,
327
+ updates : std :: sync :: Weak < RwLock < UpdatesInner < Item , Gap > > > ,
330
328
331
329
/// The token to read the updates.
332
330
token : ReaderToken ,
333
331
}
334
332
333
+ #[ cfg( test) ]
335
334
impl < Item , Gap > UpdatesSubscriber < Item , Gap > {
336
335
/// Create a new [`Self`].
337
336
#[ cfg( test) ]
338
- fn new ( updates : Weak < RwLock < UpdatesInner < Item , Gap > > > , token : ReaderToken ) -> Self {
337
+ fn new ( updates : std :: sync :: Weak < RwLock < UpdatesInner < Item , Gap > > > , token : ReaderToken ) -> Self {
339
338
Self { updates, token }
340
339
}
341
340
}
342
341
343
- impl < Item , Gap > Stream for UpdatesSubscriber < Item , Gap >
342
+ #[ cfg( test) ]
343
+ impl < Item , Gap > futures_core:: Stream for UpdatesSubscriber < Item , Gap >
344
344
where
345
345
Item : Clone ,
346
346
Gap : Clone ,
347
347
{
348
348
type Item = Vec < Update < Item , Gap > > ;
349
349
350
- fn poll_next ( self : Pin < & mut Self > , context : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
350
+ fn poll_next (
351
+ self : std:: pin:: Pin < & mut Self > ,
352
+ context : & mut std:: task:: Context < ' _ > ,
353
+ ) -> std:: task:: Poll < Option < Self :: Item > > {
351
354
let Some ( updates) = self . updates . upgrade ( ) else {
352
355
// The `ObservableUpdates` has been dropped. It's time to close this stream.
353
- return Poll :: Ready ( None ) ;
356
+ return std :: task :: Poll :: Ready ( None ) ;
354
357
} ;
355
358
356
359
let mut updates = updates. write ( ) . unwrap ( ) ;
@@ -362,14 +365,15 @@ where
362
365
updates. wakers . push ( context. waker ( ) . clone ( ) ) ;
363
366
364
367
// The stream is pending.
365
- return Poll :: Pending ;
368
+ return std :: task :: Poll :: Pending ;
366
369
}
367
370
368
371
// There is updates! Let's forward them in this stream.
369
- Poll :: Ready ( Some ( the_updates. to_owned ( ) ) )
372
+ std :: task :: Poll :: Ready ( Some ( the_updates. to_owned ( ) ) )
370
373
}
371
374
}
372
375
376
+ #[ cfg( test) ]
373
377
impl < Item , Gap > Drop for UpdatesSubscriber < Item , Gap > {
374
378
fn drop ( & mut self ) {
375
379
// Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
@@ -394,9 +398,10 @@ mod tests {
394
398
} ;
395
399
396
400
use assert_matches:: assert_matches;
401
+ use futures_core:: Stream ;
397
402
use futures_util:: pin_mut;
398
403
399
- use super :: { super :: LinkedChunk , ChunkIdentifier , Position , Stream , UpdatesInner } ;
404
+ use super :: { super :: LinkedChunk , ChunkIdentifier , Position , UpdatesInner } ;
400
405
401
406
#[ test]
402
407
fn test_updates_take_and_garbage_collector ( ) {
0 commit comments