Skip to content

Commit 4c701d2

Browse files
committed
docs(examples): screening-connection example
1 parent 518400b commit 4c701d2

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed

iroh/examples/screening-connection.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
//! Very basic example to showcase how to write a protocol that rejects new
2+
//! connections based on internal state. Useful when you want an endpoint to
3+
//! stop accepting new connections for some reason only known to the node. Maybe
4+
//! it's doing a migration, starting up, in a "maintenance mode", or serving
5+
//! too many connections.
6+
//!
7+
//! ## Usage
8+
//!
9+
//! cargo run --example screening-connection --features=examples
10+
use std::sync::atomic::{AtomicU64, Ordering};
11+
use std::sync::Arc;
12+
13+
use iroh::endpoint::Connecting;
14+
use iroh::{
15+
endpoint::Connection,
16+
protocol::{AcceptError, ProtocolHandler, Router},
17+
Endpoint, NodeAddr,
18+
};
19+
use n0_snafu::{Result, ResultExt};
20+
use n0_watcher::Watcher as _;
21+
22+
/// Each protocol is identified by its ALPN string.
23+
///
24+
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
25+
/// and the connection is aborted unless both nodes pass the same bytestring.
26+
const ALPN: &[u8] = b"iroh-example/screening-connection/0";
27+
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
let router = start_accept_side().await?;
31+
let node_addr = router.endpoint().node_addr().initialized().await?;
32+
33+
connect_side(&node_addr).await?;
34+
match connect_side(&node_addr).await {
35+
Ok(()) => {}
36+
Err(err) => {
37+
eprintln!("Error connecting: {}", err);
38+
}
39+
}
40+
connect_side(&node_addr).await?;
41+
42+
// This makes sure the endpoint in the router is closed properly and connections close gracefully
43+
router.shutdown().await.e()?;
44+
45+
Ok(())
46+
}
47+
48+
async fn connect_side(addr: &NodeAddr) -> Result<()> {
49+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
50+
51+
// Open a connection to the accepting node
52+
let conn = endpoint.connect(addr.clone(), ALPN).await?;
53+
54+
// Open a bidirectional QUIC stream
55+
let (mut send, mut recv) = conn.open_bi().await.e()?;
56+
57+
// Send some data to be echoed
58+
send.write_all(b"Hello, world!").await.e()?;
59+
60+
// Signal the end of data for this particular stream
61+
send.finish().e()?;
62+
63+
// Receive the echo, but limit reading up to maximum 1000 bytes
64+
let response = recv.read_to_end(1000).await.e()?;
65+
assert_eq!(&response, b"Hello, world!");
66+
67+
// Explicitly close the whole connection.
68+
conn.close(0u32.into(), b"bye!");
69+
70+
// The above call only queues a close message to be sent (see how it's not async!).
71+
// We need to actually call this to make sure this message is sent out.
72+
endpoint.close().await;
73+
// If we don't call this, but continue using the endpoint, we then the queued
74+
// close call will eventually be picked up and sent.
75+
// But always try to wait for endpoint.close().await to go through before dropping
76+
// the endpoint to ensure any queued messages are sent through and connections are
77+
// closed gracefully.
78+
Ok(())
79+
}
80+
81+
async fn start_accept_side() -> Result<Router> {
82+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
83+
84+
let ping = ScreenedEcho {
85+
conn_attempt_count: Arc::new(AtomicU64::new(0)),
86+
};
87+
88+
// Build our protocol handler and add our protocol, identified by its ALPN, and spawn the node.
89+
let router = Router::builder(endpoint).accept(ALPN, ping).spawn();
90+
91+
Ok(router)
92+
}
93+
94+
/// This is the same as the echo example, but keeps an internal count of the
95+
/// number of connections that have been attempted
96+
#[derive(Debug, Clone)]
97+
struct ScreenedEcho {
98+
conn_attempt_count: Arc<AtomicU64>,
99+
}
100+
101+
impl ProtocolHandler for ScreenedEcho {
102+
/// `on_connecting` allows us to intercept a connection as it's being formed,
103+
/// which is the right place to cut off a connection as early as possible.
104+
/// This is an optional method on the ProtocolHandler trait.
105+
async fn on_connecting(&self, connecting: Connecting) -> Result<Connection, AcceptError> {
106+
self.conn_attempt_count.fetch_add(1, Ordering::Relaxed);
107+
let count = self.conn_attempt_count.load(Ordering::Relaxed);
108+
109+
// reject every other connection
110+
if count % 2 == 0 {
111+
println!("rejecting connection");
112+
return Err(AcceptError::NotAllowed {});
113+
}
114+
115+
// To allow the connection to consturct as normal, we simply await the
116+
// connecting future & return the newly-fromed connection
117+
let conn = connecting.await?;
118+
Ok(conn)
119+
}
120+
121+
/// The `accept` method is called for each incoming connection for our ALPN.
122+
///
123+
/// The returned future runs on a newly spawned tokio task, so it can run as long as
124+
/// the connection lasts.
125+
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
126+
// We can get the remote's node id from the connection.
127+
let node_id = connection.remote_node_id()?;
128+
println!("accepted connection from {node_id}");
129+
130+
// Our protocol is a simple request-response protocol, so we expect the
131+
// connecting peer to open a single bi-directional stream.
132+
let (mut send, mut recv) = connection.accept_bi().await?;
133+
134+
// Echo any bytes received back directly.
135+
// This will keep copying until the sender signals the end of data on the stream.
136+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
137+
println!("Copied over {bytes_sent} byte(s)");
138+
139+
// By calling `finish` on the send stream we signal that we will not send anything
140+
// further, which makes the receive stream on the other end terminate.
141+
send.finish()?;
142+
143+
// Wait until the remote closes the connection, which it does once it
144+
// received the response.
145+
connection.closed().await;
146+
147+
Ok(())
148+
}
149+
}

0 commit comments

Comments
 (0)