Skip to content

Commit 8bc11e8

Browse files
Merge pull request #21 from hubertshelley/20-server-sent-events-support
20 server sent events support
2 parents 12bffee + db63705 commit 8bc11e8

File tree

11 files changed

+518
-4
lines changed

11 files changed

+518
-4
lines changed

examples/custom_response/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use silent::header::CONTENT_TYPE;
12
use silent::prelude::*;
23

34
fn main() {
@@ -15,7 +16,9 @@ fn main() {
1516
}
1617

1718
async fn custom_response(_req: Request) -> Result<Response> {
18-
let res = Response::empty();
19+
let mut res = Response::empty();
20+
res.headers_mut()
21+
.insert(CONTENT_TYPE, HeaderValue::from_static("text"));
1922
let html = r#"
2023
<!DOCTYPE html>
2124
<html lang="en">
@@ -27,7 +30,7 @@ async fn custom_response(_req: Request) -> Result<Response> {
2730
<h1>custom response</h1>
2831
</body>
2932
</html>"#;
30-
let res = res.set_body(full(html));
33+
res.set_body(full(html));
3134
Ok(res)
3235
}
3336

examples/sse-chat/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "example-sse-chat"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
7+
[dependencies]
8+
futures-util = "0.3"
9+
silent = { path = "../../silent", features = ["sse"] }
10+
once_cell = "1"
11+
parking_lot = "0.12"
12+
tokio = { version = "1", features = ["macros"] }
13+
tokio-stream = { version = "0.1", features = ["net"] }

examples/sse-chat/src/main.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright (c) 2018-2020 Sean McArthur
2+
// Licensed under the MIT license http://opensource.org/licenses/MIT
3+
//
4+
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse_chat.rs
5+
6+
use std::collections::HashMap;
7+
use std::sync::atomic::{AtomicUsize, Ordering};
8+
9+
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
10+
use once_cell::sync::Lazy;
11+
use parking_lot::Mutex;
12+
use tokio::sync::mpsc;
13+
use tokio_stream::wrappers::UnboundedReceiverStream;
14+
15+
use silent::prelude::*;
16+
17+
type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;
18+
19+
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
20+
static ONLINE_USERS: Lazy<Users> = Lazy::new(Users::default);
21+
22+
fn main() {
23+
logger::fmt().with_max_level(Level::INFO).init();
24+
25+
let route = Route::new("").get(index).append(
26+
Route::new("chat")
27+
.handler(
28+
Method::GET,
29+
HandlerWrapperResponse::new(user_connected).arc(),
30+
)
31+
.append(
32+
Route::new("<id:int>")
33+
.handler(Method::POST, HandlerWrapperResponse::new(chat_send).arc()),
34+
),
35+
);
36+
37+
Server::new()
38+
.bind("0.0.0.0:8001".parse().unwrap())
39+
.bind_route(route)
40+
.run();
41+
}
42+
43+
#[derive(Debug)]
44+
enum Message {
45+
UserId(usize),
46+
Reply(String),
47+
}
48+
49+
async fn chat_send(req: Request) -> Result<Response> {
50+
let my_id = req.get_path_params::<i32>("id")?;
51+
let msg = "hello";
52+
user_message(my_id as usize, msg);
53+
Ok(Response::empty())
54+
}
55+
56+
async fn user_connected(_req: Request) -> Result<Response> {
57+
// Use a counter to assign a new unique ID for this user.
58+
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
59+
60+
info!("new chat user: {}", my_id);
61+
62+
// Use an unbounded channel to handle buffering and flushing of messages
63+
// to the event source...
64+
let (tx, rx) = mpsc::unbounded_channel();
65+
let rx = UnboundedReceiverStream::new(rx);
66+
67+
tx.send(Message::UserId(my_id))
68+
// rx is right above, so this cannot fail
69+
.unwrap();
70+
71+
// Save the sender in our list of connected users.
72+
ONLINE_USERS.lock().insert(my_id, tx);
73+
74+
// Convert messages into Server-Sent Events and returns resulting stream.
75+
let stream = rx.map(|msg| match msg {
76+
Message::UserId(my_id) => {
77+
warn!("user {} disconnected", my_id);
78+
Ok(SSEEvent::default().event("user").data(my_id.to_string()))
79+
}
80+
Message::Reply(reply) => {
81+
warn!("sse_reply: {}", reply);
82+
Ok(SSEEvent::default().data(reply))
83+
}
84+
});
85+
let res = sse_reply(stream);
86+
Ok(res)
87+
}
88+
89+
fn user_message(my_id: usize, msg: &str) {
90+
let new_msg = format!("<User#{my_id}>: {msg}");
91+
92+
// New message from this user, send it to everyone else (except same uid)...
93+
//
94+
// We use `retain` instead of a for loop so that we can reap any user that
95+
// appears to have disconnected.
96+
ONLINE_USERS.lock().retain(|uid, tx| {
97+
if my_id == *uid {
98+
// don't send to same user, but do retain
99+
true
100+
} else {
101+
// If not `is_ok`, the SSE stream is gone, and so don't retain
102+
tx.send(Message::Reply(new_msg.clone())).is_ok()
103+
}
104+
});
105+
}
106+
107+
async fn index<'a>(_req: Request) -> Result<&'a str> {
108+
Ok(INDEX_HTML)
109+
}
110+
111+
static INDEX_HTML: &str = r#"
112+
<!DOCTYPE html>
113+
<html>
114+
<head>
115+
<title>SSE Chat</title>
116+
</head>
117+
<body>
118+
<h1>SSE chat</h1>
119+
<div id="chat">
120+
<p><em>Connecting...</em></p>
121+
</div>
122+
<input type="text" id="text" />
123+
<button type="button" id="send">Send</button>
124+
<script type="text/javascript">
125+
var uri = 'http://' + location.host + '/chat';
126+
var sse = new EventSource(uri);
127+
function message(data) {
128+
var line = document.createElement('p');
129+
line.innerText = data;
130+
chat.appendChild(line);
131+
}
132+
sse.onopen = function() {
133+
chat.innerHTML = "<p><em>Connected!</em></p>";
134+
}
135+
var user_id;
136+
sse.addEventListener("user", function(msg) {
137+
user_id = msg.data;
138+
});
139+
sse.onmessage = function(msg) {
140+
message(msg.data);
141+
};
142+
send.onclick = function() {
143+
var msg = text.value;
144+
var xhr = new XMLHttpRequest();
145+
xhr.open("POST", uri + '/' + user_id, true);
146+
xhr.send(msg);
147+
text.value = '';
148+
message('<You>: ' + msg);
149+
};
150+
</script>
151+
</body>
152+
</html>
153+
"#;

silent/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ default = ["server", "test", "static"]
1919
full = ["server", "wasi", "static", "test"]
2020
server = ["tokio/fs", "tokio/net", "tokio/rt-multi-thread", "tokio/signal"]
2121
ws = []
22+
sse = []
2223
static = []
2324
wasi = ["tokio/sync"]
2425
test = ["tokio/macros", "tokio/rt"]
@@ -47,3 +48,4 @@ chrono = { version = "0.4.24", default-features = false, features = ["clock"] }
4748
tokio-tungstenite = "0.19.0"
4849
headers = "0.3.8"
4950
tokio-stream = { version = "0.1.14", features = ["net"] }
51+
pin-project = "1.0"

silent/src/core/response.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ impl Response {
5858
self.headers.insert(key, value);
5959
self
6060
}
61+
#[inline]
62+
/// 设置响应header
63+
pub fn headers(&self) -> &HeaderMap {
64+
&self.headers
65+
}
66+
#[inline]
67+
/// 设置响应header
68+
pub fn headers_mut(&mut self) -> &mut HeaderMap {
69+
&mut self.headers
70+
}
6171
/// 设置响应header
6272
pub fn set_typed_header<H>(&mut self, header: H)
6373
where

silent/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ mod route;
1212
mod rt;
1313
#[cfg(feature = "server")]
1414
mod service;
15+
#[cfg(feature = "sse")]
16+
mod sse;
1517
#[cfg(feature = "ws")]
1618
mod ws;
1719

@@ -42,6 +44,8 @@ pub mod prelude {
4244
pub use crate::route::Route;
4345
#[cfg(feature = "server")]
4446
pub use crate::service::Server;
47+
#[cfg(feature = "sse")]
48+
pub use crate::sse::{sse_reply, SSEEvent};
4549
#[cfg(feature = "ws")]
4650
pub use crate::ws::{
4751
FnOnClose, FnOnConnect, FnOnNoneResultFut, FnOnReceive, FnOnSend, FnOnSendFut,

silent/src/service/hyper_service.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ impl HyperHandler {
3434
async move {
3535
match routes.clone().handle(req, remote_addr).await {
3636
Ok(res) => {
37-
response.set_status(res.status_code);
38-
response.set_body(res.body);
37+
response = res;
3938
}
4039
Err((mes, code)) => {
4140
tracing::error!("Failed to handle request: {:?}", mes);

silent/src/sse/event.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use serde::Serialize;
2+
use serde_json::{self, Error};
3+
use std::fmt;
4+
use std::fmt::Write;
5+
use std::time::Duration;
6+
7+
// Server-sent event data type
8+
#[derive(Debug)]
9+
enum DataType {
10+
Text(String),
11+
Json(String),
12+
}
13+
14+
/// Server-sent event
15+
#[derive(Default, Debug)]
16+
pub struct SSEEvent {
17+
id: Option<String>,
18+
data: Option<DataType>,
19+
event: Option<String>,
20+
comment: Option<String>,
21+
retry: Option<Duration>,
22+
}
23+
24+
impl SSEEvent {
25+
/// Set Server-sent event data
26+
/// data field(s) ("data:<content>")
27+
pub fn data<T: Into<String>>(mut self, data: T) -> SSEEvent {
28+
self.data = Some(DataType::Text(data.into()));
29+
self
30+
}
31+
32+
/// Set Server-sent event data
33+
/// data field(s) ("data:<content>")
34+
pub fn json_data<T: Serialize>(mut self, data: T) -> Result<SSEEvent, Error> {
35+
self.data = Some(DataType::Json(serde_json::to_string(&data)?));
36+
Ok(self)
37+
}
38+
39+
/// Set Server-sent event comment
40+
/// Comment field (":<comment-text>")
41+
pub fn comment<T: Into<String>>(mut self, comment: T) -> SSEEvent {
42+
self.comment = Some(comment.into());
43+
self
44+
}
45+
46+
/// Set Server-sent event event
47+
/// SSEEvent name field ("event:<event-name>")
48+
pub fn event<T: Into<String>>(mut self, event: T) -> SSEEvent {
49+
self.event = Some(event.into());
50+
self
51+
}
52+
53+
/// Set Server-sent event retry
54+
/// Retry timeout field ("retry:<timeout>")
55+
pub fn retry(mut self, duration: Duration) -> SSEEvent {
56+
self.retry = Some(duration);
57+
self
58+
}
59+
60+
/// Set Server-sent event id
61+
/// Identifier field ("id:<identifier>")
62+
pub fn id<T: Into<String>>(mut self, id: T) -> SSEEvent {
63+
self.id = Some(id.into());
64+
self
65+
}
66+
}
67+
68+
impl fmt::Display for SSEEvent {
69+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70+
if let Some(ref comment) = &self.comment {
71+
":".fmt(f)?;
72+
comment.fmt(f)?;
73+
f.write_char('\n')?;
74+
}
75+
76+
if let Some(ref event) = &self.event {
77+
"event:".fmt(f)?;
78+
event.fmt(f)?;
79+
f.write_char('\n')?;
80+
}
81+
82+
match self.data {
83+
Some(DataType::Text(ref data)) => {
84+
for line in data.split('\n') {
85+
"data:".fmt(f)?;
86+
line.fmt(f)?;
87+
f.write_char('\n')?;
88+
}
89+
}
90+
Some(DataType::Json(ref data)) => {
91+
"data:".fmt(f)?;
92+
data.fmt(f)?;
93+
f.write_char('\n')?;
94+
}
95+
None => {}
96+
}
97+
98+
if let Some(ref id) = &self.id {
99+
"id:".fmt(f)?;
100+
id.fmt(f)?;
101+
f.write_char('\n')?;
102+
}
103+
104+
if let Some(ref duration) = &self.retry {
105+
"retry:".fmt(f)?;
106+
107+
let secs = duration.as_secs();
108+
let millis = duration.subsec_millis();
109+
110+
if secs > 0 {
111+
// format seconds
112+
secs.fmt(f)?;
113+
114+
// pad milliseconds
115+
if millis < 10 {
116+
f.write_str("00")?;
117+
} else if millis < 100 {
118+
f.write_char('0')?;
119+
}
120+
}
121+
122+
// format milliseconds
123+
millis.fmt(f)?;
124+
125+
f.write_char('\n')?;
126+
}
127+
128+
f.write_char('\n')?;
129+
Ok(())
130+
}
131+
}

0 commit comments

Comments
 (0)