Skip to content

Commit 14c6d1d

Browse files
committed
tests(iroh): add test to reconnect after forceful abort
1 parent ac2a3f2 commit 14c6d1d

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed

iroh/tests/reconnect.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use std::time::Duration;
2+
3+
use iroh::{
4+
endpoint::{BindError, Connection},
5+
protocol::{AcceptError, ProtocolHandler, Router},
6+
Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, SecretKey, Watcher,
7+
};
8+
use n0_future::time::timeout;
9+
use n0_snafu::{Result, ResultExt};
10+
use rand::SeedableRng;
11+
use tokio::sync::mpsc;
12+
use tokio_util::sync::CancellationToken;
13+
use tracing::info;
14+
15+
#[cfg(feature = "test-utils")]
16+
#[tokio::test]
17+
// #[traced_test]
18+
async fn can_die_and_reconnect() -> Result {
19+
tracing_subscriber::fmt::init();
20+
21+
let max_wait = Duration::from_secs(5);
22+
23+
#[derive(Debug, Clone)]
24+
struct TestProtocol(mpsc::Sender<(NodeId, String)>);
25+
26+
const TEST_ALPN: &[u8] = b"/iroh/test/1";
27+
28+
impl ProtocolHandler for TestProtocol {
29+
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
30+
let remote_node_id = connection.remote_node_id()?;
31+
let mut stream = connection.accept_uni().await?;
32+
let data = stream
33+
.read_to_end(64)
34+
.await
35+
.map_err(AcceptError::from_err)?;
36+
let s = String::from_utf8(data).map_err(AcceptError::from_err)?;
37+
self.0
38+
.send((remote_node_id, s))
39+
.await
40+
.map_err(AcceptError::from_err)?;
41+
Ok(())
42+
}
43+
}
44+
45+
/// Runs a future in a separate runtime on a separate thread, cancelling everything
46+
/// abruptly once `cancel` is invoked.
47+
fn run_in_thread<T: Send + 'static>(
48+
cancel: CancellationToken,
49+
fut: impl std::future::Future<Output = T> + Send + 'static,
50+
) -> std::thread::JoinHandle<Option<T>> {
51+
std::thread::spawn(move || {
52+
let rt = tokio::runtime::Builder::new_current_thread()
53+
.enable_all()
54+
.build()
55+
.unwrap();
56+
rt.block_on(async move { cancel.run_until_cancelled(fut).await })
57+
})
58+
}
59+
60+
/// Spawns a new client endpoint
61+
async fn spawn_client(
62+
secret_key: SecretKey,
63+
relay_map: RelayMap,
64+
) -> Result<Endpoint, BindError> {
65+
let ep = Endpoint::builder()
66+
.secret_key(secret_key)
67+
.relay_mode(RelayMode::Custom(relay_map))
68+
.insecure_skip_relay_cert_verify(true)
69+
.bind()
70+
.await?;
71+
Ok(ep)
72+
}
73+
74+
/// Spawn a server endpoint, sending incoming messages on `tx`.
75+
async fn spawn_server(
76+
secret_key: SecretKey,
77+
relay_map: RelayMap,
78+
tx: mpsc::Sender<(NodeId, String)>,
79+
) -> Result<Router, BindError> {
80+
let ep = Endpoint::builder()
81+
.secret_key(secret_key)
82+
.relay_mode(RelayMode::Custom(relay_map))
83+
.insecure_skip_relay_cert_verify(true)
84+
.bind()
85+
.await?;
86+
let router = Router::builder(ep)
87+
.accept(TEST_ALPN, TestProtocol(tx))
88+
.spawn();
89+
Ok(router)
90+
}
91+
92+
/// Binds an endpoint, connects to `server_addr`, sends a message, and then do nothing until aborted externally.
93+
async fn connect_once(
94+
secret_key: SecretKey,
95+
relay_map: RelayMap,
96+
server_addr: NodeAddr,
97+
msg: String,
98+
) -> Result {
99+
let endpoint = spawn_client(secret_key, relay_map).await?;
100+
info!(node_id = %endpoint.node_id().fmt_short(), "client node spawned");
101+
let conn = endpoint.connect(server_addr, TEST_ALPN).await?;
102+
let mut stream = conn.open_uni().await.e()?;
103+
stream.write_all(msg.as_bytes()).await.e()?;
104+
stream.finish().e()?;
105+
std::future::pending::<()>().await;
106+
Ok(())
107+
}
108+
109+
let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
110+
let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
111+
112+
let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
113+
let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
114+
let recv_task = tokio::task::spawn({
115+
let relay_map = relay_map.clone();
116+
let secret_key = SecretKey::generate(&mut rng);
117+
async move {
118+
let router = spawn_server(secret_key, relay_map, msgs_recv_tx).await?;
119+
let addr = router.endpoint().node_addr().initialized().await?;
120+
info!(node_id = %addr.node_id.fmt_short(), "server node spawned");
121+
addr_tx.send(addr).unwrap();
122+
std::future::pending::<()>().await;
123+
Result::<_, n0_snafu::Error>::Ok(())
124+
}
125+
});
126+
127+
let server_addr = addr_rx.await.e()?;
128+
129+
// spawn a node, send a message, and then abruptly terminate the node ungracefully
130+
// after the message was received on our receiver node.
131+
let cancel = CancellationToken::new();
132+
let client_secret_key = SecretKey::generate(&mut rng);
133+
info!("spawn client node");
134+
let join_handle_1 = run_in_thread(
135+
cancel.clone(),
136+
connect_once(
137+
client_secret_key.clone(),
138+
relay_map.clone(),
139+
server_addr.clone(),
140+
"msg1".to_string(),
141+
),
142+
);
143+
// assert that we received the message on the receiver node.
144+
let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap();
145+
assert_eq!(msg.0, client_secret_key.public());
146+
assert_eq!(&msg.1, "msg1");
147+
info!("kill client node");
148+
cancel.cancel();
149+
150+
// spawns the node again with the same node id, and send another message
151+
let cancel = CancellationToken::new();
152+
info!("respawn client node");
153+
let join_handle_2 = run_in_thread(
154+
cancel.clone(),
155+
connect_once(
156+
client_secret_key.clone(),
157+
relay_map.clone(),
158+
server_addr.clone(),
159+
"msg2".to_string(),
160+
),
161+
);
162+
// assert that we received the message on the server node.
163+
// this means that the reconnect with the same node id worked.
164+
let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap();
165+
assert_eq!(msg.0, client_secret_key.public());
166+
assert_eq!(&msg.1, "msg2");
167+
info!("kill client node");
168+
cancel.cancel();
169+
170+
info!("kill recv node");
171+
recv_task.abort();
172+
assert!(join_handle_1.join().unwrap().is_none());
173+
assert!(join_handle_2.join().unwrap().is_none());
174+
175+
Ok(())
176+
}

0 commit comments

Comments
 (0)