Skip to content

Commit db63705

Browse files
committed
sse support finished
1 parent e20fc02 commit db63705

File tree

11 files changed

+113
-142
lines changed

11 files changed

+113
-142
lines changed

examples/custom_response/src/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use silent::header::CONTENT_TYPE;
12
use silent::prelude::*;
23

34
fn main() {
@@ -15,7 +16,9 @@ fn main() {
1516
}
1617

1718
async fn custom_response(_req: Request) -> Result<Response> {
18-
let res = Response::empty();
19+
let mut res = Response::empty();
20+
res.headers_mut()
21+
.insert(CONTENT_TYPE, HeaderValue::from_static("text"));
1922
let html = r#"
2023
<!DOCTYPE html>
2124
<html lang="en">
@@ -27,7 +30,7 @@ async fn custom_response(_req: Request) -> Result<Response> {
2730
<h1>custom response</h1>
2831
</body>
2932
</html>"#;
30-
let res = res.set_body(full(html));
33+
res.set_body(full(html));
3134
Ok(res)
3235
}
3336

examples/sse-chat/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = "2021"
66

77
[dependencies]
88
futures-util = "0.3"
9-
silent = { path = "../../silent" }
9+
silent = { path = "../../silent", features = ["sse"] }
1010
once_cell = "1"
1111
parking_lot = "0.12"
1212
tokio = { version = "1", features = ["macros"] }

examples/sse-chat/src/main.rs

Lines changed: 28 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,15 @@
44
// port from https://github.com/seanmonstar/warp/blob/master/examples/sse_chat.rs
55

66
use std::collections::HashMap;
7-
use std::result::Result as IoResult;
87
use std::sync::atomic::{AtomicUsize, Ordering};
98

10-
use futures_util::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
9+
use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt};
1110
use once_cell::sync::Lazy;
1211
use parking_lot::Mutex;
1312
use tokio::sync::mpsc;
1413
use tokio_stream::wrappers::UnboundedReceiverStream;
1514

1615
use silent::prelude::*;
17-
use silent::sse::{self, keep_alive};
1816

1917
type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;
2018

@@ -55,7 +53,7 @@ async fn chat_send(req: Request) -> Result<Response> {
5553
Ok(Response::empty())
5654
}
5755

58-
fn get_connected(_req: Request) -> impl Stream<Item = Result<sse::Event>> + Send + 'static {
56+
async fn user_connected(_req: Request) -> Result<Response> {
5957
// Use a counter to assign a new unique ID for this user.
6058
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
6159

@@ -75,47 +73,16 @@ fn get_connected(_req: Request) -> impl Stream<Item = Result<sse::Event>> + Send
7573

7674
// Convert messages into Server-Sent Events and returns resulting stream.
7775
let stream = rx.map(|msg| match msg {
78-
Message::UserId(my_id) => Ok(sse::Event::default().event("user").data(my_id.to_string())),
79-
Message::Reply(reply) => Ok(sse::Event::default().data(reply)),
76+
Message::UserId(my_id) => {
77+
warn!("user {} disconnected", my_id);
78+
Ok(SSEEvent::default().event("user").data(my_id.to_string()))
79+
}
80+
Message::Reply(reply) => {
81+
warn!("sse_reply: {}", reply);
82+
Ok(SSEEvent::default().data(reply))
83+
}
8084
});
81-
stream
82-
}
83-
84-
async fn user_connected(_req: Request) -> Result<Response> {
85-
let stream = get_connected(_req);
86-
// SseKeepAlive::new(stream).streaming(res).ok();
87-
let mut res = Response::empty()
88-
.set_header(
89-
HeaderName::from_static("Content-Type"),
90-
HeaderValue::from_static("text/event-stream"),
91-
)
92-
.set_header(
93-
HeaderName::from_static("Cache-Control"),
94-
HeaderValue::from_static("no-cache"),
95-
)
96-
.set_header(
97-
HeaderName::from_static("Connection"),
98-
HeaderValue::from_static("keep-alive"),
99-
)
100-
.set_header(
101-
HeaderName::from_static("Access-Control-Allow-Origin"),
102-
HeaderValue::from_static("*"),
103-
);
104-
res.set_status(StatusCode::PARTIAL_CONTENT);
105-
// res.set_body(stream_body(stream));
106-
let event_stream = sse::keep_alive().stream(stream);
107-
let body_stream = event_stream
108-
.map_err(|error| {
109-
// FIXME: error logging
110-
error!("sse stream error: {}", error);
111-
SilentError::BusinessError {
112-
code: StatusCode::INTERNAL_SERVER_ERROR,
113-
msg: "sse::keep error".to_string(),
114-
}
115-
})
116-
.into_stream()
117-
.and_then(|event| future::ready(Ok(event.to_string())));
118-
res.set_body(stream_body(body_stream));
85+
let res = sse_reply(stream);
11986
Ok(res)
12087
}
12188

@@ -148,40 +115,38 @@ static INDEX_HTML: &str = r#"
148115
<title>SSE Chat</title>
149116
</head>
150117
<body>
151-
<h1>SSE Chat</h1>
118+
<h1>SSE chat</h1>
152119
<div id="chat">
153120
<p><em>Connecting...</em></p>
154121
</div>
155-
<input type="text" id="msg" />
156-
<button type="button" id="submit">Send</button>
157-
<script>
158-
const chat = document.getElementById('chat');
159-
const msg = document.getElementById('msg');
160-
const submit = document.getElementById('submit');
161-
let sse = new EventSource(`http://${location.host}/chat`);
122+
<input type="text" id="text" />
123+
<button type="button" id="send">Send</button>
124+
<script type="text/javascript">
125+
var uri = 'http://' + location.host + '/chat';
126+
var sse = new EventSource(uri);
127+
function message(data) {
128+
var line = document.createElement('p');
129+
line.innerText = data;
130+
chat.appendChild(line);
131+
}
162132
sse.onopen = function() {
163133
chat.innerHTML = "<p><em>Connected!</em></p>";
164134
}
165-
var userId;
135+
var user_id;
166136
sse.addEventListener("user", function(msg) {
167-
userId = msg.data;
137+
user_id = msg.data;
168138
});
169139
sse.onmessage = function(msg) {
170-
showMessage(msg.data);
140+
message(msg.data);
171141
};
172-
document.getElementById('submit').onclick = function() {
142+
send.onclick = function() {
173143
var msg = text.value;
174144
var xhr = new XMLHttpRequest();
175-
xhr.open("POST", `${uri}/${user_id}`, true);
145+
xhr.open("POST", uri + '/' + user_id, true);
176146
xhr.send(msg);
177147
text.value = '';
178-
showMessage('<You>: ' + msg);
148+
message('<You>: ' + msg);
179149
};
180-
function showMessage(data) {
181-
const line = document.createElement('p');
182-
line.innerText = data;
183-
chat.appendChild(line);
184-
}
185150
</script>
186151
</body>
187152
</html>

silent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ default = ["server", "test", "static"]
1919
full = ["server", "wasi", "static", "test"]
2020
server = ["tokio/fs", "tokio/net", "tokio/rt-multi-thread", "tokio/signal"]
2121
ws = []
22+
sse = []
2223
static = []
2324
wasi = ["tokio/sync"]
2425
test = ["tokio/macros", "tokio/rt"]

silent/src/core/response.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ impl Response {
5858
self.headers.insert(key, value);
5959
self
6060
}
61+
#[inline]
62+
/// 设置响应header
63+
pub fn headers(&self) -> &HeaderMap {
64+
&self.headers
65+
}
66+
#[inline]
67+
/// 设置响应header
68+
pub fn headers_mut(&mut self) -> &mut HeaderMap {
69+
&mut self.headers
70+
}
6171
/// 设置响应header
6272
pub fn set_typed_header<H>(&mut self, header: H)
6373
where

silent/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ mod route;
1212
mod rt;
1313
#[cfg(feature = "server")]
1414
mod service;
15-
pub mod sse;
15+
#[cfg(feature = "sse")]
16+
mod sse;
1617
#[cfg(feature = "ws")]
1718
mod ws;
1819

@@ -43,6 +44,8 @@ pub mod prelude {
4344
pub use crate::route::Route;
4445
#[cfg(feature = "server")]
4546
pub use crate::service::Server;
47+
#[cfg(feature = "sse")]
48+
pub use crate::sse::{sse_reply, SSEEvent};
4649
#[cfg(feature = "ws")]
4750
pub use crate::ws::{
4851
FnOnClose, FnOnConnect, FnOnNoneResultFut, FnOnReceive, FnOnSend, FnOnSendFut,

silent/src/service/hyper_service.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ impl HyperHandler {
3434
async move {
3535
match routes.clone().handle(req, remote_addr).await {
3636
Ok(res) => {
37-
response.set_status(res.status_code);
38-
response.set_body(res.body);
37+
response = res;
3938
}
4039
Err((mes, code)) => {
4140
tracing::error!("Failed to handle request: {:?}", mes);

silent/src/sse/event.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,59 +13,59 @@ enum DataType {
1313

1414
/// Server-sent event
1515
#[derive(Default, Debug)]
16-
pub struct Event {
16+
pub struct SSEEvent {
1717
id: Option<String>,
1818
data: Option<DataType>,
1919
event: Option<String>,
2020
comment: Option<String>,
2121
retry: Option<Duration>,
2222
}
2323

24-
impl Event {
24+
impl SSEEvent {
2525
/// Set Server-sent event data
2626
/// data field(s) ("data:<content>")
27-
pub fn data<T: Into<String>>(mut self, data: T) -> Event {
27+
pub fn data<T: Into<String>>(mut self, data: T) -> SSEEvent {
2828
self.data = Some(DataType::Text(data.into()));
2929
self
3030
}
3131

3232
/// Set Server-sent event data
3333
/// data field(s) ("data:<content>")
34-
pub fn json_data<T: Serialize>(mut self, data: T) -> Result<Event, Error> {
34+
pub fn json_data<T: Serialize>(mut self, data: T) -> Result<SSEEvent, Error> {
3535
self.data = Some(DataType::Json(serde_json::to_string(&data)?));
3636
Ok(self)
3737
}
3838

3939
/// Set Server-sent event comment
4040
/// Comment field (":<comment-text>")
41-
pub fn comment<T: Into<String>>(mut self, comment: T) -> Event {
41+
pub fn comment<T: Into<String>>(mut self, comment: T) -> SSEEvent {
4242
self.comment = Some(comment.into());
4343
self
4444
}
4545

4646
/// Set Server-sent event event
47-
/// Event name field ("event:<event-name>")
48-
pub fn event<T: Into<String>>(mut self, event: T) -> Event {
47+
/// SSEEvent name field ("event:<event-name>")
48+
pub fn event<T: Into<String>>(mut self, event: T) -> SSEEvent {
4949
self.event = Some(event.into());
5050
self
5151
}
5252

5353
/// Set Server-sent event retry
5454
/// Retry timeout field ("retry:<timeout>")
55-
pub fn retry(mut self, duration: Duration) -> Event {
55+
pub fn retry(mut self, duration: Duration) -> SSEEvent {
5656
self.retry = Some(duration);
5757
self
5858
}
5959

6060
/// Set Server-sent event id
6161
/// Identifier field ("id:<identifier>")
62-
pub fn id<T: Into<String>>(mut self, id: T) -> Event {
62+
pub fn id<T: Into<String>>(mut self, id: T) -> SSEEvent {
6363
self.id = Some(id.into());
6464
self
6565
}
6666
}
6767

68-
impl fmt::Display for Event {
68+
impl fmt::Display for SSEEvent {
6969
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7070
if let Some(ref comment) = &self.comment {
7171
":".fmt(f)?;

0 commit comments

Comments
 (0)