Skip to content

Commit 0a46042

Browse files
committed
async server: cancel handler when client disconnected
Release locks/channels in the long running handlers. Signed-off-by: Tim Zhang <tim@hyper.sh>
1 parent eef2004 commit 0a46042

File tree

1 file changed

+83
-57
lines changed

1 file changed

+83
-57
lines changed

src/asynchronous/server.rs

Lines changed: 83 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use tokio::{
2626
self,
2727
io::{split, AsyncRead, AsyncWrite, AsyncWriteExt},
2828
net::UnixListener,
29+
select, spawn,
2930
sync::mpsc::{channel, Receiver, Sender},
3031
sync::watch,
3132
};
@@ -155,13 +156,12 @@ impl Server {
155156
let (stop_listen_tx, mut stop_listen_rx) = channel(1);
156157
self.stop_listen_tx = Some(stop_listen_tx);
157158

158-
tokio::spawn(async move {
159+
spawn(async move {
159160
loop {
160-
tokio::select! {
161+
select! {
161162
conn = incoming.next() => {
162163
if let Some(conn) = conn {
163164
// Accept a new connection
164-
let methods = methods.clone();
165165
match conn {
166166
Ok(stream) => {
167167
let fd = stream.as_raw_fd();
@@ -170,59 +170,14 @@ impl Server {
170170
continue;
171171
}
172172

173-
let mut close_conn_rx = close_conn_rx.clone();
174-
175-
let (req_done_tx, mut all_req_done_rx) = channel::<i32>(1);
176-
let conn_done_tx2 = conn_done_tx.clone();
177-
178-
// The connection handler
179-
tokio::spawn(async move {
180-
let (mut reader, mut writer) = split(stream);
181-
let (tx, mut rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = channel(100);
182-
183-
tokio::spawn(async move {
184-
while let Some(buf) = rx.recv().await {
185-
if let Err(e) = writer.write_all(&buf).await {
186-
error!("write_message got error: {:?}", e);
187-
}
188-
}
189-
});
190-
191-
loop {
192-
let tx = tx.clone();
193-
let methods = methods.clone();
194-
let req_done_tx2 = req_done_tx.clone();
195-
196-
tokio::select! {
197-
resp = receive(&mut reader) => {
198-
match resp {
199-
Ok(message) => {
200-
tokio::spawn(async move {
201-
handle_request(tx, listenfd, methods, message).await;
202-
drop(req_done_tx2);
203-
});
204-
}
205-
Err(e) => {
206-
trace!("error {:?}", e);
207-
break;
208-
}
209-
}
210-
}
211-
v = close_conn_rx.changed() => {
212-
// 0 is the init value of this watch, not a valid signal
213-
// is_err means the tx was dropped.
214-
if v.is_err() || *close_conn_rx.borrow() != 0 {
215-
info!("Stop accepting new connections.");
216-
break;
217-
}
218-
}
219-
}
220-
}
221-
222-
drop(req_done_tx);
223-
all_req_done_rx.recv().await;
224-
drop(conn_done_tx2);
225-
});
173+
// spawn a connection handler, would not block
174+
spawn_connection_handler(
175+
listenfd,
176+
stream,
177+
methods.clone(),
178+
close_conn_rx.clone(),
179+
conn_done_tx.clone()
180+
).await;
226181
}
227182
Err(e) => {
228183
error!("{:?}", e)
@@ -281,6 +236,73 @@ impl Server {
281236
}
282237
}
283238

239+
async fn spawn_connection_handler<S>(
240+
listenfd: RawFd,
241+
stream: S,
242+
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
243+
mut close_conn_rx: watch::Receiver<i32>,
244+
conn_done_tx: Sender<i32>,
245+
) where
246+
S: AsyncRead + AsyncWrite + AsRawFd + Send + 'static,
247+
{
248+
let (req_done_tx, mut all_req_done_rx) = channel::<i32>(1);
249+
250+
spawn(async move {
251+
let (mut reader, mut writer) = split(stream);
252+
let (tx, mut rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = channel(100);
253+
let (client_disconnected_tx, client_disconnected_rx) = watch::channel(false);
254+
255+
spawn(async move {
256+
while let Some(buf) = rx.recv().await {
257+
if let Err(e) = writer.write_all(&buf).await {
258+
error!("write_message got error: {:?}", e);
259+
}
260+
}
261+
});
262+
263+
loop {
264+
let tx = tx.clone();
265+
let methods = methods.clone();
266+
let req_done_tx2 = req_done_tx.clone();
267+
let mut client_disconnected_rx2 = client_disconnected_rx.clone();
268+
269+
select! {
270+
resp = receive(&mut reader) => {
271+
match resp {
272+
Ok(message) => {
273+
spawn(async move {
274+
select! {
275+
_ = handle_request(tx, listenfd, methods, message) => {}
276+
_ = client_disconnected_rx2.changed() => {}
277+
}
278+
279+
drop(req_done_tx2);
280+
});
281+
}
282+
Err(e) => {
283+
let _ = client_disconnected_tx.send(true);
284+
trace!("error {:?}", e);
285+
break;
286+
}
287+
}
288+
}
289+
v = close_conn_rx.changed() => {
290+
// 0 is the init value of this watch, not a valid signal
291+
// is_err means the tx was dropped.
292+
if v.is_err() || *close_conn_rx.borrow() != 0 {
293+
info!("Stop accepting new connections.");
294+
break;
295+
}
296+
}
297+
}
298+
}
299+
300+
drop(req_done_tx);
301+
all_req_done_rx.recv().await;
302+
drop(conn_done_tx);
303+
});
304+
}
305+
284306
async fn handle_request(
285307
tx: Sender<Vec<u8>>,
286308
fd: RawFd,
@@ -311,7 +333,11 @@ async fn handle_request(
311333
let path = format!("/{}/{}", req.service, req.method);
312334
if let Some(x) = methods.get(&path) {
313335
let method = x;
314-
let ctx = TtrpcContext { fd, mh: header, metadata: context::from_pb(&req.metadata) };
336+
let ctx = TtrpcContext {
337+
fd,
338+
mh: header,
339+
metadata: context::from_pb(&req.metadata),
340+
};
315341

316342
match method.handler(ctx, req).await {
317343
Ok((stream_id, body)) => {

0 commit comments

Comments
 (0)