@@ -49,17 +49,17 @@ impl ThroughputStats {
49
49
first_read_timestamp : None ,
50
50
last_read_timestamp : Instant :: now ( ) ,
51
51
total_duration : Duration :: ZERO ,
52
- enabled : false
52
+ enabled : false ,
53
53
}
54
54
}
55
55
56
56
/// Returns throughput in bps.
57
- pub fn throughput ( & self ) -> f64 {
57
+ pub fn throughput ( & self ) -> Option < f64 > {
58
58
let total_duration_secs = self . total_duration . as_secs_f64 ( ) ;
59
59
if total_duration_secs > 0.0 {
60
- ( self . total_read as f64 ) / total_duration_secs
60
+ Some ( ( self . total_read as f64 ) / total_duration_secs)
61
61
} else {
62
- 0.0
62
+ None
63
63
}
64
64
}
65
65
}
@@ -104,15 +104,16 @@ impl<S: SessionStream> AsyncRead for LoggingStream<S> {
104
104
let projected = self . project ( ) ;
105
105
let old_remaining = buf. remaining ( ) ;
106
106
107
+ let now = Instant :: now ( ) ;
108
+ if projected. throughput . first_read_timestamp . is_none ( ) && projected. throughput . enabled {
109
+ projected. throughput . first_read_timestamp = Some ( now) ;
110
+ }
111
+
107
112
let res = projected. inner . poll_read ( cx, buf) ;
108
113
109
114
let n = old_remaining - buf. remaining ( ) ;
110
115
if n > 0 {
111
116
if projected. throughput . enabled {
112
- let now = Instant :: now ( ) ;
113
- if projected. throughput . first_read_timestamp . is_none ( ) {
114
- projected. throughput . first_read_timestamp = Some ( now) ;
115
- }
116
117
projected. throughput . last_read_timestamp = now;
117
118
118
119
projected. throughput . span_read = projected. throughput . span_read . saturating_add ( n) ;
@@ -144,20 +145,35 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
144
145
) -> Poll < std:: io:: Result < ( ) > > {
145
146
let projected = self . project ( ) ;
146
147
if let Some ( first_read_timestamp) = projected. throughput . first_read_timestamp . take ( ) {
147
- let duration = projected. throughput . last_read_timestamp . duration_since ( first_read_timestamp) ;
148
-
149
- projected. throughput . total_read = projected. throughput . total_read . saturating_add ( projected. throughput . span_read ) ;
148
+ let duration = projected
149
+ . throughput
150
+ . last_read_timestamp
151
+ . duration_since ( first_read_timestamp) ;
152
+
153
+ projected. throughput . total_read = projected
154
+ . throughput
155
+ . total_read
156
+ . saturating_add ( projected. throughput . span_read ) ;
150
157
projected. throughput . span_read = 0 ;
151
- projected. throughput . total_duration = projected. throughput . total_duration . saturating_add ( duration) ;
158
+ projected. throughput . total_duration =
159
+ projected. throughput . total_duration . saturating_add ( duration) ;
152
160
}
153
161
154
- let throughput = projected. throughput . throughput ( ) ;
155
- let log_message = format ! ( "{}: FLUSH: {} kbps" , projected. tag, throughput * 8e-3 ) ;
162
+ if let Some ( throughput) = projected. throughput . throughput ( ) {
163
+ let log_message = format ! ( "{}: FLUSH: {} kbps" , projected. tag, throughput * 8e-3 ) ;
164
+
165
+ projected. events . emit ( Event {
166
+ id : 0 ,
167
+ typ : EventType :: Info ( log_message) ,
168
+ } ) ;
169
+ } else {
170
+ let log_message = format ! ( "{}: FLUSH: unknown throughput" , projected. tag) ;
156
171
157
- projected. events . emit ( Event {
158
- id : 0 ,
159
- typ : EventType :: Info ( log_message) ,
160
- } ) ;
172
+ projected. events . emit ( Event {
173
+ id : 0 ,
174
+ typ : EventType :: Info ( log_message) ,
175
+ } ) ;
176
+ }
161
177
162
178
projected. inner . poll_flush ( cx)
163
179
}
0 commit comments