@@ -33,12 +33,13 @@ where
33
33
Message ( M , oneshot:: Sender < M > ) ,
34
34
Done ( Result < ( ) , Error > , oneshot:: Sender < ( ) > ) ,
35
35
}
36
- /// A handle that allows message to be sent to a write queue` .
36
+ /// A handle that allows messages to be sent to a write queue.
37
37
pub struct Sender < M >
38
38
where
39
39
M : AsOutputSegments ,
40
40
{
41
41
sender : futures:: channel:: mpsc:: UnboundedSender < Item < M > > ,
42
+ in_flight : std:: sync:: Arc < std:: sync:: atomic:: AtomicI32 > ,
42
43
}
43
44
44
45
impl < M > Clone for Sender < M >
@@ -48,25 +49,33 @@ where
48
49
fn clone ( & self ) -> Self {
49
50
Self {
50
51
sender : self . sender . clone ( ) ,
52
+ in_flight : self . in_flight . clone ( ) ,
51
53
}
52
54
}
53
55
}
54
56
55
- /// Creates a new WriteQueue that wraps the given writer .
57
+ /// Creates a new write queue that wraps the given `AsyncWrite` .
56
58
pub fn write_queue < W , M > ( mut writer : W ) -> ( Sender < M > , impl Future < Output = Result < ( ) , Error > > )
57
59
where
58
60
W : AsyncWrite + Unpin ,
59
61
M : AsOutputSegments ,
60
62
{
61
63
let ( tx, mut rx) = futures:: channel:: mpsc:: unbounded ( ) ;
62
64
63
- let sender = Sender { sender : tx } ;
65
+ let in_flight = std:: sync:: Arc :: new ( std:: sync:: atomic:: AtomicI32 :: new ( 0 ) ) ;
66
+
67
+ let sender = Sender {
68
+ sender : tx,
69
+ in_flight : in_flight. clone ( ) ,
70
+ } ;
64
71
65
72
let queue = async move {
66
73
while let Some ( item) = rx. next ( ) . await {
67
74
match item {
68
75
Item :: Message ( m, returner) => {
69
- crate :: serialize:: write_message ( & mut writer, & m) . await ?;
76
+ let result = crate :: serialize:: write_message ( & mut writer, & m) . await ;
77
+ in_flight. fetch_sub ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
78
+ result?;
70
79
writer. flush ( ) . await ?;
71
80
let _ = returner. send ( m) ;
72
81
}
@@ -96,12 +105,14 @@ where
96
105
oneshot. map_err ( |oneshot:: Canceled | Error :: disconnected ( "WriteQueue has terminated" . into ( ) ) )
97
106
}
98
107
99
- /// Returns the number of messages queued to be written, not including any in-progress write.
100
- pub fn len ( & mut self ) -> usize {
101
- unimplemented ! ( )
108
+ /// Returns the number of messages queued to be written.
109
+ pub fn len ( & self ) -> usize {
110
+ let result = self . in_flight . load ( std:: sync:: atomic:: Ordering :: SeqCst ) ;
111
+ assert ! ( result >= 0 ) ;
112
+ result as usize
102
113
}
103
114
104
- pub fn is_empty ( & mut self ) -> bool {
115
+ pub fn is_empty ( & self ) -> bool {
105
116
self . len ( ) == 0
106
117
}
107
118
0 commit comments