Skip to content

feat: move Identify I/O from NetworkBehaviour to ConnectionHandler #3208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 37 additions & 77 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::handler::{self, Proto, Push};
use crate::protocol::{Info, ReplySubstream, UpgradeError};
use futures::prelude::*;
use crate::handler::{self, InEvent, Proto};
use crate::protocol::{Info, UpgradeError};
use libp2p_core::{
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError,
IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use lru::LruCache;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
time::Duration,
Expand All @@ -51,8 +48,8 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending requests to respond.
requests: VecDeque<Request>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
/// Peers to which an active push with current information about
Expand All @@ -62,19 +59,10 @@ pub struct Behaviour {
discovered_peers: PeerCache,
}

/// A pending reply to an inbound identification request.
enum Reply {
/// The reply is queued for sending.
Queued {
peer: PeerId,
io: ReplySubstream<NegotiatedSubstream>,
observed: Multiaddr,
},
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
/// An inbound identification request.
struct Request {
peer: PeerId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handler could learn this via IntoConnectionHandler. It will never change across its lifespan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!

observed: Multiaddr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, a Handler can learn this via IntoConnectionHandler and it won't change1 across its lifetime.

Footnotes

  1. AddressChange is not emitted at the moment but once it will, it is directly given to the handler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we only get it on connection_established which seems to be called after new_handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to implement IntoConnectionHandler additionally on a struct that you return from new_handler, said struct can then capture the ConnectedPoint and initialise the ConnectionHandler with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks Thomas, updated!

}

/// Configuration for the [`identify::Behaviour`](Behaviour).
Expand Down Expand Up @@ -184,7 +172,7 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
requests: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
Expand Down Expand Up @@ -271,13 +259,19 @@ impl NetworkBehaviour for Behaviour {
score: AddressScore::Finite(1),
});
}
handler::Event::Identification(peer) => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
peer_id: peer,
}));
}
handler::Event::IdentificationPushed => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {
peer_id,
}));
}
handler::Event::Identify(sender) => {
handler::Event::Identify => {
let observed = self
.connected
.get(&peer_id)
Expand All @@ -287,9 +281,8 @@ impl NetworkBehaviour for Behaviour {
with an established connection and calling `NetworkBehaviour::on_event` \
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
);
self.pending_replies.push_back(Reply::Queued {
self.requests.push_back(Request {
peer: peer_id,
io: sender,
observed: observed.clone(),
});
}
Expand All @@ -305,7 +298,7 @@ impl NetworkBehaviour for Behaviour {

fn poll(
&mut self,
cx: &mut Context<'_>,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
Expand Down Expand Up @@ -333,7 +326,7 @@ impl NetworkBehaviour for Behaviour {
observed_addr,
};

(*peer, Push(info))
(*peer, InEvent::Push(info))
})
});

Expand All @@ -346,55 +339,21 @@ impl NetworkBehaviour for Behaviour {
});
}

// Check for pending replies to send.
if let Some(r) = self.pending_replies.pop_front() {
let mut sending = 0;
let to_send = self.pending_replies.len() + 1;
let mut reply = Some(r);
loop {
match reply {
Some(Reply::Queued { peer, io, observed }) => {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
let io = Box::pin(io.send(info));
reply = Some(Reply::Sending { peer, io });
}
Some(Reply::Sending { peer, mut io }) => {
sending += 1;
match Future::poll(Pin::new(&mut io), cx) {
Poll::Ready(Ok(())) => {
let event = Event::Sent { peer_id: peer };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending => {
self.pending_replies.push_back(Reply::Sending { peer, io });
if sending == to_send {
// All remaining futures are NotReady
break;
} else {
reply = self.pending_replies.pop_front();
}
}
Poll::Ready(Err(err)) => {
let event = Event::Error {
peer_id: peer,
error: ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
None => unreachable!(),
}
}
// Check for pending requests to send back to the handler for reply.
if let Some(Request { peer, observed }) = self.requests.pop_front() {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the curlpit! This will be buggy with > 1 connection per peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Thomas, addressed.

event: InEvent::Identify(info),
});
}

Poll::Pending
Expand Down Expand Up @@ -557,6 +516,7 @@ impl PeerCache {
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p::mplex::MplexConfig;
use libp2p::noise;
use libp2p::tcp;
Expand Down Expand Up @@ -618,7 +578,7 @@ mod tests {

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
Expand Down
Loading