From 75236d28ed69f8fe85023c064795afe11e66b5ee Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Fri, 3 Jan 2025 23:45:17 -0500 Subject: [PATCH] feat: get multiple blocks in unixfs operations --- src/unixfs/cat.rs | 53 ++++++++++++++------------ src/unixfs/get.rs | 96 +++++++++++++++++++++++++---------------------- 2 files changed, 81 insertions(+), 68 deletions(-) diff --git a/src/unixfs/cat.rs b/src/unixfs/cat.rs index 4ee612363..ce8ffb92c 100644 --- a/src/unixfs/cat.rs +++ b/src/unixfs/cat.rs @@ -215,32 +215,37 @@ impl Stream for UnixfsCat { // would probably always cost many unnecessary clones, but it would be nice to "shut" // the subscriber so that it will only resolve to a value but still keep the operation // going. Not that we have any "operation" concept of the Want yet. - let (next, _) = visit.pending_links(); - + let (next, pending) = visit.pending_links(); + let borrow = &repo; - let block = borrow.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?; - - let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?; + + let list = std::iter::once(next).chain(pending); + + let mut block = borrow.get_blocks(list).providers(&providers).set_local(local_only).timeout(timeout); + + while let Some(block) = block.next().await { + let block = block.map_err(|e| TraversalFailed::Io(std::io::Error::other(e)))?; + let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?; - size += bytes.len(); - - if let Some(length) = length { - if size > length { - let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length }); - fn_err()?; - return; + size += bytes.len(); + + if let Some(length) = length { + if size > length { + let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length }); + fn_err()?; + return; + } + } + + if !bytes.is_empty() { + yield Bytes::copy_from_slice(bytes); + } + + match next_visit { + Some(v) => visit = v, + None => return, } } - - if !bytes.is_empty() { - yield Bytes::copy_from_slice(bytes); - } - - match next_visit { - Some(v) => visit = v, - None => return, - } - } }.boxed(); @@ -265,8 +270,8 @@ impl std::future::IntoFuture for UnixfsCat { } Ok(data.into()) } - .instrument(span) - .boxed() + .instrument(span) + .boxed() } } diff --git a/src/unixfs/get.rs b/src/unixfs/get.rs index d08cc8632..9cea3aa82 100644 --- a/src/unixfs/get.rs +++ b/src/unixfs/get.rs @@ -150,57 +150,65 @@ impl Stream for UnixfsGet { let mut walker = Walker::new(*cid, root_name); while walker.should_continue() { - let (next, _) = walker.pending_links(); - let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await { - Ok(block) => block, - Err(e) => { - yield UnixfsStatus::FailedStatus { written, total_size, error: e }; - return; - } - }; - let block_data = block.data(); + let (next, pending) = walker.pending_links(); - match walker.next(block_data, &mut cache) { - Ok(ContinuedWalk::Bucket(..)) => {} - Ok(ContinuedWalk::File(segment, _, _, _, size)) => { + let list = std::iter::once(next).chain(pending); - if segment.is_first() { - total_size = Some(size as usize); - yield UnixfsStatus::ProgressStatus { written, total_size }; + let mut blocks = repo.get_blocks(list).providers(&providers).set_local(local_only).timeout(timeout); + + while let Some(result) = blocks.next().await { + let block = match result { + Ok(block) => block, + Err(e) => { + yield UnixfsStatus::FailedStatus { written, total_size, error: e }; + return; } - // even if the largest of files can have 256 kB blocks and about the same - // amount of content, try to consume it in small parts not to grow the buffers - // too much. - - let mut n = 0usize; - let slice = segment.as_ref(); - let total = slice.len(); - - while n < total { - let next = &slice[n..]; - n += next.len(); - if let Err(e) = file.write_all(next).await { - yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() }; - return; + }; + + let block_data = block.data(); + + match walker.next(block_data, &mut cache) { + Ok(ContinuedWalk::Bucket(..)) => {} + Ok(ContinuedWalk::File(segment, _, _, _, size)) => { + + if segment.is_first() { + total_size = Some(size as usize); + yield UnixfsStatus::ProgressStatus { written, total_size }; } - if let Err(e) = file.sync_all().await { - yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() }; - return; + // even if the largest of files can have 256 kB blocks and about the same + // amount of content, try to consume it in small parts not to grow the buffers + // too much. + + let mut n = 0usize; + let slice = segment.as_ref(); + let total = slice.len(); + + while n < total { + let next = &slice[n..]; + n += next.len(); + if let Err(e) = file.write_all(next).await { + yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() }; + return; + } + if let Err(e) = file.sync_all().await { + yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() }; + return; + } + + written += n; } - written += n; - } - - yield UnixfsStatus::ProgressStatus { written, total_size }; + yield UnixfsStatus::ProgressStatus { written, total_size }; - }, - Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, //TODO - Ok(ContinuedWalk::Symlink( .. )) => {}, - Err(e) => { - yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() }; - return; - } - }; + }, + Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, //TODO + Ok(ContinuedWalk::Symlink( .. )) => {}, + Err(e) => { + yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() }; + return; + } + }; + } }; yield UnixfsStatus::CompletedStatus { path, written, total_size }