@@ -16,58 +16,40 @@ use crate::net::session::SessionStream;
16
16
17
17
use tokio:: io:: { AsyncRead , AsyncWrite , ReadBuf } ;
18
18
19
- /// Stream that logs errors to the event channel.
20
19
#[ derive( Debug ) ]
21
- #[ pin_project]
22
- pub ( crate ) struct LoggingStream < S : SessionStream > {
23
- #[ pin]
24
- inner : S ,
25
-
26
- /// Name of the stream to distinguish log messages produced by it.
27
- tag : String ,
28
-
29
- /// Account ID for logging.
30
- account_id : u32 ,
31
-
32
- /// Event channel.
33
- events : Events ,
34
-
20
+ struct ThroughputStats {
35
21
/// Total number of bytes read.
36
- total_read : usize ,
22
+ pub total_read : usize ,
37
23
38
24
/// Number of bytes read since the last flush.
39
- span_read : usize ,
25
+ pub span_read : usize ,
40
26
41
27
/// First timestamp of successful non-zero read.
42
28
///
43
29
/// Reset on flush.
44
- first_read_timestamp : Option < Instant > ,
30
+ pub first_read_timestamp : Option < Instant > ,
45
31
46
32
/// Last non-zero read.
47
- last_read_timestamp : Instant ,
33
+ pub last_read_timestamp : Instant ,
48
34
49
- total_duration : Duration ,
35
+ pub total_duration : Duration ,
50
36
51
37
/// Whether to collect throughput statistics or not.
52
38
///
53
39
/// Disabled when read timeout is disabled,
54
40
/// i.e. when we are in IMAP IDLE.
55
- enable_stats : bool ,
41
+ pub enabled : bool ,
56
42
}
57
43
58
- impl < S : SessionStream > LoggingStream < S > {
59
- pub fn new ( inner : S , tag : String , account_id : u32 , events : Events ) -> Self {
44
+ impl ThroughputStats {
45
+ fn new ( ) -> Self {
60
46
Self {
61
- inner,
62
- tag,
63
- account_id,
64
- events,
65
47
total_read : 0 ,
66
48
span_read : 0 ,
67
49
first_read_timestamp : None ,
68
50
last_read_timestamp : Instant :: now ( ) ,
69
51
total_duration : Duration :: ZERO ,
70
- enable_stats : true
52
+ enabled : false
71
53
}
72
54
}
73
55
@@ -82,6 +64,37 @@ impl<S: SessionStream> LoggingStream<S> {
82
64
}
83
65
}
84
66
67
+ /// Stream that logs errors to the event channel.
68
+ #[ derive( Debug ) ]
69
+ #[ pin_project]
70
+ pub ( crate ) struct LoggingStream < S : SessionStream > {
71
+ #[ pin]
72
+ inner : S ,
73
+
74
+ /// Name of the stream to distinguish log messages produced by it.
75
+ tag : String ,
76
+
77
+ /// Account ID for logging.
78
+ account_id : u32 ,
79
+
80
+ /// Event channel.
81
+ events : Events ,
82
+
83
+ throughput : ThroughputStats ,
84
+ }
85
+
86
+ impl < S : SessionStream > LoggingStream < S > {
87
+ pub fn new ( inner : S , tag : String , account_id : u32 , events : Events ) -> Self {
88
+ Self {
89
+ inner,
90
+ tag,
91
+ account_id,
92
+ events,
93
+ throughput : ThroughputStats :: new ( ) ,
94
+ }
95
+ }
96
+ }
97
+
85
98
impl < S : SessionStream > AsyncRead for LoggingStream < S > {
86
99
fn poll_read (
87
100
self : Pin < & mut Self > ,
@@ -94,14 +107,16 @@ impl<S: SessionStream> AsyncRead for LoggingStream<S> {
94
107
let res = projected. inner . poll_read ( cx, buf) ;
95
108
96
109
let n = old_remaining - buf. remaining ( ) ;
97
- if n > 0 && * projected. enable_stats {
98
- let now = Instant :: now ( ) ;
99
- if projected. first_read_timestamp . is_none ( ) {
100
- * projected. first_read_timestamp = Some ( now) ;
110
+ if n > 0 {
111
+ if projected. throughput . enabled {
112
+ let now = Instant :: now ( ) ;
113
+ if projected. throughput . first_read_timestamp . is_none ( ) {
114
+ projected. throughput . first_read_timestamp = Some ( now) ;
115
+ }
116
+ projected. throughput . last_read_timestamp = now;
117
+
118
+ projected. throughput . span_read = projected. throughput . span_read . saturating_add ( n) ;
101
119
}
102
- * projected. last_read_timestamp = now;
103
-
104
- * projected. span_read = projected. span_read . saturating_add ( n) ;
105
120
106
121
let log_message = format ! ( "{}: READING {}" , projected. tag, n) ;
107
122
projected. events . emit ( Event {
@@ -127,23 +142,22 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
127
142
self : Pin < & mut Self > ,
128
143
cx : & mut std:: task:: Context < ' _ > ,
129
144
) -> Poll < std:: io:: Result < ( ) > > {
130
- let throughput = self . throughput ( ) ;
131
-
132
145
let projected = self . project ( ) ;
133
- if let Some ( first_read_timestamp) = projected. first_read_timestamp . take ( ) {
134
- let duration = projected. last_read_timestamp . duration_since ( first_read_timestamp) ;
146
+ if let Some ( first_read_timestamp) = projected. throughput . first_read_timestamp . take ( ) {
147
+ let duration = projected. throughput . last_read_timestamp . duration_since ( first_read_timestamp) ;
135
148
136
- * projected. total_read = projected. total_read . saturating_add ( * projected. span_read ) ;
137
- * projected. span_read = 0 ;
138
- * projected. total_duration = projected. total_duration . saturating_add ( duration) ;
149
+ projected. throughput . total_read = projected. throughput . total_read . saturating_add ( projected. throughput . span_read ) ;
150
+ projected. throughput . span_read = 0 ;
151
+ projected. throughput . total_duration = projected. throughput . total_duration . saturating_add ( duration) ;
152
+ }
139
153
140
- let log_message = format ! ( "{}: FLUSH: {} kbps" , projected. tag, throughput * 8e-3 ) ;
154
+ let throughput = projected. throughput . throughput ( ) ;
155
+ let log_message = format ! ( "{}: FLUSH: {} kbps" , projected. tag, throughput * 8e-3 ) ;
141
156
142
- projected. events . emit ( Event {
143
- id : 0 ,
144
- typ : EventType :: Info ( log_message) ,
145
- } ) ;
146
- }
157
+ projected. events . emit ( Event {
158
+ id : 0 ,
159
+ typ : EventType :: Info ( log_message) ,
160
+ } ) ;
147
161
148
162
projected. inner . poll_flush ( cx)
149
163
}
@@ -170,7 +184,7 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
170
184
171
185
impl < S : SessionStream > SessionStream for LoggingStream < S > {
172
186
fn set_read_timeout ( & mut self , timeout : Option < Duration > ) {
173
- self . enable_stats = timeout. is_some ( ) ;
187
+ self . throughput . enabled = timeout. is_some ( ) ;
174
188
175
189
self . inner . set_read_timeout ( timeout)
176
190
}
0 commit comments