Skip to content

Commit 5be558e

Browse files
authored
feat(imex) Connect to all provider addresses concurrently (#4240)
This uses the new iroh API to connect to all provider addresses concurrently. It simplifies the implementation as well as we no longer need to try the addresses manually.
1 parent fc25bba commit 5be558e

File tree

3 files changed

+43
-89
lines changed

3 files changed

+43
-89
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
### Changes
66
- Update iroh, remove `default-net` from `[patch.crates-io]` section.
7-
7+
- transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240
88

99
## [1.112.1] - 2023-03-27
1010

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/imex/transfer.rs

Lines changed: 41 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
//! getter can not connect to an impersonated provider and the provider does not offer the
2323
//! download to an impersonated getter.
2424
25-
use std::cmp::Ordering;
2625
use std::future::Future;
2726
use std::net::Ipv4Addr;
2827
use std::ops::Deref;
@@ -33,7 +32,8 @@ use std::task::Poll;
3332
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
3433
use async_channel::Receiver;
3534
use futures_lite::StreamExt;
36-
use iroh::get::{DataStream, Options};
35+
use iroh::blobs::Collection;
36+
use iroh::get::DataStream;
3737
use iroh::progress::ProgressEmitter;
3838
use iroh::protocol::AuthToken;
3939
use iroh::provider::{DataSource, Event, Provider, Ticket};
@@ -53,6 +53,8 @@ use crate::{e2ee, EventType};
5353

5454
use super::{export_database, DBFILE_BACKUP_NAME};
5555

56+
const MAX_CONCURRENT_DIALS: u8 = 16;
57+
5658
/// Provide or send a backup of this device.
5759
///
5860
/// This creates a backup of the current device and starts a service which offers another
@@ -387,122 +389,74 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
387389
}
388390

389391
async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
390-
let mut ticket = match qr {
392+
let ticket = match qr {
391393
Qr::Backup { ticket } => ticket,
392394
_ => bail!("QR code for backup must be of type DCBACKUP"),
393395
};
394396
if ticket.addrs.is_empty() {
395397
bail!("ticket is missing addresses to dial");
396398
}
397399

398-
// Crude sorting, most local wifi's are in the 192.168.0.0/24 range so this will try
399-
// them first.
400-
ticket.addrs.sort_by(|a, b| {
401-
let a = a.to_string();
402-
let b = b.to_string();
403-
if a.starts_with("192.168.") && !b.starts_with("192.168.") {
404-
Ordering::Less
405-
} else if b.starts_with("192.168.") && !a.starts_with("192.168.") {
406-
Ordering::Greater
407-
} else {
408-
Ordering::Equal
400+
match transfer_from_provider(context, &ticket).await {
401+
Ok(()) => {
402+
delete_and_reset_all_device_msgs(context).await?;
403+
context.emit_event(ReceiveProgress::Completed.into());
404+
Ok(())
409405
}
410-
});
411-
for addr in &ticket.addrs {
412-
let opts = Options {
413-
addr: *addr,
414-
peer_id: Some(ticket.peer),
415-
keylog: false,
416-
};
417-
info!(context, "attempting to contact {}", addr);
418-
match transfer_from_provider(context, &ticket, opts).await {
419-
Ok(_) => {
420-
delete_and_reset_all_device_msgs(context).await?;
421-
context.emit_event(ReceiveProgress::Completed.into());
422-
return Ok(());
423-
}
424-
Err(TransferError::ConnectionError(err)) => {
425-
warn!(context, "Connection error: {err:#}.");
426-
continue;
427-
}
428-
Err(TransferError::Other(err)) => {
429-
// Clean up any blobs we already wrote.
430-
let readdir = fs::read_dir(context.get_blobdir()).await?;
431-
let mut readdir = ReadDirStream::new(readdir);
432-
while let Some(dirent) = readdir.next().await {
433-
if let Ok(dirent) = dirent {
434-
fs::remove_file(dirent.path()).await.ok();
435-
}
406+
Err(err) => {
407+
// Clean up any blobs we already wrote.
408+
let readdir = fs::read_dir(context.get_blobdir()).await?;
409+
let mut readdir = ReadDirStream::new(readdir);
410+
while let Some(dirent) = readdir.next().await {
411+
if let Ok(dirent) = dirent {
412+
fs::remove_file(dirent.path()).await.ok();
436413
}
437-
context.emit_event(ReceiveProgress::Failed.into());
438-
return Err(err);
439414
}
415+
context.emit_event(ReceiveProgress::Failed.into());
416+
Err(err)
440417
}
441418
}
442-
Err(anyhow!("failed to contact provider"))
443-
}
444-
445-
/// Error during a single transfer attempt.
446-
///
447-
/// Mostly exists to distinguish between `ConnectionError` and any other errors.
448-
#[derive(Debug, thiserror::Error)]
449-
enum TransferError {
450-
#[error("connection error")]
451-
ConnectionError(#[source] anyhow::Error),
452-
#[error("other")]
453-
Other(#[source] anyhow::Error),
454419
}
455420

456-
async fn transfer_from_provider(
457-
context: &Context,
458-
ticket: &Ticket,
459-
opts: Options,
460-
) -> Result<(), TransferError> {
421+
async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> {
461422
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
462423
spawn_progress_proxy(context.clone(), progress.subscribe());
463-
let mut connected = false;
464424
let on_connected = || {
465425
context.emit_event(ReceiveProgress::Connected.into());
466-
connected = true;
426+
async { Ok(()) }
427+
};
428+
let on_collection = |collection: &Collection| {
429+
context.emit_event(ReceiveProgress::CollectionReceived.into());
430+
progress.set_total(collection.total_blobs_size());
467431
async { Ok(()) }
468432
};
469433
let jobs = Mutex::new(JoinSet::default());
470434
let on_blob =
471435
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
472-
let res = iroh::get::run(
473-
ticket.hash,
474-
ticket.token,
475-
opts,
436+
437+
// Perform the transfer.
438+
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
439+
let stats = iroh::get::run_ticket(
440+
ticket,
441+
keylog,
442+
MAX_CONCURRENT_DIALS,
476443
on_connected,
477-
|collection| {
478-
context.emit_event(ReceiveProgress::CollectionReceived.into());
479-
progress.set_total(collection.total_blobs_size());
480-
async { Ok(()) }
481-
},
444+
on_collection,
482445
on_blob,
483446
)
484-
.await;
447+
.await?;
485448

486449
let mut jobs = jobs.lock().await;
487450
while let Some(job) = jobs.join_next().await {
488-
job.context("job failed").map_err(TransferError::Other)?;
451+
job.context("job failed")?;
489452
}
490-
491453
drop(progress);
492-
match res {
493-
Ok(stats) => {
494-
info!(
495-
context,
496-
"Backup transfer finished, transfer rate is {} Mbps.",
497-
stats.mbits()
498-
);
499-
Ok(())
500-
}
501-
Err(err) => match connected {
502-
true => Err(TransferError::Other(err)),
503-
false => Err(TransferError::ConnectionError(err)),
504-
},
505-
}
454+
info!(
455+
context,
456+
"Backup transfer finished, transfer rate was {} Mbps.",
457+
stats.mbits()
458+
);
459+
Ok(())
506460
}
507461

508462
/// Get callback when a blob is received from the provider.

0 commit comments

Comments
 (0)