Skip to content

Commit dbe5aa1

Browse files
committed
ThroughputStats
1 parent 7f74c7b commit dbe5aa1

File tree

1 file changed

+63
-49
lines changed

1 file changed

+63
-49
lines changed

src/log/logging_stream.rs

Lines changed: 63 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,58 +16,40 @@ use crate::net::session::SessionStream;
1616

1717
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
1818

19-
/// Stream that logs errors to the event channel.
2019
#[derive(Debug)]
21-
#[pin_project]
22-
pub(crate) struct LoggingStream<S: SessionStream> {
23-
#[pin]
24-
inner: S,
25-
26-
/// Name of the stream to distinguish log messages produced by it.
27-
tag: String,
28-
29-
/// Account ID for logging.
30-
account_id: u32,
31-
32-
/// Event channel.
33-
events: Events,
34-
20+
struct ThroughputStats {
3521
/// Total number of bytes read.
36-
total_read: usize,
22+
pub total_read: usize,
3723

3824
/// Number of bytes read since the last flush.
39-
span_read: usize,
25+
pub span_read: usize,
4026

4127
/// First timestamp of successful non-zero read.
4228
///
4329
/// Reset on flush.
44-
first_read_timestamp: Option<Instant>,
30+
pub first_read_timestamp: Option<Instant>,
4531

4632
/// Last non-zero read.
47-
last_read_timestamp: Instant,
33+
pub last_read_timestamp: Instant,
4834

49-
total_duration: Duration,
35+
pub total_duration: Duration,
5036

5137
/// Whether to collect throughput statistics or not.
5238
///
5339
/// Disabled when read timeout is disabled,
5440
/// i.e. when we are in IMAP IDLE.
55-
enable_stats: bool,
41+
pub enabled: bool,
5642
}
5743

58-
impl<S: SessionStream> LoggingStream<S> {
59-
pub fn new(inner: S, tag: String, account_id: u32, events: Events) -> Self {
44+
impl ThroughputStats {
45+
fn new() -> Self {
6046
Self {
61-
inner,
62-
tag,
63-
account_id,
64-
events,
6547
total_read: 0,
6648
span_read: 0,
6749
first_read_timestamp: None,
6850
last_read_timestamp: Instant::now(),
6951
total_duration: Duration::ZERO,
70-
enable_stats: true
52+
enabled: false
7153
}
7254
}
7355

@@ -82,6 +64,37 @@ impl<S: SessionStream> LoggingStream<S> {
8264
}
8365
}
8466

67+
/// Stream that logs errors to the event channel.
68+
#[derive(Debug)]
69+
#[pin_project]
70+
pub(crate) struct LoggingStream<S: SessionStream> {
71+
#[pin]
72+
inner: S,
73+
74+
/// Name of the stream to distinguish log messages produced by it.
75+
tag: String,
76+
77+
/// Account ID for logging.
78+
account_id: u32,
79+
80+
/// Event channel.
81+
events: Events,
82+
83+
throughput: ThroughputStats,
84+
}
85+
86+
impl<S: SessionStream> LoggingStream<S> {
87+
pub fn new(inner: S, tag: String, account_id: u32, events: Events) -> Self {
88+
Self {
89+
inner,
90+
tag,
91+
account_id,
92+
events,
93+
throughput: ThroughputStats::new(),
94+
}
95+
}
96+
}
97+
8598
impl<S: SessionStream> AsyncRead for LoggingStream<S> {
8699
fn poll_read(
87100
self: Pin<&mut Self>,
@@ -94,14 +107,16 @@ impl<S: SessionStream> AsyncRead for LoggingStream<S> {
94107
let res = projected.inner.poll_read(cx, buf);
95108

96109
let n = old_remaining - buf.remaining();
97-
if n > 0 && *projected.enable_stats {
98-
let now = Instant::now();
99-
if projected.first_read_timestamp.is_none() {
100-
*projected.first_read_timestamp = Some(now);
110+
if n > 0 {
111+
if projected.throughput.enabled {
112+
let now = Instant::now();
113+
if projected.throughput.first_read_timestamp.is_none() {
114+
projected.throughput.first_read_timestamp = Some(now);
115+
}
116+
projected.throughput.last_read_timestamp = now;
117+
118+
projected.throughput.span_read = projected.throughput.span_read.saturating_add(n);
101119
}
102-
*projected.last_read_timestamp = now;
103-
104-
*projected.span_read = projected.span_read.saturating_add(n);
105120

106121
let log_message = format!("{}: READING {}", projected.tag, n);
107122
projected.events.emit(Event {
@@ -127,23 +142,22 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
127142
self: Pin<&mut Self>,
128143
cx: &mut std::task::Context<'_>,
129144
) -> Poll<std::io::Result<()>> {
130-
let throughput = self.throughput();
131-
132145
let projected = self.project();
133-
if let Some(first_read_timestamp) = projected.first_read_timestamp.take() {
134-
let duration = projected.last_read_timestamp.duration_since(first_read_timestamp);
146+
if let Some(first_read_timestamp) = projected.throughput.first_read_timestamp.take() {
147+
let duration = projected.throughput.last_read_timestamp.duration_since(first_read_timestamp);
135148

136-
*projected.total_read = projected.total_read.saturating_add(*projected.span_read);
137-
*projected.span_read = 0;
138-
*projected.total_duration = projected.total_duration.saturating_add(duration);
149+
projected.throughput.total_read = projected.throughput.total_read.saturating_add(projected.throughput.span_read);
150+
projected.throughput.span_read = 0;
151+
projected.throughput.total_duration = projected.throughput.total_duration.saturating_add(duration);
152+
}
139153

140-
let log_message = format!("{}: FLUSH: {} kbps", projected.tag, throughput * 8e-3);
154+
let throughput = projected.throughput.throughput();
155+
let log_message = format!("{}: FLUSH: {} kbps", projected.tag, throughput * 8e-3);
141156

142-
projected.events.emit(Event {
143-
id: 0,
144-
typ: EventType::Info(log_message),
145-
});
146-
}
157+
projected.events.emit(Event {
158+
id: 0,
159+
typ: EventType::Info(log_message),
160+
});
147161

148162
projected.inner.poll_flush(cx)
149163
}
@@ -170,7 +184,7 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
170184

171185
impl<S: SessionStream> SessionStream for LoggingStream<S> {
172186
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
173-
self.enable_stats = timeout.is_some();
187+
self.throughput.enabled = timeout.is_some();
174188

175189
self.inner.set_read_timeout(timeout)
176190
}

0 commit comments

Comments
 (0)