Skip to content

Commit 242955a

Browse files
Merge pull request #17 from hubertshelley/Websocket-Simplification
Websocket simplification
2 parents 67946c6 + 1d128d4 commit 242955a

File tree

12 files changed

+595
-147
lines changed

12 files changed

+595
-147
lines changed

.DS_Store

0 Bytes
Binary file not shown.

.github/workflows/build.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ jobs:
2727
- uses: Swatinem/rust-cache@v1
2828
- name: Check code format
2929
run: cargo fmt -- --check
30-
- name: Check the package for errors
31-
run: cargo check --all
30+
# - name: Check the package for errors
31+
# run: cargo check --all
3232
- name: Lint rust sources
3333
run: cargo clippy --all-targets --all-features --tests --benches -- -D warnings
3434
- name: Execute rust tests

.pre-commit-config.yaml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ repos:
3535
language: rust
3636
files: \.rs$
3737
args: [ ]
38-
- id: cargo-check
39-
name: cargo check
40-
description: Check the package for errors.
41-
entry: bash -c 'cargo check --all'
42-
language: rust
43-
files: \.rs$
44-
pass_filenames: false
38+
# - id: cargo-check
39+
# name: cargo check
40+
# description: Check the package for errors.
41+
# entry: bash -c 'cargo check --all'
42+
# language: rust
43+
# files: \.rs$
44+
# pass_filenames: false
4545
- id: cargo-clippy
4646
name: cargo clippy
4747
description: Lint rust sources

examples/websocket-chat/src/main.rs

Lines changed: 46 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -5,90 +5,81 @@
55

66
use std::collections::HashMap;
77
use std::sync::atomic::{AtomicUsize, Ordering};
8+
use std::sync::Arc;
89

9-
use futures_util::{FutureExt, StreamExt};
1010
use once_cell::sync::Lazy;
1111
use tokio::sync::{mpsc, RwLock};
12-
use tokio_stream::wrappers::UnboundedReceiverStream;
1312

1413
use silent::prelude::*;
1514

16-
type Users = RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message>>>>;
15+
type Users = RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>;
1716

1817
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
1918
static ONLINE_USERS: Lazy<Users> = Lazy::new(Users::default);
2019

2120
fn main() {
2221
logger::fmt().init();
23-
let route = Route::new("")
24-
.get(index)
25-
.append(Route::new("chat").ws(None, handle_socket));
22+
let route = Route::new("").get(index).append(
23+
Route::new("chat").ws(
24+
None,
25+
WebSocketHandler::new()
26+
.on_connect(on_connect)
27+
.on_send(on_send)
28+
.on_receive(on_receive)
29+
.on_close(on_close),
30+
),
31+
);
2632
Server::new().bind_route(route).run();
2733
}
2834

29-
async fn handle_socket(ws: WebSocket) {
30-
// Use a counter to assign a new unique ID for this user.
35+
async fn on_connect(
36+
parts: Arc<RwLock<WebSocketParts>>,
37+
sender: mpsc::UnboundedSender<Message>,
38+
) -> Result<()> {
39+
let mut parts = parts.write().await;
40+
info!("{:?}", parts);
3141
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
32-
3342
info!("new chat user: {}", my_id);
43+
parts.extensions_mut().insert(my_id);
44+
sender
45+
.send(Message::text(format!("Hello User#{my_id}")))
46+
.unwrap();
47+
ONLINE_USERS.write().await.insert(my_id, sender);
48+
Ok(())
49+
}
3450

35-
// Split the socket into a sender and receive of messages.
36-
let (user_ws_tx, mut user_ws_rx) = ws.split();
37-
38-
// Use an unbounded channel to handle buffering and flushing of messages
39-
// to the websocket...
40-
let (tx, rx) = mpsc::unbounded_channel();
41-
let rx = UnboundedReceiverStream::new(rx);
42-
let fut = rx.forward(user_ws_tx).map(|result| {
43-
if let Err(e) = result {
44-
error!(error = ?e, "websocket send error");
45-
}
46-
});
47-
tokio::task::spawn(fut);
48-
let fut = async move {
49-
ONLINE_USERS.write().await.insert(my_id, tx);
50-
51-
while let Some(result) = user_ws_rx.next().await {
52-
let msg = match result {
53-
Ok(msg) => msg,
54-
Err(e) => {
55-
eprintln!("websocket error(uid={my_id}): {e}");
56-
break;
57-
}
58-
};
59-
user_message(my_id, msg).await;
60-
}
61-
62-
user_disconnected(my_id).await;
63-
};
64-
tokio::task::spawn(fut);
51+
async fn on_send(message: Message, _parts: Arc<RwLock<WebSocketParts>>) -> Result<Message> {
52+
info!("on_send: {:?}", message);
53+
Ok(message)
6554
}
6655

67-
async fn user_message(my_id: usize, msg: Message) {
68-
let msg = if let Ok(s) = msg.to_str() {
56+
async fn on_receive(message: Message, parts: Arc<RwLock<WebSocketParts>>) -> Result<()> {
57+
let parts = parts.read().await;
58+
let my_id = parts.extensions().get::<usize>().unwrap();
59+
info!("on_receive: {:?}", message);
60+
let msg = if let Ok(s) = message.to_str() {
6961
s
7062
} else {
71-
return;
63+
return Err(SilentError::BusinessError {
64+
code: StatusCode::BAD_REQUEST,
65+
msg: "invalid message".to_string(),
66+
});
7267
};
73-
74-
let new_msg = format!("<User#{my_id}>: {msg}");
75-
76-
// New message from this user, send it to everyone else (except same uid)...
77-
for (&uid, tx) in ONLINE_USERS.read().await.iter() {
68+
let message = Message::text(format!("<User#{my_id}>: {msg}"));
69+
for (uid, tx) in ONLINE_USERS.read().await.iter() {
7870
if my_id != uid {
79-
if let Err(_disconnected) = tx.send(Ok(Message::text(new_msg.clone()))) {
80-
// The tx is disconnected, our `user_disconnected` code
81-
// should be happening in another task, nothing more to
82-
// do here.
83-
}
71+
if let Err(_disconnected) = tx.send(message.clone()) {}
8472
}
8573
}
74+
Ok(())
8675
}
8776

88-
async fn user_disconnected(my_id: usize) {
89-
eprintln!("good bye user: {my_id}");
77+
async fn on_close(parts: Arc<RwLock<WebSocketParts>>) {
78+
let parts = parts.read().await;
79+
let my_id = parts.extensions().get::<usize>().unwrap();
80+
info!("good bye user: {my_id}");
9081
// Stream closed up, so remove from the user list
91-
ONLINE_USERS.write().await.remove(&my_id);
82+
ONLINE_USERS.write().await.remove(my_id);
9283
}
9384

9485
async fn index<'a>(_res: Request) -> Result<&'a str> {

examples/websocket/src/main.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@ use silent::prelude::*;
22

33
fn main() {
44
logger::fmt().init();
5-
let route = Route::new("")
6-
.get(show_form)
7-
.append(Route::new("ws").ws(None, |_| async {}));
5+
let route = Route::new("").get(show_form).append(
6+
Route::new("ws").ws(
7+
None,
8+
WebSocketHandler::new()
9+
.on_connect(|_, _| async { Ok(()) })
10+
.on_send(|msg, _| async { Ok(msg) })
11+
.on_receive(|_, _| async { Ok(()) })
12+
.on_close(|_| async {}),
13+
),
14+
);
815
Server::new().bind_route(route).run();
916
}
1017

silent/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,6 @@ pub mod prelude {
4444
FnOnClose, FnOnConnect, FnOnNoneResultFut, FnOnReceive, FnOnSend, FnOnSendFut,
4545
};
4646
#[cfg(feature = "ws")]
47-
pub use crate::ws::{HandlerWrapperWebSocket, Message, WebSocket};
47+
pub use crate::ws::{Message, WebSocket, WebSocketHandler, WebSocketParts};
4848
pub use hyper::{header, upgrade, Method, StatusCode};
4949
}

silent/src/route/handler_append.rs

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
use super::Route;
2+
use crate::ws::WebSocketHandler;
23
#[cfg(feature = "ws")]
3-
use crate::ws::{HandlerWrapperWebSocket, WebSocket};
4+
use crate::ws::{HandlerWrapperWebSocket, Message, WebSocketParts};
45
use crate::{Handler, HandlerWrapper, Method, Request, Result};
56
use serde::Serialize;
67
use std::collections::HashMap;
78
use std::future::Future;
89
use std::sync::Arc;
910
#[cfg(feature = "ws")]
11+
use tokio::sync::mpsc::UnboundedSender;
12+
#[cfg(feature = "ws")]
13+
use tokio::sync::RwLock;
14+
#[cfg(feature = "ws")]
1015
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
1116

1217
pub trait HandlerGetter {
@@ -33,13 +38,55 @@ where
3338
}
3439

3540
#[cfg(feature = "ws")]
36-
pub trait WSHandlerAppend<F, Fut>: HandlerGetter
37-
where
38-
Fut: Future<Output = ()> + Send + Sync + 'static,
39-
F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
41+
pub trait WSHandlerAppend<
42+
FnOnConnect,
43+
FnOnConnectFut,
44+
FnOnSend,
45+
FnOnSendFut,
46+
FnOnReceive,
47+
FnOnReceiveFut,
48+
FnOnClose,
49+
FnOnCloseFut,
50+
>: HandlerGetter where
51+
FnOnConnect: Fn(Arc<RwLock<WebSocketParts>>, UnboundedSender<Message>) -> FnOnConnectFut
52+
+ Send
53+
+ Sync
54+
+ 'static,
55+
FnOnConnectFut: Future<Output = Result<()>> + Send + Sync + 'static,
56+
FnOnSend: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnSendFut + Send + Sync + 'static,
57+
FnOnSendFut: Future<Output = Result<Message>> + Send + Sync + 'static,
58+
FnOnReceive: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnReceiveFut + Send + Sync + 'static,
59+
FnOnReceiveFut: Future<Output = Result<()>> + Send + Sync + 'static,
60+
FnOnClose: Fn(Arc<RwLock<WebSocketParts>>) -> FnOnCloseFut + Send + Sync + 'static,
61+
FnOnCloseFut: Future<Output = ()> + Send + Sync + 'static,
4062
{
41-
fn ws(self, config: Option<WebSocketConfig>, handler: F) -> Self;
42-
fn ws_handler_append(&mut self, handler: HandlerWrapperWebSocket<F>) {
63+
fn ws(
64+
self,
65+
config: Option<WebSocketConfig>,
66+
handler: WebSocketHandler<
67+
FnOnConnect,
68+
FnOnConnectFut,
69+
FnOnSend,
70+
FnOnSendFut,
71+
FnOnReceive,
72+
FnOnReceiveFut,
73+
FnOnClose,
74+
FnOnCloseFut,
75+
>,
76+
) -> Self;
77+
fn ws_handler_append(
78+
&mut self,
79+
handler: HandlerWrapperWebSocket<
80+
FnOnConnect,
81+
FnOnConnectFut,
82+
FnOnSend,
83+
FnOnSendFut,
84+
FnOnReceive,
85+
FnOnReceiveFut,
86+
FnOnClose,
87+
FnOnCloseFut,
88+
>,
89+
) {
4390
let handler = Arc::new(handler);
4491
self.get_handler_mut().insert(Method::GET, handler);
4592
}
@@ -105,12 +152,53 @@ where
105152
}
106153

107154
#[cfg(feature = "ws")]
108-
impl<F, Fut> WSHandlerAppend<F, Fut> for Route
155+
impl<
156+
FnOnConnect,
157+
FnOnConnectFut,
158+
FnOnSend,
159+
FnOnSendFut,
160+
FnOnReceive,
161+
FnOnReceiveFut,
162+
FnOnClose,
163+
FnOnCloseFut,
164+
>
165+
WSHandlerAppend<
166+
FnOnConnect,
167+
FnOnConnectFut,
168+
FnOnSend,
169+
FnOnSendFut,
170+
FnOnReceive,
171+
FnOnReceiveFut,
172+
FnOnClose,
173+
FnOnCloseFut,
174+
> for Route
109175
where
110-
Fut: Future<Output = ()> + Send + Sync + 'static,
111-
F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
176+
FnOnConnect: Fn(Arc<RwLock<WebSocketParts>>, UnboundedSender<Message>) -> FnOnConnectFut
177+
+ Send
178+
+ Sync
179+
+ 'static,
180+
FnOnConnectFut: Future<Output = Result<()>> + Send + Sync + 'static,
181+
FnOnSend: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnSendFut + Send + Sync + 'static,
182+
FnOnSendFut: Future<Output = Result<Message>> + Send + Sync + 'static,
183+
FnOnReceive: Fn(Message, Arc<RwLock<WebSocketParts>>) -> FnOnReceiveFut + Send + Sync + 'static,
184+
FnOnReceiveFut: Future<Output = Result<()>> + Send + Sync + 'static,
185+
FnOnClose: Fn(Arc<RwLock<WebSocketParts>>) -> FnOnCloseFut + Send + Sync + 'static,
186+
FnOnCloseFut: Future<Output = ()> + Send + Sync + 'static,
112187
{
113-
fn ws(mut self, config: Option<WebSocketConfig>, handler: F) -> Self {
188+
fn ws(
189+
mut self,
190+
config: Option<WebSocketConfig>,
191+
handler: WebSocketHandler<
192+
FnOnConnect,
193+
FnOnConnectFut,
194+
FnOnSend,
195+
FnOnSendFut,
196+
FnOnReceive,
197+
FnOnReceiveFut,
198+
FnOnClose,
199+
FnOnCloseFut,
200+
>,
201+
) -> Self {
114202
let handler = HandlerWrapperWebSocket::new(config).set_handler(handler);
115203
self.ws_handler_append(handler);
116204
self

0 commit comments

Comments
 (0)