Skip to content

Commit bebcbc0

Browse files
committed
measure throughput
1 parent 41df85a commit bebcbc0

File tree

1 file changed

+59
-13
lines changed

1 file changed

+59
-13
lines changed

src/log/logging_stream.rs

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
use std::pin::Pin;
99
use std::task::{Context, Poll};
10-
use std::time::Duration;
10+
use std::time::{Duration, Instant};
1111

1212
use pin_project::pin_project;
1313

@@ -31,6 +31,22 @@ pub(crate) struct LoggingStream<S: SessionStream> {
3131

3232
/// Event channel.
3333
events: Events,
34+
35+
/// Total number of bytes read.
36+
total_read: usize,
37+
38+
/// Number of bytes read since the last flush.
39+
span_read: usize,
40+
41+
/// First timestamp of successful non-zero read.
42+
///
43+
/// Reset on flush.
44+
first_read_timestamp: Option<Instant>,
45+
46+
/// Last non-zero read.
47+
last_read_timestamp: Instant,
48+
49+
total_duration: Duration,
3450
}
3551

3652
impl<S: SessionStream> LoggingStream<S> {
@@ -40,6 +56,11 @@ impl<S: SessionStream> LoggingStream<S> {
4056
tag,
4157
account_id,
4258
events,
59+
total_read: 0,
60+
span_read: 0,
61+
first_read_timestamp: None,
62+
last_read_timestamp: Instant::now(),
63+
total_duration: Duration::ZERO,
4364
}
4465
}
4566
}
@@ -56,12 +77,21 @@ impl<S: SessionStream> AsyncRead for LoggingStream<S> {
5677
let res = projected.inner.poll_read(cx, buf);
5778

5879
let n = old_remaining - buf.remaining();
59-
let log_message = format!("{}: READING {}", projected.tag, n);
60-
projected.events.emit(Event {
61-
id: 0,
62-
typ: EventType::Info(log_message),
63-
});
64-
80+
if n > 0 {
81+
let now = Instant::now();
82+
if projected.first_read_timestamp.is_none() {
83+
*projected.first_read_timestamp = Some(now);
84+
}
85+
*projected.last_read_timestamp = now;
86+
87+
*projected.span_read = projected.span_read.saturating_add(n);
88+
89+
let log_message = format!("{}: READING {}", projected.tag, n);
90+
projected.events.emit(Event {
91+
id: 0,
92+
typ: EventType::Info(log_message),
93+
});
94+
}
6595

6696
res
6797
}
@@ -88,13 +118,29 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
88118
self: Pin<&mut Self>,
89119
cx: &mut std::task::Context<'_>,
90120
) -> Poll<std::io::Result<()>> {
91-
let log_message = format!("{}: FLUSH", self.tag);
92-
93121
let projected = self.project();
94-
projected.events.emit(Event {
95-
id: 0,
96-
typ: EventType::Info(log_message),
97-
});
122+
123+
if let Some(first_read_timestamp) = projected.first_read_timestamp.take() {
124+
let duration = projected.last_read_timestamp.duration_since(first_read_timestamp);
125+
126+
*projected.total_read = projected.total_read.saturating_add(*projected.span_read);
127+
*projected.span_read = 0;
128+
*projected.total_duration = projected.total_duration.saturating_add(duration);
129+
130+
let total_duration_secs = projected.total_duration.as_secs_f64();
131+
let throughput = if total_duration_secs > 0.0 {
132+
(*projected.total_read as f64) / total_duration_secs
133+
} else {
134+
0.0
135+
};
136+
137+
let log_message = format!("{}: FLUSH: read={}, duration={}, {} kbps", projected.tag, *projected.total_read, total_duration_secs, throughput * 8e-3);
138+
139+
projected.events.emit(Event {
140+
id: 0,
141+
typ: EventType::Info(log_message),
142+
});
143+
}
98144

99145
projected.inner.poll_flush(cx)
100146
}

0 commit comments

Comments
 (0)