Skip to content

Commit c4ae523

Browse files
dvc94chelenaf9
authored andcommitted
Implement protocol.
1 parent 53bd428 commit c4ae523

File tree

3 files changed

+281
-0
lines changed

3 files changed

+281
-0
lines changed

protocols/autonat/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ categories = ["network-programming", "asynchronous"]
1212
prost-build = "0.6"
1313

1414
[dependencies]
15+
async-trait = "0.1.51"
16+
futures = "0.3.17"
1517
libp2p-core = { version = "0.30.0", path = "../../core" }
1618
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
19+
libp2p-request-response = { version = "0.13.0", path = "../request-response" }
1720
prost = "0.8.0"

protocols/autonat/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
pub use self::autonat::{AutoNat, AutoNatEvent};
2424

2525
mod autonat;
26+
mod protocol;
2627

2728
mod structs_proto {
2829
include!(concat!(env!("OUT_DIR"), "/structs.rs"));

protocols/autonat/src/protocol.rs

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
// Copyright 2018 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use crate::structs_proto;
22+
pub use crate::structs_proto::message::ResponseStatus;
23+
use async_trait::async_trait;
24+
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
25+
use libp2p_core::{upgrade, Multiaddr, PeerId};
26+
use libp2p_request_response::{ProtocolName, RequestResponseCodec};
27+
use prost::Message;
28+
use std::{convert::TryFrom, io};
29+
30+
#[derive(Clone, Debug)]
31+
pub struct AutoNatProtocol;
32+
33+
impl ProtocolName for AutoNatProtocol {
34+
fn protocol_name(&self) -> &[u8] {
35+
b"/libp2p/autonat/1.0.0"
36+
}
37+
}
38+
39+
#[derive(Clone)]
40+
pub struct AutoNatCodec;
41+
42+
#[async_trait]
43+
impl RequestResponseCodec for AutoNatCodec {
44+
type Protocol = AutoNatProtocol;
45+
type Request = DialRequest;
46+
type Response = DialResponse;
47+
48+
async fn read_request<T>(
49+
&mut self,
50+
_: &AutoNatProtocol,
51+
io: &mut T,
52+
) -> io::Result<Self::Request>
53+
where
54+
T: AsyncRead + Send + Unpin,
55+
{
56+
let bytes = upgrade::read_length_prefixed(io, 1024)
57+
.await
58+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
59+
let request = DialRequest::from_bytes(&bytes)
60+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
61+
Ok(request)
62+
}
63+
64+
async fn read_response<T>(
65+
&mut self,
66+
_: &AutoNatProtocol,
67+
io: &mut T,
68+
) -> io::Result<Self::Response>
69+
where
70+
T: AsyncRead + Send + Unpin,
71+
{
72+
let bytes = upgrade::read_length_prefixed(io, 1024)
73+
.await
74+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
75+
let response = DialResponse::from_bytes(&bytes)
76+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
77+
Ok(response)
78+
}
79+
80+
async fn write_request<T>(
81+
&mut self,
82+
_: &AutoNatProtocol,
83+
io: &mut T,
84+
data: Self::Request,
85+
) -> io::Result<()>
86+
where
87+
T: AsyncWrite + Send + Unpin,
88+
{
89+
upgrade::write_length_prefixed(io, data.to_bytes()).await?;
90+
io.close().await
91+
}
92+
93+
async fn write_response<T>(
94+
&mut self,
95+
_: &AutoNatProtocol,
96+
io: &mut T,
97+
data: Self::Response,
98+
) -> io::Result<()>
99+
where
100+
T: AsyncWrite + Send + Unpin,
101+
{
102+
upgrade::write_length_prefixed(io, data.to_bytes()).await?;
103+
io.close().await
104+
}
105+
}
106+
107+
#[derive(Clone, Debug, Eq, PartialEq)]
108+
pub struct DialRequest {
109+
pub peer_id: PeerId,
110+
pub addrs: Vec<Multiaddr>,
111+
}
112+
113+
impl DialRequest {
114+
pub fn from_bytes(bytes: &[u8]) -> Result<Self, io::Error> {
115+
let msg = structs_proto::Message::decode(bytes)
116+
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
117+
if msg.r#type != Some(structs_proto::message::MessageType::Dial as _) {
118+
return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type"));
119+
}
120+
let (peer_id, addrs) = if let Some(structs_proto::message::Dial {
121+
peer:
122+
Some(structs_proto::message::PeerInfo {
123+
id: Some(peer_id),
124+
addrs,
125+
}),
126+
}) = msg.dial
127+
{
128+
(peer_id, addrs)
129+
} else {
130+
return Err(io::Error::new(
131+
io::ErrorKind::InvalidData,
132+
"invalid dial message",
133+
));
134+
};
135+
136+
let peer_id = {
137+
PeerId::try_from(peer_id)
138+
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid peer id"))?
139+
};
140+
let addrs = {
141+
let mut maddrs = vec![];
142+
for addr in addrs.into_iter() {
143+
let maddr = Multiaddr::try_from(addr)
144+
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
145+
maddrs.push(maddr);
146+
}
147+
maddrs
148+
};
149+
Ok(Self { peer_id, addrs })
150+
}
151+
152+
pub fn to_bytes(self) -> Vec<u8> {
153+
let peer_id = self.peer_id.to_bytes();
154+
let addrs = self.addrs.into_iter().map(|addr| addr.to_vec()).collect();
155+
156+
let msg = structs_proto::Message {
157+
r#type: Some(structs_proto::message::MessageType::Dial as _),
158+
dial: Some(structs_proto::message::Dial {
159+
peer: Some(structs_proto::message::PeerInfo {
160+
id: Some(peer_id),
161+
addrs: addrs,
162+
}),
163+
}),
164+
dial_response: None,
165+
};
166+
167+
let mut bytes = Vec::with_capacity(msg.encoded_len());
168+
msg.encode(&mut bytes)
169+
.expect("Vec<u8> provides capacity as needed");
170+
bytes
171+
}
172+
}
173+
174+
#[derive(Clone, Debug, Eq, PartialEq)]
175+
pub enum DialResponse {
176+
Ok(Multiaddr),
177+
Err(ResponseStatus, String),
178+
}
179+
180+
impl DialResponse {
181+
pub fn from_bytes(bytes: &[u8]) -> Result<Self, io::Error> {
182+
let msg = structs_proto::Message::decode(bytes)
183+
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
184+
if msg.r#type != Some(structs_proto::message::MessageType::DialResponse as _) {
185+
return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type"));
186+
}
187+
188+
Ok(match msg.dial_response {
189+
Some(structs_proto::message::DialResponse {
190+
status: Some(0),
191+
status_text: None,
192+
addr: Some(addr),
193+
}) => {
194+
let addr = Multiaddr::try_from(addr)
195+
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
196+
Self::Ok(addr)
197+
}
198+
Some(structs_proto::message::DialResponse {
199+
status: Some(status),
200+
status_text,
201+
addr: None,
202+
}) => {
203+
let status = ResponseStatus::from_i32(status).ok_or_else(|| {
204+
io::Error::new(io::ErrorKind::InvalidData, "invalid status code")
205+
})?;
206+
Self::Err(status, status_text.unwrap_or_default())
207+
}
208+
_ => {
209+
return Err(io::Error::new(
210+
io::ErrorKind::InvalidData,
211+
"invalid dial response message",
212+
));
213+
}
214+
})
215+
}
216+
217+
pub fn to_bytes(self) -> Vec<u8> {
218+
let dial_response = match self {
219+
Self::Ok(addr) => structs_proto::message::DialResponse {
220+
status: Some(0),
221+
status_text: None,
222+
addr: Some(addr.to_vec()),
223+
},
224+
Self::Err(status, status_text) => structs_proto::message::DialResponse {
225+
status: Some(status as _),
226+
status_text: Some(status_text),
227+
addr: None,
228+
},
229+
};
230+
231+
let msg = structs_proto::Message {
232+
r#type: Some(structs_proto::message::MessageType::DialResponse as _),
233+
dial: None,
234+
dial_response: Some(dial_response),
235+
};
236+
237+
let mut bytes = Vec::with_capacity(msg.encoded_len());
238+
msg.encode(&mut bytes)
239+
.expect("Vec<u8> provides capacity as needed");
240+
bytes
241+
}
242+
}
243+
244+
#[cfg(test)]
245+
mod tests {
246+
use super::*;
247+
248+
#[test]
249+
fn test_request_encode_decode() {
250+
let request = DialRequest {
251+
peer_id: PeerId::random(),
252+
addrs: vec![
253+
"/ip4/8.8.8.8/tcp/30333".parse().unwrap(),
254+
"/ip4/192.168.1.42/tcp/30333".parse().unwrap(),
255+
],
256+
};
257+
let bytes = request.clone().to_bytes();
258+
let request2 = DialRequest::from_bytes(&bytes).unwrap();
259+
assert_eq!(request, request2);
260+
}
261+
262+
#[test]
263+
fn test_response_ok_encode_decode() {
264+
let response = DialResponse::Ok("/ip4/8.8.8.8/tcp/30333".parse().unwrap());
265+
let bytes = response.clone().to_bytes();
266+
let response2 = DialResponse::from_bytes(&bytes).unwrap();
267+
assert_eq!(response, response2);
268+
}
269+
270+
#[test]
271+
fn test_response_err_encode_decode() {
272+
let response = DialResponse::Err(ResponseStatus::EDialError, "failed to dial".into());
273+
let bytes = response.clone().to_bytes();
274+
let response2 = DialResponse::from_bytes(&bytes).unwrap();
275+
assert_eq!(response, response2);
276+
}
277+
}

0 commit comments

Comments
 (0)