Skip to content

Commit a8a7cec

Browse files
committed
refactor: use CancellationToken instead of a 1-message channel
1 parent 7f6beee commit a8a7cec

File tree

1 file changed

+28
-33
lines changed

1 file changed

+28
-33
lines changed

src/scheduler.rs

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use futures_lite::FutureExt;
1010
use rand::Rng;
1111
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
1212
use tokio::task;
13+
use tokio_util::sync::CancellationToken;
1314

1415
use self::connectivity::ConnectivityStore;
1516
use crate::config::{self, Config};
@@ -389,7 +390,7 @@ async fn inbox_loop(
389390
info!(ctx, "Starting inbox loop.");
390391
let ImapConnectionHandlers {
391392
mut connection,
392-
stop_receiver,
393+
stop_token,
393394
} = inbox_handlers;
394395

395396
let ctx1 = ctx.clone();
@@ -428,8 +429,8 @@ async fn inbox_loop(
428429
}
429430
};
430431

431-
stop_receiver
432-
.recv()
432+
stop_token
433+
.cancelled()
433434
.map(|_| {
434435
info!(ctx, "Shutting down inbox loop.");
435436
})
@@ -703,7 +704,7 @@ async fn simple_imap_loop(
703704
info!(ctx, "Starting simple loop for {folder_meaning}.");
704705
let ImapConnectionHandlers {
705706
mut connection,
706-
stop_receiver,
707+
stop_token,
707708
} = inbox_handlers;
708709

709710
let ctx1 = ctx.clone();
@@ -749,8 +750,8 @@ async fn simple_imap_loop(
749750
}
750751
};
751752

752-
stop_receiver
753-
.recv()
753+
stop_token
754+
.cancelled()
754755
.map(|_| {
755756
info!(ctx, "Shutting down IMAP loop for {folder_meaning}.");
756757
})
@@ -768,7 +769,7 @@ async fn smtp_loop(
768769
info!(ctx, "Starting SMTP loop.");
769770
let SmtpConnectionHandlers {
770771
mut connection,
771-
stop_receiver,
772+
stop_token,
772773
idle_interrupt_receiver,
773774
} = smtp_handlers;
774775

@@ -836,8 +837,8 @@ async fn smtp_loop(
836837
}
837838
};
838839

839-
stop_receiver
840-
.recv()
840+
stop_token
841+
.cancelled()
841842
.map(|_| {
842843
info!(ctx, "Shutting down SMTP loop.");
843844
})
@@ -982,9 +983,9 @@ impl Scheduler {
982983
pub(crate) async fn stop(self, context: &Context) {
983984
// Send stop signals to tasks so they can shutdown cleanly.
984985
for b in self.boxes() {
985-
b.conn_state.stop().await.log_err(context).ok();
986+
b.conn_state.stop();
986987
}
987-
self.smtp.stop().await.log_err(context).ok();
988+
self.smtp.stop();
988989

989990
// Actually shutdown tasks.
990991
let timeout_duration = std::time::Duration::from_secs(30);
@@ -1014,8 +1015,8 @@ impl Scheduler {
10141015
/// Connection state logic shared between imap and smtp connections.
10151016
#[derive(Debug)]
10161017
struct ConnectionState {
1017-
/// Channel to interrupt the whole connection.
1018-
stop_sender: Sender<()>,
1018+
/// Cancellation token to interrupt the whole connection.
1019+
stop_token: CancellationToken,
10191020
/// Channel to interrupt idle.
10201021
idle_interrupt_sender: Sender<()>,
10211022
/// Mutex to pass connectivity info between IMAP/SMTP threads and the API
@@ -1024,13 +1025,9 @@ struct ConnectionState {
10241025

10251026
impl ConnectionState {
10261027
/// Shutdown this connection completely.
1027-
async fn stop(&self) -> Result<()> {
1028+
fn stop(&self) {
10281029
// Trigger shutdown of the run loop.
1029-
self.stop_sender
1030-
.send(())
1031-
.await
1032-
.context("failed to stop, missing receiver")?;
1033-
Ok(())
1030+
self.stop_token.cancel();
10341031
}
10351032

10361033
fn interrupt(&self) {
@@ -1046,17 +1043,17 @@ pub(crate) struct SmtpConnectionState {
10461043

10471044
impl SmtpConnectionState {
10481045
fn new() -> (Self, SmtpConnectionHandlers) {
1049-
let (stop_sender, stop_receiver) = channel::bounded(1);
1046+
let stop_token = CancellationToken::new();
10501047
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
10511048

10521049
let handlers = SmtpConnectionHandlers {
10531050
connection: Smtp::new(),
1054-
stop_receiver,
1051+
stop_token: stop_token.clone(),
10551052
idle_interrupt_receiver,
10561053
};
10571054

10581055
let state = ConnectionState {
1059-
stop_sender,
1056+
stop_token,
10601057
idle_interrupt_sender,
10611058
connectivity: handlers.connection.connectivity.clone(),
10621059
};
@@ -1072,15 +1069,14 @@ impl SmtpConnectionState {
10721069
}
10731070

10741071
/// Shutdown this connection completely.
1075-
async fn stop(&self) -> Result<()> {
1076-
self.state.stop().await?;
1077-
Ok(())
1072+
fn stop(&self) {
1073+
self.state.stop();
10781074
}
10791075
}
10801076

10811077
struct SmtpConnectionHandlers {
10821078
connection: Smtp,
1083-
stop_receiver: Receiver<()>,
1079+
stop_token: CancellationToken,
10841080
idle_interrupt_receiver: Receiver<()>,
10851081
}
10861082

@@ -1092,16 +1088,16 @@ pub(crate) struct ImapConnectionState {
10921088
impl ImapConnectionState {
10931089
/// Construct a new connection.
10941090
async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
1095-
let (stop_sender, stop_receiver) = channel::bounded(1);
1091+
let stop_token = CancellationToken::new();
10961092
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
10971093

10981094
let handlers = ImapConnectionHandlers {
10991095
connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
1100-
stop_receiver,
1096+
stop_token: stop_token.clone(),
11011097
};
11021098

11031099
let state = ConnectionState {
1104-
stop_sender,
1100+
stop_token,
11051101
idle_interrupt_sender,
11061102
connectivity: handlers.connection.connectivity.clone(),
11071103
};
@@ -1117,14 +1113,13 @@ impl ImapConnectionState {
11171113
}
11181114

11191115
/// Shutdown this connection completely.
1120-
async fn stop(&self) -> Result<()> {
1121-
self.state.stop().await?;
1122-
Ok(())
1116+
fn stop(&self) {
1117+
self.state.stop();
11231118
}
11241119
}
11251120

11261121
#[derive(Debug)]
11271122
struct ImapConnectionHandlers {
11281123
connection: Imap,
1129-
stop_receiver: Receiver<()>,
1124+
stop_token: CancellationToken,
11301125
}

0 commit comments

Comments
 (0)