Skip to content

Commit 1d128d4

Browse files
committed
websocket处理简化完成
1 parent 067fd1f commit 1d128d4

File tree

9 files changed

+488
-157
lines changed

9 files changed

+488
-157
lines changed

examples/websocket-chat/src/main.rs

Lines changed: 14 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::collections::HashMap;
77
use std::sync::atomic::{AtomicUsize, Ordering};
88
use std::sync::Arc;
99

10-
use futures_util::{SinkExt, StreamExt};
1110
use once_cell::sync::Lazy;
1211
use tokio::sync::{mpsc, RwLock};
1312

@@ -20,9 +19,16 @@ static ONLINE_USERS: Lazy<Users> = Lazy::new(Users::default);
2019

2120
fn main() {
2221
logger::fmt().init();
23-
let route = Route::new("")
24-
.get(index)
25-
.append(Route::new("chat").ws(None, handle_socket));
22+
let route = Route::new("").get(index).append(
23+
Route::new("chat").ws(
24+
None,
25+
WebSocketHandler::new()
26+
.on_connect(on_connect)
27+
.on_send(on_send)
28+
.on_receive(on_receive)
29+
.on_close(on_close),
30+
),
31+
);
2632
Server::new().bind_route(route).run();
2733
}
2834

@@ -31,7 +37,7 @@ async fn on_connect(
3137
sender: mpsc::UnboundedSender<Message>,
3238
) -> Result<()> {
3339
let mut parts = parts.write().await;
34-
println!("{:?}", parts);
40+
info!("{:?}", parts);
3541
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
3642
info!("new chat user: {}", my_id);
3743
parts.extensions_mut().insert(my_id);
@@ -43,14 +49,14 @@ async fn on_connect(
4349
}
4450

4551
async fn on_send(message: Message, _parts: Arc<RwLock<WebSocketParts>>) -> Result<Message> {
46-
println!("on_send: {:?}", message);
52+
info!("on_send: {:?}", message);
4753
Ok(message)
4854
}
4955

5056
async fn on_receive(message: Message, parts: Arc<RwLock<WebSocketParts>>) -> Result<()> {
5157
let parts = parts.read().await;
5258
let my_id = parts.extensions().get::<usize>().unwrap();
53-
println!("on_receive: {:?}", message);
59+
info!("on_receive: {:?}", message);
5460
let msg = if let Ok(s) = message.to_str() {
5561
s
5662
} else {
@@ -71,48 +77,11 @@ async fn on_receive(message: Message, parts: Arc<RwLock<WebSocketParts>>) -> Res
7177
async fn on_close(parts: Arc<RwLock<WebSocketParts>>) {
7278
let parts = parts.read().await;
7379
let my_id = parts.extensions().get::<usize>().unwrap();
74-
eprintln!("good bye user: {my_id}");
80+
info!("good bye user: {my_id}");
7581
// Stream closed up, so remove from the user list
7682
ONLINE_USERS.write().await.remove(my_id);
7783
}
7884

79-
async fn handle_socket(ws: WebSocket) {
80-
let (parts, ws) = ws.into_parts();
81-
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
82-
// Use a counter to assign a new unique ID for this user.
83-
84-
let (tx, mut rx) = mpsc::unbounded_channel();
85-
on_connect(parts.clone(), tx.clone()).await.unwrap();
86-
let sender_parts = parts.clone();
87-
let receiver_parts = parts;
88-
89-
let fut = async move {
90-
while let Some(message) = rx.recv().await {
91-
let message = on_send(message.clone(), sender_parts.clone())
92-
.await
93-
.unwrap();
94-
println!("before: {:?}", message);
95-
user_ws_tx.send(message).await.unwrap();
96-
}
97-
};
98-
tokio::task::spawn(fut);
99-
let fut = async move {
100-
while let Some(message) = user_ws_rx.next().await {
101-
if let Ok(message) = message {
102-
if message.is_close() {
103-
break;
104-
}
105-
if on_receive(message, receiver_parts.clone()).await.is_err() {
106-
break;
107-
}
108-
}
109-
}
110-
111-
on_close(receiver_parts).await;
112-
};
113-
tokio::task::spawn(fut);
114-
}
115-
11685
async fn index<'a>(_res: Request) -> Result<&'a str> {
11786
Ok(INDEX_HTML)
11887
}

examples/websocket/src/main.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@ use silent::prelude::*;
22

33
fn main() {
44
logger::fmt().init();
5-
let route = Route::new("")
6-
.get(show_form)
7-
.append(Route::new("ws").ws(None, |_| async {}));
5+
let route = Route::new("").get(show_form).append(
6+
Route::new("ws").ws(
7+
None,
8+
WebSocketHandler::new()
9+
.on_connect(|_, _| async { Ok(()) })
10+
.on_send(|msg, _| async { Ok(msg) })
11+
.on_receive(|_, _| async { Ok(()) })
12+
.on_close(|_| async {}),
13+
),
14+
);
815
Server::new().bind_route(route).run();
916
}
1017

silent/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,6 @@ pub mod prelude {
4444
FnOnClose, FnOnConnect, FnOnNoneResultFut, FnOnReceive, FnOnSend, FnOnSendFut,
4545
};
4646
#[cfg(feature = "ws")]
47-
pub use crate::ws::{HandlerWrapperWebSocket, Message, WebSocket, WebSocketParts};
47+
pub use crate::ws::{Message, WebSocket, WebSocketHandler, WebSocketParts};
4848
pub use hyper::{header, upgrade, Method, StatusCode};
4949
}

silent/src/route/handler_append.rs

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
use super::Route;
2+
use crate::ws::WebSocketHandler;
23
#[cfg(feature = "ws")]
3-
use crate::ws::{HandlerWrapperWebSocket, WebSocket};
4+
use crate::ws::{HandlerWrapperWebSocket, Message, WebSocketParts};
45
use crate::{Handler, HandlerWrapper, Method, Request, Result};
56
use serde::Serialize;
67
use std::collections::HashMap;
78
use std::future::Future;
89
use std::sync::Arc;
910
#[cfg(feature = "ws")]
11+
use tokio::sync::mpsc::UnboundedSender;
12+
#[cfg(feature = "ws")]
13+
use tokio::sync::RwLock;
14+
#[cfg(feature = "ws")]
1015
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
1116

1217
pub trait HandlerGetter {
@@ -33,13 +38,55 @@ where
3338
}
3439

3540
#[cfg(feature = "ws")]
36-
pub trait WSHandlerAppend<F, Fut>: HandlerGetter
37-
where
38-
Fut: Future<Output = ()> + Send + Sync + 'static,
39-
F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
41+
pub trait WSHandlerAppend<
42+
FnOnConnect,
43+
FnOnConnectFut,
44+
FnOnSend,
45+
FnOnSendFut,
46+
FnOnReceive,
47+
FnOnReceiveFut,
48+
FnOnClose,
49+
FnOnCloseFut,
50+
>: HandlerGetter where
51+
FnOnConnect: Fn(Arc<RwLock<WebSocketParts>>, UnboundedSender<Message>) -> FnOnConnectFut
52+
+ Send
53+
+ Sync
54+
+ 'static,
55+
FnOnConnectFut: Future<Output = Result<()>> + Send + Sync + 'static,
56+
FnOnSend: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnSendFut + Send + Sync + 'static,
57+
FnOnSendFut: Future<Output = Result<Message>> + Send + Sync + 'static,
58+
FnOnReceive: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnReceiveFut + Send + Sync + 'static,
59+
FnOnReceiveFut: Future<Output = Result<()>> + Send + Sync + 'static,
60+
FnOnClose: Fn(Arc<RwLock<WebSocketParts>>) -> FnOnCloseFut + Send + Sync + 'static,
61+
FnOnCloseFut: Future<Output = ()> + Send + Sync + 'static,
4062
{
41-
fn ws(self, config: Option<WebSocketConfig>, handler: F) -> Self;
42-
fn ws_handler_append(&mut self, handler: HandlerWrapperWebSocket<F>) {
63+
fn ws(
64+
self,
65+
config: Option<WebSocketConfig>,
66+
handler: WebSocketHandler<
67+
FnOnConnect,
68+
FnOnConnectFut,
69+
FnOnSend,
70+
FnOnSendFut,
71+
FnOnReceive,
72+
FnOnReceiveFut,
73+
FnOnClose,
74+
FnOnCloseFut,
75+
>,
76+
) -> Self;
77+
fn ws_handler_append(
78+
&mut self,
79+
handler: HandlerWrapperWebSocket<
80+
FnOnConnect,
81+
FnOnConnectFut,
82+
FnOnSend,
83+
FnOnSendFut,
84+
FnOnReceive,
85+
FnOnReceiveFut,
86+
FnOnClose,
87+
FnOnCloseFut,
88+
>,
89+
) {
4390
let handler = Arc::new(handler);
4491
self.get_handler_mut().insert(Method::GET, handler);
4592
}
@@ -105,12 +152,53 @@ where
105152
}
106153

107154
#[cfg(feature = "ws")]
108-
impl<F, Fut> WSHandlerAppend<F, Fut> for Route
155+
impl<
156+
FnOnConnect,
157+
FnOnConnectFut,
158+
FnOnSend,
159+
FnOnSendFut,
160+
FnOnReceive,
161+
FnOnReceiveFut,
162+
FnOnClose,
163+
FnOnCloseFut,
164+
>
165+
WSHandlerAppend<
166+
FnOnConnect,
167+
FnOnConnectFut,
168+
FnOnSend,
169+
FnOnSendFut,
170+
FnOnReceive,
171+
FnOnReceiveFut,
172+
FnOnClose,
173+
FnOnCloseFut,
174+
> for Route
109175
where
110-
Fut: Future<Output = ()> + Send + Sync + 'static,
111-
F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
176+
FnOnConnect: Fn(Arc<RwLock<WebSocketParts>>, UnboundedSender<Message>) -> FnOnConnectFut
177+
+ Send
178+
+ Sync
179+
+ 'static,
180+
FnOnConnectFut: Future<Output = Result<()>> + Send + Sync + 'static,
181+
FnOnSend: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnSendFut + Send + Sync + 'static,
182+
FnOnSendFut: Future<Output = Result<Message>> + Send + Sync + 'static,
183+
FnOnReceive: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnReceiveFut + Send + Sync + 'static,
184+
FnOnReceiveFut: Future<Output = Result<()>> + Send + Sync + 'static,
185+
FnOnClose: Fn(Arc<RwLock<WebSocketParts>>) -> FnOnCloseFut + Send + Sync + 'static,
186+
FnOnCloseFut: Future<Output = ()> + Send + Sync + 'static,
112187
{
113-
fn ws(mut self, config: Option<WebSocketConfig>, handler: F) -> Self {
188+
fn ws(
189+
mut self,
190+
config: Option<WebSocketConfig>,
191+
handler: WebSocketHandler<
192+
FnOnConnect,
193+
FnOnConnectFut,
194+
FnOnSend,
195+
FnOnSendFut,
196+
FnOnReceive,
197+
FnOnReceiveFut,
198+
FnOnClose,
199+
FnOnCloseFut,
200+
>,
201+
) -> Self {
114202
let handler = HandlerWrapperWebSocket::new(config).set_handler(handler);
115203
self.ws_handler_append(handler);
116204
self

0 commit comments

Comments
 (0)