Skip to content

Commit 2e7e369

Browse files
committed
docs(examples): screening-connection example
1 parent 518400b commit 2e7e369

File tree

1 file changed

+147
-0
lines changed

1 file changed

+147
-0
lines changed

iroh/examples/screening-connection.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//! Very basic example to showcase how to write a protocol that rejects new
2+
//! connections based on internal state. Useful when you want an endpoint to
3+
//! stop accepting new connections for some reason only known to the node. Maybe
4+
//! it's doing a migration, starting up, in a "maintenance mode", or serving
5+
//! too many connections.
6+
//!
7+
//! ## Usage
8+
//!
9+
//! cargo run --example screening-connection --features=examples
10+
use std::sync::atomic::{AtomicU64, Ordering};
11+
use std::sync::Arc;
12+
13+
use iroh::endpoint::Connecting;
14+
use iroh::{
15+
endpoint::Connection,
16+
protocol::{AcceptError, ProtocolHandler, Router},
17+
Endpoint, NodeAddr,
18+
};
19+
use n0_snafu::{Result, ResultExt};
20+
use n0_watcher::Watcher as _;
21+
22+
/// Each protocol is identified by its ALPN string.
23+
///
24+
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
25+
/// and the connection is aborted unless both nodes pass the same bytestring.
26+
const ALPN: &[u8] = b"iroh-example/screening-connection/0";
27+
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
let router = start_accept_side().await?;
31+
let node_addr = router.endpoint().node_addr().initialized().await?;
32+
33+
// call connect three times. connection index 1 will be an odd number, and rejected.
34+
connect_side(&node_addr).await?;
35+
if let Err(err) = connect_side(&node_addr).await {
36+
println!("Error connecting: {}", err);
37+
}
38+
connect_side(&node_addr).await?;
39+
40+
// This makes sure the endpoint in the router is closed properly and connections close gracefully
41+
router.shutdown().await.e()?;
42+
43+
Ok(())
44+
}
45+
46+
async fn connect_side(addr: &NodeAddr) -> Result<()> {
47+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
48+
49+
// Open a connection to the accepting node
50+
let conn = endpoint.connect(addr.clone(), ALPN).await?;
51+
52+
// Open a bidirectional QUIC stream
53+
let (mut send, mut recv) = conn.open_bi().await.e()?;
54+
55+
// Send some data to be echoed
56+
send.write_all(b"Hello, world!").await.e()?;
57+
58+
// Signal the end of data for this particular stream
59+
send.finish().e()?;
60+
61+
// Receive the echo, but limit reading up to maximum 1000 bytes
62+
let response = recv.read_to_end(1000).await.e()?;
63+
assert_eq!(&response, b"Hello, world!");
64+
65+
// Explicitly close the whole connection.
66+
conn.close(0u32.into(), b"bye!");
67+
68+
// The above call only queues a close message to be sent (see how it's not async!).
69+
// We need to actually call this to make sure this message is sent out.
70+
endpoint.close().await;
71+
// If we don't call this, but continue using the endpoint, we then the queued
72+
// close call will eventually be picked up and sent.
73+
// But always try to wait for endpoint.close().await to go through before dropping
74+
// the endpoint to ensure any queued messages are sent through and connections are
75+
// closed gracefully.
76+
Ok(())
77+
}
78+
79+
async fn start_accept_side() -> Result<Router> {
80+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
81+
82+
let echo = ScreenedEcho {
83+
conn_attempt_count: Arc::new(AtomicU64::new(0)),
84+
};
85+
86+
// Build our protocol handler and add our protocol, identified by its ALPN, and spawn the node.
87+
let router = Router::builder(endpoint).accept(ALPN, echo).spawn();
88+
89+
Ok(router)
90+
}
91+
92+
/// This is the same as the echo example, but keeps an internal count of the
93+
/// number of connections that have been attempted
94+
#[derive(Debug, Clone)]
95+
struct ScreenedEcho {
96+
conn_attempt_count: Arc<AtomicU64>,
97+
}
98+
99+
impl ProtocolHandler for ScreenedEcho {
100+
/// `on_connecting` allows us to intercept a connection as it's being formed,
101+
/// which is the right place to cut off a connection as early as possible.
102+
/// This is an optional method on the ProtocolHandler trait.
103+
async fn on_connecting(&self, connecting: Connecting) -> Result<Connection, AcceptError> {
104+
self.conn_attempt_count.fetch_add(1, Ordering::Relaxed);
105+
let count = self.conn_attempt_count.load(Ordering::Relaxed);
106+
107+
// reject every other connection
108+
if count % 2 == 0 {
109+
println!("rejecting connection");
110+
return Err(AcceptError::NotAllowed {});
111+
}
112+
113+
// To allow the connection to consturct as normal, we simply await the
114+
// connecting future & return the newly-fromed connection
115+
let conn = connecting.await?;
116+
Ok(conn)
117+
}
118+
119+
/// The `accept` method is called for each incoming connection for our ALPN.
120+
///
121+
/// The returned future runs on a newly spawned tokio task, so it can run as long as
122+
/// the connection lasts.
123+
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
124+
// We can get the remote's node id from the connection.
125+
let node_id = connection.remote_node_id()?;
126+
println!("accepted connection from {node_id}");
127+
128+
// Our protocol is a simple request-response protocol, so we expect the
129+
// connecting peer to open a single bi-directional stream.
130+
let (mut send, mut recv) = connection.accept_bi().await?;
131+
132+
// Echo any bytes received back directly.
133+
// This will keep copying until the sender signals the end of data on the stream.
134+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
135+
println!("Copied over {bytes_sent} byte(s)");
136+
137+
// By calling `finish` on the send stream we signal that we will not send anything
138+
// further, which makes the receive stream on the other end terminate.
139+
send.finish()?;
140+
141+
// Wait until the remote closes the connection, which it does once it
142+
// received the response.
143+
connection.closed().await;
144+
145+
Ok(())
146+
}
147+
}

0 commit comments

Comments
 (0)