1
- use async_broadcast:: { broadcast, InactiveReceiver , Sender as Broadcaster } ;
1
+ use async_broadcast:: { broadcast, Receiver as BroadcastReceiver , Sender as Broadcaster } ;
2
2
use async_channel:: { bounded, Receiver , Sender } ;
3
3
use async_executor:: Executor ;
4
4
#[ cfg( feature = "internal-executor" ) ]
@@ -31,8 +31,9 @@ use zvariant::ObjectPath;
31
31
use futures_core:: { stream, Future } ;
32
32
use futures_sink:: Sink ;
33
33
use futures_util:: {
34
+ future:: { select, Either } ,
34
35
sink:: SinkExt ,
35
- stream:: { select as stream_select , StreamExt } ,
36
+ stream:: StreamExt ,
36
37
} ;
37
38
38
39
use crate :: {
@@ -128,12 +129,6 @@ struct ConnectionInner<S> {
128
129
// Message receiver task
129
130
msg_receiver_task : sync:: Mutex < Option < Task < ( ) > > > ,
130
131
131
- // We're using sync Mutex here as we don't intend to keep it locked while awaiting.
132
- msg_receiver : sync:: RwLock < InactiveReceiver < Arc < Message > > > ,
133
-
134
- // Receiver side of the error channel
135
- error_receiver : Receiver < Error > ,
136
-
137
132
signal_subscriptions : Mutex < HashMap < u64 , SignalSubscription > > ,
138
133
}
139
134
@@ -223,8 +218,7 @@ impl MessageReceiverTask<Box<dyn Socket>> {
223
218
///
224
219
/// Unlike [`zbus::Connection`], there is no direct async equivalent of
225
220
/// [`zbus::Connection::receive_message`] method provided. This is because the `futures` crate
226
- /// already provides a nice rich API that makes use of the [`stream::Stream`] implementation that is
227
- /// returned by [`Connection::stream`] method.
221
+ /// already provides a nice rich API that makes use of the [`stream::Stream`] implementation.
228
222
///
229
223
/// # Caveats
230
224
///
@@ -283,7 +277,7 @@ impl MessageReceiverTask<Box<dyn Socket>> {
283
277
/// )
284
278
/// .await?;
285
279
///
286
- /// while let Some(msg) = connection.stream().await. try_next().await? {
280
+ /// while let Some(msg) = connection.try_next().await? {
287
281
/// println!("Got message: {}", msg);
288
282
/// }
289
283
///
@@ -307,6 +301,11 @@ impl MessageReceiverTask<Box<dyn Socket>> {
307
301
#[ derive( Clone , Debug ) ]
308
302
pub struct Connection {
309
303
inner : Arc < ConnectionInner < Box < dyn Socket > > > ,
304
+
305
+ msg_receiver : BroadcastReceiver < Arc < Message > > ,
306
+
307
+ // Receiver side of the error channel
308
+ error_receiver : Receiver < Error > ,
310
309
}
311
310
312
311
assert_impl_all ! ( Connection : Send , Sync , Unpin ) ;
@@ -366,22 +365,6 @@ impl Connection {
366
365
Self :: new ( auth, false ) . await
367
366
}
368
367
369
- /// Get a stream to receive incoming messages.
370
- pub async fn stream ( & self ) -> MessageStream {
371
- let msg_receiver = self
372
- . inner
373
- . msg_receiver
374
- . read ( )
375
- // SAFETY: Not much we can do about a poisoned mutex.
376
- . expect ( "poisoned lock" )
377
- . activate_cloned ( )
378
- . map ( Ok ) ;
379
- let error_stream = self . inner . error_receiver . clone ( ) . map ( Err ) ;
380
- let stream = stream_select ( error_stream, msg_receiver) . boxed ( ) ;
381
-
382
- MessageStream { stream }
383
- }
384
-
385
368
/// Send `msg` to the peer.
386
369
///
387
370
/// Unlike our [`Sink`] implementation, this method sets a unique (to this connection) serial
@@ -415,7 +398,7 @@ impl Connection {
415
398
B : serde:: ser:: Serialize + zvariant:: Type ,
416
399
E : Into < MessageError > ,
417
400
{
418
- let stream = self . stream ( ) . await ;
401
+ let stream = self . clone ( ) ;
419
402
let m = Message :: method (
420
403
self . unique_name ( ) ,
421
404
destination,
@@ -544,11 +527,7 @@ impl Connection {
544
527
545
528
/// Max number of messages to queue.
546
529
pub fn max_queued ( & self ) -> usize {
547
- self . inner
548
- . msg_receiver
549
- . read ( )
550
- . expect ( "poisoned lock" )
551
- . capacity ( )
530
+ self . msg_receiver . capacity ( )
552
531
}
553
532
554
533
/// Set the max number of messages to queue.
@@ -576,12 +555,8 @@ impl Connection {
576
555
/// // Do something useful with `conn`..
577
556
///# Ok::<_, Box<dyn Error + Send + Sync>>(())
578
557
/// ```
579
- pub fn set_max_queued ( self , max : usize ) -> Self {
580
- self . inner
581
- . msg_receiver
582
- . write ( )
583
- . expect ( "poisoned lock" )
584
- . set_capacity ( max) ;
558
+ pub fn set_max_queued ( mut self , max : usize ) -> Self {
559
+ self . msg_receiver . set_capacity ( max) ;
585
560
586
561
self
587
562
}
@@ -742,8 +717,6 @@ impl Connection {
742
717
// ourselves in parallel to making the method call.
743
718
#[ cfg( not( feature = "internal-executor" ) ) ]
744
719
let name = {
745
- use futures_util:: future:: { select, Either } ;
746
-
747
720
let executor = self . inner . executor . clone ( ) ;
748
721
let ticking_future = async move {
749
722
// Keep running as long as this task/future is not cancelled.
@@ -785,7 +758,6 @@ impl Connection {
785
758
786
759
let ( mut msg_sender, msg_receiver) = broadcast ( DEFAULT_MAX_QUEUED ) ;
787
760
msg_sender. set_overflow ( true ) ;
788
- let msg_receiver = msg_receiver. deactivate ( ) ;
789
761
let ( error_sender, error_receiver) = bounded ( 1 ) ;
790
762
let executor = Arc :: new ( Executor :: new ( ) ) ;
791
763
let raw_in_conn = Arc :: new ( Mutex :: new ( auth. conn ) ) ;
@@ -796,17 +768,17 @@ impl Connection {
796
768
. spawn ( & executor) ;
797
769
798
770
let connection = Self {
771
+ error_receiver,
772
+ msg_receiver,
799
773
inner : Arc :: new ( ConnectionInner {
800
774
raw_in_conn,
801
775
sink,
802
- error_receiver,
803
776
server_guid : auth. server_guid ,
804
777
cap_unix_fd,
805
778
bus_conn : bus_connection,
806
779
serial : AtomicU32 :: new ( 1 ) ,
807
780
unique_name : OnceCell :: new ( ) ,
808
781
signal_subscriptions : Mutex :: new ( HashMap :: new ( ) ) ,
809
- msg_receiver : sync:: RwLock :: new ( msg_receiver) ,
810
782
executor : executor. clone ( ) ,
811
783
msg_receiver_task : sync:: Mutex :: new ( Some ( msg_receiver_task) ) ,
812
784
} ) ,
@@ -942,20 +914,19 @@ impl Sink<Message> for Connection {
942
914
}
943
915
}
944
916
945
- /// A [`stream::Stream`] implementation that yields [`Message`] items.
946
- ///
947
- /// Use [`Connection::stream`] to create an instance of this type.
948
- pub struct MessageStream {
949
- stream : stream:: BoxStream < ' static , Result < Arc < Message > > > ,
950
- }
951
-
952
- assert_impl_all ! ( MessageStream : Send , Unpin ) ;
953
-
954
- impl stream:: Stream for MessageStream {
917
+ impl stream:: Stream for Connection {
955
918
type Item = Result < Arc < Message > > ;
956
919
957
920
fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
958
- stream:: Stream :: poll_next ( self . get_mut ( ) . stream . as_mut ( ) , cx)
921
+ let stream = self . get_mut ( ) ;
922
+ let msg_fut = stream. msg_receiver . next ( ) ;
923
+ let err_fut = stream. error_receiver . next ( ) ;
924
+ let mut select_fut = select ( msg_fut, err_fut) ;
925
+
926
+ match futures_core:: ready!( Pin :: new( & mut select_fut) . poll( cx) ) {
927
+ Either :: Left ( ( msg, _) ) => Poll :: Ready ( msg. map ( Ok ) ) ,
928
+ Either :: Right ( ( error, _) ) => Poll :: Ready ( error. map ( Err ) ) ,
929
+ }
959
930
}
960
931
}
961
932
@@ -1016,13 +987,11 @@ mod tests {
1016
987
let server = Connection :: new_unix_server ( p0, & guid) ;
1017
988
let client = Connection :: new_unix_client ( p1, false ) ;
1018
989
1019
- let ( client_conn, server_conn) = futures_util:: try_join!( client, server) ?;
1020
- let mut client_stream = client_conn. stream ( ) . await ;
1021
- let mut server_stream = server_conn. stream ( ) . await ;
990
+ let ( mut client_conn, mut server_conn) = futures_util:: try_join!( client, server) ?;
1022
991
1023
992
let server_future = async {
1024
993
let mut method: Option < Arc < Message > > = None ;
1025
- while let Some ( m) = server_stream . try_next ( ) . await ? {
994
+ while let Some ( m) = server_conn . try_next ( ) . await ? {
1026
995
if m. to_string ( ) == "Method call Test" {
1027
996
method. replace ( m) ;
1028
997
@@ -1044,7 +1013,7 @@ mod tests {
1044
1013
. await ?;
1045
1014
assert_eq ! ( reply. to_string( ) , "Method return" ) ;
1046
1015
// Check we didn't miss the signal that was sent during the call.
1047
- let m = client_stream . try_next ( ) . await ?. unwrap ( ) ;
1016
+ let m = client_conn . try_next ( ) . await ?. unwrap ( ) ;
1048
1017
assert_eq ! ( m. to_string( ) , "Signal ASignalForYou" ) ;
1049
1018
reply. body :: < String > ( ) . map_err ( |e| e. into ( ) )
1050
1019
} ;
0 commit comments