Skip to content

Commit 0bbf5b8

Browse files
committed
log starttls
1 parent dbe5aa1 commit 0bbf5b8

File tree

2 files changed

+45
-21
lines changed

2 files changed

+45
-21
lines changed

src/imap/client.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ impl Client {
129129
Client::connect_secure(&context, resolved_addr, host, strict_tls).await
130130
}
131131
ConnectionSecurity::Starttls => {
132-
Client::connect_starttls(resolved_addr, host, strict_tls).await
132+
Client::connect_starttls(&context, resolved_addr, host, strict_tls).await
133133
}
134134
ConnectionSecurity::Plain => Client::connect_insecure(&context, resolved_addr).await,
135135
};
@@ -247,7 +247,7 @@ impl Client {
247247
Ok(client)
248248
}
249249

250-
async fn connect_starttls(addr: SocketAddr, host: &str, strict_tls: bool) -> Result<Self> {
250+
async fn connect_starttls(context: &Context, addr: SocketAddr, host: &str, strict_tls: bool) -> Result<Self> {
251251
let tcp_stream = connect_tcp_inner(addr).await?;
252252

253253
// Run STARTTLS command and convert the client back into a stream.
@@ -268,7 +268,15 @@ impl Client {
268268
.await
269269
.context("STARTTLS upgrade failed")?;
270270

271-
let buffered_stream = BufWriter::new(tls_stream);
271+
let account_id = context.get_id();
272+
let events = context.events.clone();
273+
let logging_stream = LoggingStream::new(
274+
tls_stream,
275+
format!("STARTTLS IMAP stream {host} ({addr})"),
276+
account_id,
277+
events,
278+
);
279+
let buffered_stream = BufWriter::new(logging_stream);
272280
let session_stream: Box<dyn SessionStream> = Box::new(buffered_stream);
273281
let client = Client::new(session_stream);
274282
Ok(client)

src/log/logging_stream.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,17 @@ impl ThroughputStats {
4949
first_read_timestamp: None,
5050
last_read_timestamp: Instant::now(),
5151
total_duration: Duration::ZERO,
52-
enabled: false
52+
enabled: false,
5353
}
5454
}
5555

5656
/// Returns throughput in bps.
57-
pub fn throughput(&self) -> f64 {
57+
pub fn throughput(&self) -> Option<f64> {
5858
let total_duration_secs = self.total_duration.as_secs_f64();
5959
if total_duration_secs > 0.0 {
60-
(self.total_read as f64) / total_duration_secs
60+
Some((self.total_read as f64) / total_duration_secs)
6161
} else {
62-
0.0
62+
None
6363
}
6464
}
6565
}
@@ -104,15 +104,16 @@ impl<S: SessionStream> AsyncRead for LoggingStream<S> {
104104
let projected = self.project();
105105
let old_remaining = buf.remaining();
106106

107+
let now = Instant::now();
108+
if projected.throughput.first_read_timestamp.is_none() && projected.throughput.enabled {
109+
projected.throughput.first_read_timestamp = Some(now);
110+
}
111+
107112
let res = projected.inner.poll_read(cx, buf);
108113

109114
let n = old_remaining - buf.remaining();
110115
if n > 0 {
111116
if projected.throughput.enabled {
112-
let now = Instant::now();
113-
if projected.throughput.first_read_timestamp.is_none() {
114-
projected.throughput.first_read_timestamp = Some(now);
115-
}
116117
projected.throughput.last_read_timestamp = now;
117118

118119
projected.throughput.span_read = projected.throughput.span_read.saturating_add(n);
@@ -144,20 +145,35 @@ impl<S: SessionStream> AsyncWrite for LoggingStream<S> {
144145
) -> Poll<std::io::Result<()>> {
145146
let projected = self.project();
146147
if let Some(first_read_timestamp) = projected.throughput.first_read_timestamp.take() {
147-
let duration = projected.throughput.last_read_timestamp.duration_since(first_read_timestamp);
148-
149-
projected.throughput.total_read = projected.throughput.total_read.saturating_add(projected.throughput.span_read);
148+
let duration = projected
149+
.throughput
150+
.last_read_timestamp
151+
.duration_since(first_read_timestamp);
152+
153+
projected.throughput.total_read = projected
154+
.throughput
155+
.total_read
156+
.saturating_add(projected.throughput.span_read);
150157
projected.throughput.span_read = 0;
151-
projected.throughput.total_duration = projected.throughput.total_duration.saturating_add(duration);
158+
projected.throughput.total_duration =
159+
projected.throughput.total_duration.saturating_add(duration);
152160
}
153161

154-
let throughput = projected.throughput.throughput();
155-
let log_message = format!("{}: FLUSH: {} kbps", projected.tag, throughput * 8e-3);
162+
if let Some(throughput) = projected.throughput.throughput() {
163+
let log_message = format!("{}: FLUSH: {} kbps", projected.tag, throughput * 8e-3);
164+
165+
projected.events.emit(Event {
166+
id: 0,
167+
typ: EventType::Info(log_message),
168+
});
169+
} else {
170+
let log_message = format!("{}: FLUSH: unknown throughput", projected.tag);
156171

157-
projected.events.emit(Event {
158-
id: 0,
159-
typ: EventType::Info(log_message),
160-
});
172+
projected.events.emit(Event {
173+
id: 0,
174+
typ: EventType::Info(log_message),
175+
});
176+
}
161177

162178
projected.inner.poll_flush(cx)
163179
}

0 commit comments

Comments
 (0)