Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit cca0956

Browse files
bors[bot]ljedrz
andauthored
Merge #375
375: A fistful of tweaks r=aphelionz a=ljedrz A pretty wide range of test improvements, small fixes, refactorings and renames. Each commit describes the specific change, but I'll list the more noteworthy ones: - automatically remove `Protocol::P2p(_)` from the `Multiaddr`s passed to `Ipfs::add_peer` - only produce a `RepoEvent::RemovedBlock` when a block was actually removed - add unit tests for the `Node` topologies Co-authored-by: ljedrz <ljedrz@gmail.com>
2 parents b42b913 + 95ffe3b commit cca0956

File tree

10 files changed

+160
-150
lines changed

10 files changed

+160
-150
lines changed

http/src/main.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ enum Options {
2626

2727
fn main() {
2828
if std::env::var_os("RUST_LOG").is_none() {
29-
// FIXME: see if tracing could be used as the frontend for log macros
30-
// FIXME: use log macros here as well
3129
std::env::set_var(
3230
"RUST_LOG",
3331
"ipfs_http=trace,ipfs=trace,bitswap=trace,ipfs_unixfs=trace",

src/lib.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,7 +1542,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
15421542
match evt {
15431543
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
15441544
RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid),
1545-
RepoEvent::ProvideBlock(cid, ret) => {
1545+
RepoEvent::NewBlock(cid, ret) => {
15461546
// TODO: consider if cancel is applicable in cases where we provide the
15471547
// associated Block ourselves
15481548
self.swarm.bitswap().cancel_block(&cid);
@@ -1554,7 +1554,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
15541554
let _ = ret.send(Err(anyhow!("not actively providing blocks yet")));
15551555
}
15561556
}
1557-
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
1557+
RepoEvent::RemovedBlock(cid) => self.swarm.stop_providing_block(&cid),
15581558
}
15591559
}
15601560

@@ -1665,7 +1665,12 @@ mod node {
16651665
/// Add a known listen address of a peer participating in the DHT to the routing table.
16661666
/// This is mandatory in order for the peer to be discoverable by other members of the
16671667
/// DHT.
1668-
pub async fn add_peer(&self, peer_id: PeerId, addr: Multiaddr) -> Result<(), Error> {
1668+
pub async fn add_peer(&self, peer_id: PeerId, mut addr: Multiaddr) -> Result<(), Error> {
1669+
// Kademlia::add_address requires the address to not contain the PeerId
1670+
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
1671+
addr.pop();
1672+
}
1673+
16691674
self.to_task
16701675
.clone()
16711676
.send(IpfsEvent::AddPeer(peer_id, addr))

src/path.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -132,30 +132,6 @@ impl<T: Into<PathRoot>> From<T> for IpfsPath {
132132
}
133133
}
134134

135-
// FIXME: get rid of this; it would mean that there must be a clone to retain the rest of the path.
136-
impl TryInto<Cid> for IpfsPath {
137-
type Error = Error;
138-
139-
fn try_into(self) -> Result<Cid, Self::Error> {
140-
match self.root().cid() {
141-
Some(cid) => Ok(cid.to_owned()),
142-
None => Err(anyhow::anyhow!("expected cid")),
143-
}
144-
}
145-
}
146-
147-
// FIXME: get rid of this; it would mean that there must be a clone to retain the rest of the path.
148-
impl TryInto<PeerId> for IpfsPath {
149-
type Error = Error;
150-
151-
fn try_into(self) -> Result<PeerId, Self::Error> {
152-
match self.root().peer_id() {
153-
Some(peer_id) => Ok(peer_id.to_owned()),
154-
None => Err(anyhow::anyhow!("expected peer id")),
155-
}
156-
}
157-
}
158-
159135
/// SlashedPath is internal to IpfsPath variants, and basically holds a unixfs-compatible path
160136
/// where segments do not contain slashes but can pretty much contain all other valid UTF-8.
161137
///

src/repo/mod.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,11 @@ pub struct Repo<TRepoTypes: RepoTypes> {
219219
pub enum RepoEvent {
220220
WantBlock(Cid),
221221
UnwantBlock(Cid),
222-
ProvideBlock(
222+
NewBlock(
223223
Cid,
224224
oneshot::Sender<Result<SubscriptionFuture<KadResult, String>, anyhow::Error>>,
225225
),
226-
UnprovideBlock(Cid),
226+
RemovedBlock(Cid),
227227
}
228228

229229
impl TryFrom<RequestKind> for RepoEvent {
@@ -292,9 +292,8 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
292292
let (_cid, res) = self.block_store.put(block.clone()).await?;
293293

294294
// FIXME: this doesn't cause actual DHT providing yet, only some
295-
// bitswap housekeeping; RepoEvent::ProvideBlock should probably
296-
// be renamed to ::NewBlock and we might want to not ignore the
297-
// channel errors when we actually start providing on the DHT
295+
// bitswap housekeeping; we might want to not ignore the channel
296+
// errors when we actually start providing on the DHT
298297
if let BlockPut::NewBlock = res {
299298
self.subscriptions
300299
.finish_subscription(cid.clone().into(), Ok(block));
@@ -305,7 +304,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
305304

306305
self.events
307306
.clone()
308-
.send(RepoEvent::ProvideBlock(cid.clone(), tx))
307+
.send(RepoEvent::NewBlock(cid.clone(), tx))
309308
.await
310309
.ok();
311310

@@ -323,7 +322,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
323322
// FIXME: here's a race: block_store might give Ok(None) and we get to create our
324323
// subscription after the put has completed. So maybe create the subscription first, then
325324
// cancel it?
326-
if let Some(block) = self.block_store.get(&cid).await? {
325+
if let Some(block) = self.get_block_now(&cid).await? {
327326
Ok(block)
328327
} else {
329328
let subscription = self
@@ -342,11 +341,11 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
342341

343342
/// Retrives a block from the block store if it's available locally.
344343
pub async fn get_block_now(&self, cid: &Cid) -> Result<Option<Block>, Error> {
345-
Ok(self.block_store.get(&cid).await?)
344+
self.block_store.get(&cid).await
346345
}
347346

348347
pub async fn list_blocks(&self) -> Result<Vec<Cid>, Error> {
349-
Ok(self.block_store.list().await?)
348+
self.block_store.list().await
350349
}
351350

352351
/// Remove block from the block store.
@@ -355,20 +354,21 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
355354
return Err(anyhow::anyhow!("block to remove is pinned"));
356355
}
357356

358-
// sending only fails if the background task has exited
359-
self.events
360-
.clone()
361-
.send(RepoEvent::UnprovideBlock(cid.clone()))
362-
.await
363-
.ok();
364-
365357
// FIXME: Need to change location of pinning logic.
366358
// I like this pattern of the repo abstraction being some sort of
367359
// "clearing house" for the underlying result enums, but this
368360
// could potentially be pushed out out of here up to Ipfs, idk
369361
match self.block_store.remove(&cid).await? {
370362
Ok(success) => match success {
371-
BlockRm::Removed(_cid) => Ok(cid.clone()),
363+
BlockRm::Removed(_cid) => {
364+
// sending only fails if the background task has exited
365+
self.events
366+
.clone()
367+
.send(RepoEvent::RemovedBlock(cid.clone()))
368+
.await
369+
.ok();
370+
Ok(cid.clone())
371+
}
372372
},
373373
Err(err) => match err {
374374
BlockRmError::NotFound(_cid) => Err(anyhow::anyhow!("block not found")),

src/subscription.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ impl<T: Debug + Clone + PartialEq, E: Debug + Clone> SubscriptionRegistry<T, E>
173173
}
174174
}
175175

176-
trace!("Cancelled {} subscriptions", cancelled);
176+
if cancelled > 0 {
177+
trace!("Cancelled {} subscriptions", cancelled);
178+
}
177179
}
178180
}
179181

tests/block_exchange.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use cid::{Cid, Codec};
2+
use ipfs::Block;
3+
use multihash::Sha2_256;
4+
use std::time::Duration;
5+
use tokio::time::timeout;
6+
7+
mod common;
8+
use common::{spawn_nodes, Topology};
9+
10+
fn create_block() -> Block {
11+
let data = b"hello block\n".to_vec().into_boxed_slice();
12+
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
13+
14+
Block { cid, data }
15+
}
16+
17+
// verify that a put block can be received via get_block and the data matches
18+
#[tokio::test(max_threads = 1)]
19+
async fn two_node_put_get() {
20+
let nodes = spawn_nodes(2, Topology::Line).await;
21+
let block = create_block();
22+
23+
nodes[0].put_block(block.clone()).await.unwrap();
24+
let found_block = timeout(Duration::from_secs(10), nodes[1].get_block(&block.cid))
25+
.await
26+
.expect("get_block did not complete in time")
27+
.unwrap();
28+
29+
assert_eq!(block.data, found_block.data);
30+
}
31+
32+
// check that a long line of nodes still works with get_block
33+
#[tokio::test(max_threads = 1)]
34+
async fn long_get_block() {
35+
// this number could be higher, but it starts hanging above ~24
36+
const N: usize = 10;
37+
let nodes = spawn_nodes(N, Topology::Line).await;
38+
let block = create_block();
39+
40+
// the first node should get the block from the last one...
41+
nodes[N - 1].put_block(block.clone()).await.unwrap();
42+
nodes[0].get_block(&block.cid).await.unwrap();
43+
44+
// ...and the last one from the first one
45+
nodes[0].put_block(block.clone()).await.unwrap();
46+
nodes[N - 1].get_block(&block.cid).await.unwrap();
47+
}

tests/common/mod.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,54 @@ pub async fn spawn_nodes(count: usize, topology: Topology) -> Vec<Node> {
6161

6262
nodes
6363
}
64+
65+
#[cfg(test)]
66+
mod tests {
67+
use super::*;
68+
69+
const N: usize = 5;
70+
71+
#[tokio::test(max_threads = 1)]
72+
async fn check_topology_line() {
73+
let nodes = spawn_nodes(N, Topology::Line).await;
74+
75+
for (i, node) in nodes.iter().enumerate() {
76+
if i == 0 || i == N - 1 {
77+
assert_eq!(node.peers().await.unwrap().len(), 1);
78+
} else {
79+
assert_eq!(node.peers().await.unwrap().len(), 2);
80+
}
81+
}
82+
}
83+
84+
#[tokio::test(max_threads = 1)]
85+
async fn check_topology_ring() {
86+
let nodes = spawn_nodes(N, Topology::Ring).await;
87+
88+
for node in &nodes {
89+
assert_eq!(node.peers().await.unwrap().len(), 2);
90+
}
91+
}
92+
93+
#[tokio::test(max_threads = 1)]
94+
async fn check_topology_mesh() {
95+
let nodes = spawn_nodes(N, Topology::Mesh).await;
96+
97+
for node in &nodes {
98+
assert_eq!(node.peers().await.unwrap().len(), N - 1);
99+
}
100+
}
101+
102+
#[tokio::test(max_threads = 1)]
103+
async fn check_topology_star() {
104+
let nodes = spawn_nodes(N, Topology::Star).await;
105+
106+
for (i, node) in nodes.iter().enumerate() {
107+
if i == 0 {
108+
assert_eq!(node.peers().await.unwrap().len(), N - 1);
109+
} else {
110+
assert_eq!(node.peers().await.unwrap().len(), 1);
111+
}
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)