Skip to content

Commit 9ad7f78

Browse files
committed
feat: add LoggingStream
1 parent be107b0 commit 9ad7f78

File tree

2 files changed

+236
-0
lines changed

2 files changed

+236
-0
lines changed

src/log.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
use crate::context::Context;
66

7+
mod logging_stream;
8+
9+
pub(crate) use logging_stream::LoggingStream;
10+
711
macro_rules! info {
812
($ctx:expr, $msg:expr) => {
913
info!($ctx, $msg,)

src/log/logging_stream.rs

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
//! Stream that logs errors as events.
2+
//!
3+
//! This stream can be used to wrap IMAP,
4+
//! SMTP and HTTP streams so errors
5+
//! that occur are logged before
6+
//! they are processed.
7+
8+
use std::pin::Pin;
9+
use std::task::{Context, Poll};
10+
use std::time::{Duration, Instant};
11+
12+
use pin_project::pin_project;
13+
14+
use crate::events::{Event, EventType, Events};
15+
use crate::net::session::SessionStream;
16+
17+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18+
19+
#[derive(Debug)]
20+
struct ThroughputStats {
21+
/// Total number of bytes read.
22+
pub total_read: usize,
23+
24+
/// Number of bytes read since the last flush.
25+
pub span_read: usize,
26+
27+
/// First timestamp of successful non-zero read.
28+
///
29+
/// Reset on flush.
30+
pub first_read_timestamp: Option<Instant>,
31+
32+
/// Last non-zero read.
33+
pub last_read_timestamp: Instant,
34+
35+
pub total_duration: Duration,
36+
37+
/// Whether to collect throughput statistics or not.
38+
///
39+
/// Disabled when read timeout is disabled,
40+
/// i.e. when we are in IMAP IDLE.
41+
pub enabled: bool,
42+
}
43+
44+
impl ThroughputStats {
45+
fn new() -> Self {
46+
Self {
47+
total_read: 0,
48+
span_read: 0,
49+
first_read_timestamp: None,
50+
last_read_timestamp: Instant::now(),
51+
total_duration: Duration::ZERO,
52+
enabled: false,
53+
}
54+
}
55+
56+
/// Returns throughput in bps.
57+
pub fn throughput(&self) -> Option<f64> {
58+
let total_duration_secs = self.total_duration.as_secs_f64();
59+
if total_duration_secs > 0.0 {
60+
Some((self.total_read as f64) / total_duration_secs)
61+
} else {
62+
None
63+
}
64+
}
65+
}
66+
67+
/// Stream that logs errors to the event channel.
68+
#[derive(Debug)]
69+
#[pin_project]
70+
pub(crate) struct LoggingStream<S: SessionStream> {
71+
#[pin]
72+
inner: S,
73+
74+
/// Name of the stream to distinguish log messages produced by it.
75+
tag: String,
76+
77+
/// Account ID for logging.
78+
account_id: u32,
79+
80+
/// Event channel.
81+
events: Events,
82+
83+
throughput: ThroughputStats,
84+
}
85+
86+
impl<S: SessionStream> LoggingStream<S> {
87+
pub fn new(inner: S, tag: String, account_id: u32, events: Events) -> Self {
88+
Self {
89+
inner,
90+
tag,
91+
account_id,
92+
events,
93+
throughput: ThroughputStats::new(),
94+
}
95+
}
96+
}
97+
98+
impl<S: SessionStream> AsyncRead for LoggingStream<S> {
99+
fn poll_read(
100+
self: Pin<&mut Self>,
101+
cx: &mut Context<'_>,
102+
buf: &mut ReadBuf<'_>,
103+
) -> Poll<std::io::Result<()>> {
104+
let projected = self.project();
105+
let old_remaining = buf.remaining();
106+
107+
let now = Instant::now();
108+
let res = projected.inner.poll_read(cx, buf);
109+
110+
if projected.throughput.enabled {
111+
let first_read_timestamp =
112+
if let Some(first_read_timestamp) = projected.throughput.first_read_timestamp {
113+
first_read_timestamp
114+
} else {
115+
projected.throughput.first_read_timestamp = Some(now);
116+
now
117+
};
118+
119+
let n = old_remaining - buf.remaining();
120+
if n > 0 {
121+
projected.throughput.last_read_timestamp = now;
122+
projected.throughput.span_read = projected.throughput.span_read.saturating_add(n);
123+
}
124+
125+
let duration = projected
126+
.throughput
127+
.last_read_timestamp
128+
.duration_since(first_read_timestamp);
129+
130+
let log_message = format!(
131+
"{}: SPAN: {} {}",
132+
projected.tag,
133+
duration.as_secs_f64(),
134+
projected.throughput.span_read
135+
);
136+
projected.events.emit(Event {
137+
id: 0,
138+
typ: EventType::Info(log_message),
139+
});
140+
141+
let log_message = format!("{}: READING {}", projected.tag, n);
142+
projected.events.emit(Event {
143+
id: 0,
144+
typ: EventType::Info(log_message),
145+
});
146+
}
147+
148+
res
149+
}
150+
}
151+
152+
impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
153+
fn poll_write(
154+
self: Pin<&mut Self>,
155+
cx: &mut std::task::Context<'_>,
156+
buf: &[u8],
157+
) -> Poll<std::io::Result<usize>> {
158+
self.project().inner.poll_write(cx, buf)
159+
}
160+
161+
fn poll_flush(
162+
self: Pin<&mut Self>,
163+
cx: &mut std::task::Context<'_>,
164+
) -> Poll<std::io::Result<()>> {
165+
let projected = self.project();
166+
if let Some(first_read_timestamp) = projected.throughput.first_read_timestamp.take() {
167+
let duration = projected
168+
.throughput
169+
.last_read_timestamp
170+
.duration_since(first_read_timestamp);
171+
172+
// Only measure when more than about 2 MTU is transferred.
173+
// We cannot measure throughput on small responses
174+
// like `A1000 OK`.
175+
if projected.throughput.span_read > 3000 {
176+
projected.throughput.total_read = projected
177+
.throughput
178+
.total_read
179+
.saturating_add(projected.throughput.span_read);
180+
projected.throughput.total_duration =
181+
projected.throughput.total_duration.saturating_add(duration);
182+
}
183+
184+
projected.throughput.span_read = 0;
185+
}
186+
187+
if let Some(throughput) = projected.throughput.throughput() {
188+
let log_message = format!("{}: FLUSH: {} kbps", projected.tag, throughput * 8e-3);
189+
190+
projected.events.emit(Event {
191+
id: 0,
192+
typ: EventType::Info(log_message),
193+
});
194+
} else {
195+
let log_message = format!("{}: FLUSH: unknown throughput", projected.tag);
196+
197+
projected.events.emit(Event {
198+
id: 0,
199+
typ: EventType::Info(log_message),
200+
});
201+
}
202+
203+
projected.inner.poll_flush(cx)
204+
}
205+
206+
fn poll_shutdown(
207+
self: Pin<&mut Self>,
208+
cx: &mut std::task::Context<'_>,
209+
) -> Poll<std::io::Result<()>> {
210+
self.project().inner.poll_shutdown(cx)
211+
}
212+
213+
fn poll_write_vectored(
214+
self: Pin<&mut Self>,
215+
cx: &mut Context<'_>,
216+
bufs: &[std::io::IoSlice<'_>],
217+
) -> Poll<std::io::Result<usize>> {
218+
self.project().inner.poll_write_vectored(cx, bufs)
219+
}
220+
221+
fn is_write_vectored(&self) -> bool {
222+
self.inner.is_write_vectored()
223+
}
224+
}
225+
226+
impl<S: SessionStream> SessionStream for LoggingStream<S> {
227+
fn set_read_timeout(&mut self, timeout: Option<Duration>) {
228+
self.throughput.enabled = timeout.is_some();
229+
230+
self.inner.set_read_timeout(timeout)
231+
}
232+
}

0 commit comments

Comments
 (0)