Skip to content

Commit e20fc02

Browse files
committed
功能编写中
1 parent 12bffee commit e20fc02

File tree

7 files changed

+543
-0
lines changed

7 files changed

+543
-0
lines changed

examples/sse-chat/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "example-sse-chat"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
7+
[dependencies]
8+
futures-util = "0.3"
9+
silent = { path = "../../silent" }
10+
once_cell = "1"
11+
parking_lot = "0.12"
12+
tokio = { version = "1", features = ["macros"] }
13+
tokio-stream = { version = "0.1", features = ["net"] }

examples/sse-chat/src/main.rs

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Copyright (c) 2018-2020 Sean McArthur
2+
// Licensed under the MIT license http://opensource.org/licenses/MIT
3+
//
4+
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse_chat.rs
5+
6+
use std::collections::HashMap;
7+
use std::result::Result as IoResult;
8+
use std::sync::atomic::{AtomicUsize, Ordering};
9+
10+
use futures_util::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
11+
use once_cell::sync::Lazy;
12+
use parking_lot::Mutex;
13+
use tokio::sync::mpsc;
14+
use tokio_stream::wrappers::UnboundedReceiverStream;
15+
16+
use silent::prelude::*;
17+
use silent::sse::{self, keep_alive};
18+
19+
type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;
20+
21+
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
22+
static ONLINE_USERS: Lazy<Users> = Lazy::new(Users::default);
23+
24+
fn main() {
25+
logger::fmt().with_max_level(Level::INFO).init();
26+
27+
let route = Route::new("").get(index).append(
28+
Route::new("chat")
29+
.handler(
30+
Method::GET,
31+
HandlerWrapperResponse::new(user_connected).arc(),
32+
)
33+
.append(
34+
Route::new("<id:int>")
35+
.handler(Method::POST, HandlerWrapperResponse::new(chat_send).arc()),
36+
),
37+
);
38+
39+
Server::new()
40+
.bind("0.0.0.0:8001".parse().unwrap())
41+
.bind_route(route)
42+
.run();
43+
}
44+
45+
#[derive(Debug)]
46+
enum Message {
47+
UserId(usize),
48+
Reply(String),
49+
}
50+
51+
async fn chat_send(req: Request) -> Result<Response> {
52+
let my_id = req.get_path_params::<i32>("id")?;
53+
let msg = "hello";
54+
user_message(my_id as usize, msg);
55+
Ok(Response::empty())
56+
}
57+
58+
fn get_connected(_req: Request) -> impl Stream<Item = Result<sse::Event>> + Send + 'static {
59+
// Use a counter to assign a new unique ID for this user.
60+
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
61+
62+
info!("new chat user: {}", my_id);
63+
64+
// Use an unbounded channel to handle buffering and flushing of messages
65+
// to the event source...
66+
let (tx, rx) = mpsc::unbounded_channel();
67+
let rx = UnboundedReceiverStream::new(rx);
68+
69+
tx.send(Message::UserId(my_id))
70+
// rx is right above, so this cannot fail
71+
.unwrap();
72+
73+
// Save the sender in our list of connected users.
74+
ONLINE_USERS.lock().insert(my_id, tx);
75+
76+
// Convert messages into Server-Sent Events and returns resulting stream.
77+
let stream = rx.map(|msg| match msg {
78+
Message::UserId(my_id) => Ok(sse::Event::default().event("user").data(my_id.to_string())),
79+
Message::Reply(reply) => Ok(sse::Event::default().data(reply)),
80+
});
81+
stream
82+
}
83+
84+
async fn user_connected(_req: Request) -> Result<Response> {
85+
let stream = get_connected(_req);
86+
// SseKeepAlive::new(stream).streaming(res).ok();
87+
let mut res = Response::empty()
88+
.set_header(
89+
HeaderName::from_static("Content-Type"),
90+
HeaderValue::from_static("text/event-stream"),
91+
)
92+
.set_header(
93+
HeaderName::from_static("Cache-Control"),
94+
HeaderValue::from_static("no-cache"),
95+
)
96+
.set_header(
97+
HeaderName::from_static("Connection"),
98+
HeaderValue::from_static("keep-alive"),
99+
)
100+
.set_header(
101+
HeaderName::from_static("Access-Control-Allow-Origin"),
102+
HeaderValue::from_static("*"),
103+
);
104+
res.set_status(StatusCode::PARTIAL_CONTENT);
105+
// res.set_body(stream_body(stream));
106+
let event_stream = sse::keep_alive().stream(stream);
107+
let body_stream = event_stream
108+
.map_err(|error| {
109+
// FIXME: error logging
110+
error!("sse stream error: {}", error);
111+
SilentError::BusinessError {
112+
code: StatusCode::INTERNAL_SERVER_ERROR,
113+
msg: "sse::keep error".to_string(),
114+
}
115+
})
116+
.into_stream()
117+
.and_then(|event| future::ready(Ok(event.to_string())));
118+
res.set_body(stream_body(body_stream));
119+
Ok(res)
120+
}
121+
122+
fn user_message(my_id: usize, msg: &str) {
123+
let new_msg = format!("<User#{my_id}>: {msg}");
124+
125+
// New message from this user, send it to everyone else (except same uid)...
126+
//
127+
// We use `retain` instead of a for loop so that we can reap any user that
128+
// appears to have disconnected.
129+
ONLINE_USERS.lock().retain(|uid, tx| {
130+
if my_id == *uid {
131+
// don't send to same user, but do retain
132+
true
133+
} else {
134+
// If not `is_ok`, the SSE stream is gone, and so don't retain
135+
tx.send(Message::Reply(new_msg.clone())).is_ok()
136+
}
137+
});
138+
}
139+
140+
async fn index<'a>(_req: Request) -> Result<&'a str> {
141+
Ok(INDEX_HTML)
142+
}
143+
144+
static INDEX_HTML: &str = r#"
145+
<!DOCTYPE html>
146+
<html>
147+
<head>
148+
<title>SSE Chat</title>
149+
</head>
150+
<body>
151+
<h1>SSE Chat</h1>
152+
<div id="chat">
153+
<p><em>Connecting...</em></p>
154+
</div>
155+
<input type="text" id="msg" />
156+
<button type="button" id="submit">Send</button>
157+
<script>
158+
const chat = document.getElementById('chat');
159+
const msg = document.getElementById('msg');
160+
const submit = document.getElementById('submit');
161+
let sse = new EventSource(`http://${location.host}/chat`);
162+
sse.onopen = function() {
163+
chat.innerHTML = "<p><em>Connected!</em></p>";
164+
}
165+
var userId;
166+
sse.addEventListener("user", function(msg) {
167+
userId = msg.data;
168+
});
169+
sse.onmessage = function(msg) {
170+
showMessage(msg.data);
171+
};
172+
document.getElementById('submit').onclick = function() {
173+
var msg = text.value;
174+
var xhr = new XMLHttpRequest();
175+
xhr.open("POST", `${uri}/${user_id}`, true);
176+
xhr.send(msg);
177+
text.value = '';
178+
showMessage('<You>: ' + msg);
179+
};
180+
function showMessage(data) {
181+
const line = document.createElement('p');
182+
line.innerText = data;
183+
chat.appendChild(line);
184+
}
185+
</script>
186+
</body>
187+
</html>
188+
"#;

silent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ chrono = { version = "0.4.24", default-features = false, features = ["clock"] }
4747
tokio-tungstenite = "0.19.0"
4848
headers = "0.3.8"
4949
tokio-stream = { version = "0.1.14", features = ["net"] }
50+
pin-project = "1.0"

silent/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod route;
1212
mod rt;
1313
#[cfg(feature = "server")]
1414
mod service;
15+
pub mod sse;
1516
#[cfg(feature = "ws")]
1617
mod ws;
1718

silent/src/sse/event.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use serde::Serialize;
2+
use serde_json::{self, Error};
3+
use std::fmt;
4+
use std::fmt::Write;
5+
use std::time::Duration;
6+
7+
// Server-sent event data type
8+
#[derive(Debug)]
9+
enum DataType {
10+
Text(String),
11+
Json(String),
12+
}
13+
14+
/// Server-sent event
15+
#[derive(Default, Debug)]
16+
pub struct Event {
17+
id: Option<String>,
18+
data: Option<DataType>,
19+
event: Option<String>,
20+
comment: Option<String>,
21+
retry: Option<Duration>,
22+
}
23+
24+
impl Event {
25+
/// Set Server-sent event data
26+
/// data field(s) ("data:<content>")
27+
pub fn data<T: Into<String>>(mut self, data: T) -> Event {
28+
self.data = Some(DataType::Text(data.into()));
29+
self
30+
}
31+
32+
/// Set Server-sent event data
33+
/// data field(s) ("data:<content>")
34+
pub fn json_data<T: Serialize>(mut self, data: T) -> Result<Event, Error> {
35+
self.data = Some(DataType::Json(serde_json::to_string(&data)?));
36+
Ok(self)
37+
}
38+
39+
/// Set Server-sent event comment
40+
/// Comment field (":<comment-text>")
41+
pub fn comment<T: Into<String>>(mut self, comment: T) -> Event {
42+
self.comment = Some(comment.into());
43+
self
44+
}
45+
46+
/// Set Server-sent event event
47+
/// Event name field ("event:<event-name>")
48+
pub fn event<T: Into<String>>(mut self, event: T) -> Event {
49+
self.event = Some(event.into());
50+
self
51+
}
52+
53+
/// Set Server-sent event retry
54+
/// Retry timeout field ("retry:<timeout>")
55+
pub fn retry(mut self, duration: Duration) -> Event {
56+
self.retry = Some(duration);
57+
self
58+
}
59+
60+
/// Set Server-sent event id
61+
/// Identifier field ("id:<identifier>")
62+
pub fn id<T: Into<String>>(mut self, id: T) -> Event {
63+
self.id = Some(id.into());
64+
self
65+
}
66+
}
67+
68+
impl fmt::Display for Event {
69+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70+
if let Some(ref comment) = &self.comment {
71+
":".fmt(f)?;
72+
comment.fmt(f)?;
73+
f.write_char('\n')?;
74+
}
75+
76+
if let Some(ref event) = &self.event {
77+
"event:".fmt(f)?;
78+
event.fmt(f)?;
79+
f.write_char('\n')?;
80+
}
81+
82+
match self.data {
83+
Some(DataType::Text(ref data)) => {
84+
for line in data.split('\n') {
85+
"data:".fmt(f)?;
86+
line.fmt(f)?;
87+
f.write_char('\n')?;
88+
}
89+
}
90+
Some(DataType::Json(ref data)) => {
91+
"data:".fmt(f)?;
92+
data.fmt(f)?;
93+
f.write_char('\n')?;
94+
}
95+
None => {}
96+
}
97+
98+
if let Some(ref id) = &self.id {
99+
"id:".fmt(f)?;
100+
id.fmt(f)?;
101+
f.write_char('\n')?;
102+
}
103+
104+
if let Some(ref duration) = &self.retry {
105+
"retry:".fmt(f)?;
106+
107+
let secs = duration.as_secs();
108+
let millis = duration.subsec_millis();
109+
110+
if secs > 0 {
111+
// format seconds
112+
secs.fmt(f)?;
113+
114+
// pad milliseconds
115+
if millis < 10 {
116+
f.write_str("00")?;
117+
} else if millis < 100 {
118+
f.write_char('0')?;
119+
}
120+
}
121+
122+
// format milliseconds
123+
millis.fmt(f)?;
124+
125+
f.write_char('\n')?;
126+
}
127+
128+
f.write_char('\n')?;
129+
Ok(())
130+
}
131+
}

0 commit comments

Comments
 (0)