Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit e224b71

Browse files
bors[bot]ljedrz
andauthored
Merge #274
274: fix bogus empty bitswap messages r=ljedrz a=ljedrz We've been seeing `(empty message)` bitswap for a while, but these logs were bogus - this PR fixes those, and adds a small bitswap stress test that can be used for future bitswap debugging. Also removes some noisy logging and, as a drive-by, fixes a subscription-related log. Co-authored-by: ljedrz <ljedrz@gmail.com> Co-authored-by: ljedrz <ljedrz@users.noreply.github.com>
2 parents 9b316a3 + dc48504 commit e224b71

File tree

4 files changed

+93
-8
lines changed

4 files changed

+93
-8
lines changed

bitswap/src/behaviour.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//! will allow providing and reciving IPFS blocks.
88
use crate::block::Block;
99
use crate::ledger::{Ledger, Message, Priority};
10-
use crate::protocol::BitswapConfig;
10+
use crate::protocol::{BitswapConfig, MessageWrapper};
1111
use cid::Cid;
1212
use fnv::FnvHashSet;
1313
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
@@ -208,7 +208,7 @@ impl Bitswap {
208208
}
209209

210210
impl NetworkBehaviour for Bitswap {
211-
type ProtocolsHandler = OneShotHandler<BitswapConfig, Message, Message>;
211+
type ProtocolsHandler = OneShotHandler<BitswapConfig, Message, MessageWrapper>;
212212
type OutEvent = BitswapEvent;
213213

214214
fn new_handler(&mut self) -> Self::ProtocolsHandler {
@@ -234,7 +234,16 @@ impl NetworkBehaviour for Bitswap {
234234
//self.connected_peers.remove(peer_id);
235235
}
236236

237-
fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, mut message: Message) {
237+
fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, message: MessageWrapper) {
238+
let mut message = match message {
239+
// we just sent an outgoing bitswap message, nothing to do here
240+
// FIXME: we could commit any pending stats accounting for this peer now
241+
// that the message may have sent, if we'd do such accounting
242+
MessageWrapper::Tx => return,
243+
// we've received a bitswap message, process it
244+
MessageWrapper::Rx(msg) => msg,
245+
};
246+
238247
debug!("bitswap: inject_event from {}: {:?}", source, message);
239248

240249
let current_wantlist = self.local_wantlist();

bitswap/src/protocol.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,37 @@ where
6868
type Future = FutureResult<Self::Output, Self::Error>;
6969

7070
#[inline]
71-
fn upgrade_outbound(self, mut socket: TSocket, info: Self::Info) -> Self::Future {
71+
fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future {
7272
Box::pin(async move {
73-
debug!("upgrade_outbound: {}", std::str::from_utf8(info).unwrap());
7473
let bytes = self.to_bytes();
75-
upgrade::write_one(&mut socket, bytes).await?;
76-
Ok(())
74+
upgrade::write_one(&mut socket, bytes).await
7775
})
7876
}
7977
}
8078

79+
/// An object to facilitate communication between the `OneShotHandler` and the `BitswapHandler`.
80+
#[derive(Debug)]
81+
pub enum MessageWrapper {
82+
/// We received a `Message` from a remote.
83+
Rx(Message),
84+
/// We successfully sent a `Message`.
85+
Tx,
86+
}
87+
88+
impl From<Message> for MessageWrapper {
89+
#[inline]
90+
fn from(message: Message) -> Self {
91+
Self::Rx(message)
92+
}
93+
}
94+
95+
impl From<()> for MessageWrapper {
96+
#[inline]
97+
fn from(_: ()) -> Self {
98+
Self::Tx
99+
}
100+
}
101+
81102
#[cfg(test)]
82103
mod tests {
83104
/*

src/subscription.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,15 @@ impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
128128
/// Finalizes all pending subscriptions of the specified kind with the given `result`.
129129
///
130130
pub fn finish_subscription(&self, req_kind: RequestKind, result: TRes) {
131-
debug!("Finishing the subscription to {}", req_kind);
132131
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });
133132
let related_subs = subscriptions.get_mut(&req_kind);
134133

135134
// find all the matching `Subscription`s and wake up their tasks; only `Pending`
136135
// ones have an associated `SubscriptionFuture` and there can be multiple of them
137136
// depending on how many times the given request kind was filed
138137
if let Some(related_subs) = related_subs {
138+
debug!("Finishing the subscription to {}", req_kind);
139+
139140
for sub in related_subs.values_mut() {
140141
if let Subscription::Pending { .. } = sub {
141142
sub.wake(result.clone());

tests/bitswap_stress_test.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use ipfs::{Block, Node};
2+
use libipld::cid::{Cid, Codec};
3+
use multihash::Sha2_256;
4+
5+
fn filter(i: usize) -> bool {
6+
i % 2 == 0
7+
}
8+
9+
// this test is designed to trigger unfavorable conditions for the bitswap
10+
// protocol by putting blocks in every second node and attempting to get
11+
// them from the other nodes; intended to be used for debugging or stress
12+
// testing the bitswap protocol (though it would be advised to uncomment
13+
// the tracing_subscriber for stress-testing purposes)
14+
#[ignore]
15+
#[async_std::test]
16+
async fn bitswap_stress_test() {
17+
tracing_subscriber::fmt::init();
18+
19+
let data = b"hello block\n".to_vec().into_boxed_slice();
20+
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
21+
22+
const NODE_COUNT: usize = 3;
23+
24+
let mut nodes = Vec::with_capacity(NODE_COUNT);
25+
for i in 0..NODE_COUNT {
26+
nodes.push(Node::new(i.to_string()).await);
27+
}
28+
29+
for i in 0..NODE_COUNT {
30+
for (j, peer) in nodes.iter().enumerate() {
31+
if i != j {
32+
let (_, mut addrs) = peer.identity().await.unwrap();
33+
nodes[i].connect(addrs.pop().unwrap()).await.unwrap();
34+
}
35+
}
36+
}
37+
38+
for (i, node) in nodes.iter().enumerate() {
39+
if filter(i) {
40+
node.put_block(Block {
41+
cid: cid.clone(),
42+
data: data.clone(),
43+
})
44+
.await
45+
.unwrap();
46+
}
47+
}
48+
49+
for (i, node) in nodes.iter().enumerate() {
50+
if !filter(i) {
51+
node.get_block(&cid).await.unwrap();
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)