Skip to content

Commit d45ec7f

Browse files
committed
feat: advance next UID even if connection fails while fetching
Connection sometimes fails while processing FETCH responses. In this case `fetch_new_messages` exits early and does not advance next expected UID even if some messages were processed. This results in prefetching the same messages after reconnection and log messages similar to "Not moving the message ab05c85a-e191-4fd2-a951-9972bc7e167f@localhost that we have seen before.". With this change we advance next expected UID even if `fetch_new_messages` returns a network error.
1 parent 752f45f commit d45ec7f

File tree

2 files changed

+97
-63
lines changed

2 files changed

+97
-63
lines changed

src/download.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,18 @@ impl Session {
213213

214214
let mut uid_message_ids: BTreeMap<u32, String> = BTreeMap::new();
215215
uid_message_ids.insert(uid, rfc724_mid);
216-
let (last_uid, _received) = self
217-
.fetch_many_msgs(
218-
context,
219-
folder,
220-
uidvalidity,
221-
vec![uid],
222-
&uid_message_ids,
223-
false,
224-
)
225-
.await?;
226-
if last_uid.is_none() {
216+
let (sender, receiver) = async_channel::unbounded();
217+
self.fetch_many_msgs(
218+
context,
219+
folder,
220+
uidvalidity,
221+
vec![uid],
222+
&uid_message_ids,
223+
false,
224+
sender,
225+
)
226+
.await?;
227+
if receiver.recv().await.is_err() {
227228
bail!("Failed to fetch UID {uid}");
228229
}
229230
Ok(())

src/imap.rs

Lines changed: 85 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
};
1515

1616
use anyhow::{Context as _, Result, bail, ensure, format_err};
17-
use async_channel::Receiver;
17+
use async_channel::{self, Receiver, Sender};
1818
use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse};
1919
use deltachat_contact_tools::ContactAddress;
2020
use futures::{FutureExt as _, StreamExt, TryStreamExt};
@@ -562,7 +562,7 @@ impl Imap {
562562
let read_cnt = msgs.len();
563563

564564
let download_limit = context.download_limit().await?;
565-
let mut uids_fetch = Vec::<(_, bool /* partially? */)>::with_capacity(msgs.len() + 1);
565+
let mut uids_fetch = Vec::<(u32, bool /* partially? */)>::with_capacity(msgs.len() + 1);
566566
let mut uid_message_ids = BTreeMap::new();
567567
let mut largest_uid_skipped = None;
568568
let delete_target = context.get_delete_msgs_target().await?;
@@ -695,51 +695,72 @@ impl Imap {
695695
self.connectivity.set_working(context).await;
696696
}
697697

698-
// Actually download messages.
699-
let mut largest_uid_fetched: u32 = 0;
700-
let mut received_msgs = Vec::with_capacity(uids_fetch.len());
701-
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
702-
let mut fetch_partially = false;
703-
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
704-
for (uid, fp) in uids_fetch {
705-
if fp != fetch_partially {
706-
let (largest_uid_fetched_in_batch, received_msgs_in_batch) = session
707-
.fetch_many_msgs(
708-
context,
709-
folder,
710-
uid_validity,
711-
uids_fetch_in_batch.split_off(0),
712-
&uid_message_ids,
713-
fetch_partially,
714-
)
715-
.await
716-
.context("fetch_many_msgs")?;
717-
received_msgs.extend(received_msgs_in_batch);
718-
largest_uid_fetched = max(
719-
largest_uid_fetched,
720-
largest_uid_fetched_in_batch.unwrap_or(0),
721-
);
722-
fetch_partially = fp;
723-
}
724-
uids_fetch_in_batch.push(uid);
725-
}
698+
let (sender, receiver) = async_channel::unbounded();
726699

727-
// Advance uid_next to the maximum of the largest known UID plus 1
728-
// and mailbox UIDNEXT.
729-
// Largest known UID is normally less than UIDNEXT,
730-
// but a message may have arrived between determining UIDNEXT
731-
// and executing the FETCH command.
700+
let mut received_msgs = Vec::with_capacity(uids_fetch.len());
732701
let mailbox_uid_next = session
733702
.selected_mailbox
734703
.as_ref()
735704
.with_context(|| format!("Expected {folder:?} to be selected"))?
736705
.uid_next
737706
.unwrap_or_default();
738-
let new_uid_next = max(
739-
max(largest_uid_fetched, largest_uid_skipped.unwrap_or(0)) + 1,
740-
mailbox_uid_next,
741-
);
742707

708+
let update_uids_future = async {
709+
let mut largest_uid_fetched: u32 = 0;
710+
711+
while let Ok((uid, received_msg_opt)) = receiver.recv().await {
712+
largest_uid_fetched = max(largest_uid_fetched, uid);
713+
if let Some(received_msg) = received_msg_opt {
714+
received_msgs.push(received_msg)
715+
}
716+
}
717+
718+
largest_uid_fetched
719+
};
720+
721+
let actually_download_messages_future = async move {
722+
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
723+
let mut fetch_partially = false;
724+
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
725+
for (uid, fp) in uids_fetch {
726+
if fp != fetch_partially {
727+
session
728+
.fetch_many_msgs(
729+
context,
730+
folder,
731+
uid_validity,
732+
uids_fetch_in_batch.split_off(0),
733+
&uid_message_ids,
734+
fetch_partially,
735+
sender.clone(),
736+
)
737+
.await
738+
.context("fetch_many_msgs")?;
739+
fetch_partially = fp;
740+
}
741+
uids_fetch_in_batch.push(uid);
742+
}
743+
744+
anyhow::Ok(())
745+
};
746+
747+
let (largest_uid_fetched, fetch_res) =
748+
tokio::join!(update_uids_future, actually_download_messages_future);
749+
750+
// Advance uid_next to the largest fetched UID plus 1.
751+
//
752+
// This may be larger than `mailbox_uid_next`
753+
// if the message has arrived after selecting mailbox
754+
// and determining its UIDNEXT and before prefetch.
755+
let mut new_uid_next = largest_uid_fetched + 1;
756+
if fetch_res.is_ok() {
757+
// If we have successfully fetched all messages we planned during prefetch,
758+
// then we have covered at least the range between old UIDNEXT
759+
// and UIDNEXT of the mailbox at the time of selecting it.
760+
new_uid_next = max(new_uid_next, mailbox_uid_next);
761+
762+
new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
763+
}
743764
if new_uid_next > old_uid_next {
744765
set_uid_next(context, folder, new_uid_next).await?;
745766
}
@@ -752,6 +773,10 @@ impl Imap {
752773

753774
chat::mark_old_messages_as_noticed(context, received_msgs).await?;
754775

776+
// Now fail if fetching failed, so we will
777+
// establish a new session if this one is broken.
778+
fetch_res?;
779+
755780
Ok(read_cnt > 0)
756781
}
757782

@@ -1300,9 +1325,19 @@ impl Session {
13001325

13011326
/// Fetches a list of messages by server UID.
13021327
///
1303-
/// Returns the last UID fetched successfully and the info about each downloaded message.
1328+
/// Sends pairs of UID and info about each downloaded message to the provided channel.
1329+
/// Received message info is optional because UID may be ignored
1330+
/// if the message has a `\Deleted` flag.
1331+
///
1332+
/// The channel is used to return the results because the function may fail
1333+
/// due to network errors before it finishes fetching all the messages.
1334+
/// In this case caller still may want to process all the results
1335+
/// received over the channel and persist last seen UID in the database
1336+
/// before bubbling up the failure.
1337+
///
13041338
/// If the message is incorrect or there is a failure to write a message to the database,
13051339
/// it is skipped and the error is logged.
1340+
#[expect(clippy::too_many_arguments)]
13061341
pub(crate) async fn fetch_many_msgs(
13071342
&mut self,
13081343
context: &Context,
@@ -1311,12 +1346,10 @@ impl Session {
13111346
request_uids: Vec<u32>,
13121347
uid_message_ids: &BTreeMap<u32, String>,
13131348
fetch_partially: bool,
1314-
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
1315-
let mut last_uid = None;
1316-
let mut received_msgs = Vec::new();
1317-
1349+
received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
1350+
) -> Result<()> {
13181351
if request_uids.is_empty() {
1319-
return Ok((last_uid, received_msgs));
1352+
return Ok(());
13201353
}
13211354

13221355
for (request_uids, set) in build_sequence_sets(&request_uids)? {
@@ -1402,7 +1435,7 @@ impl Session {
14021435

14031436
if is_deleted {
14041437
info!(context, "Not processing deleted msg {}.", request_uid);
1405-
last_uid = Some(request_uid);
1438+
received_msgs_channel.send((request_uid, None)).await?;
14061439
continue;
14071440
}
14081441

@@ -1413,7 +1446,7 @@ impl Session {
14131446
context,
14141447
"Not processing message {} without a BODY.", request_uid
14151448
);
1416-
last_uid = Some(request_uid);
1449+
received_msgs_channel.send((request_uid, None)).await?;
14171450
continue;
14181451
};
14191452

@@ -1445,15 +1478,15 @@ impl Session {
14451478
.await
14461479
{
14471480
Ok(received_msg) => {
1448-
if let Some(m) = received_msg {
1449-
received_msgs.push(m);
1450-
}
1481+
received_msgs_channel
1482+
.send((request_uid, received_msg))
1483+
.await?;
14511484
}
14521485
Err(err) => {
14531486
warn!(context, "receive_imf error: {:#}.", err);
1487+
received_msgs_channel.send((request_uid, None)).await?;
14541488
}
14551489
};
1456-
last_uid = Some(request_uid)
14571490
}
14581491

14591492
// If we don't process the whole response, IMAP client is left in a broken state where
@@ -1477,7 +1510,7 @@ impl Session {
14771510
}
14781511
}
14791512

1480-
Ok((last_uid, received_msgs))
1513+
Ok(())
14811514
}
14821515

14831516
/// Retrieves server metadata if it is supported.

0 commit comments

Comments
 (0)