Skip to content

Commit 5a34795

Browse files
authored
Ignore invalid event IDs & persist existing event IDs (#25)
* Ignore invalid event IDs * Persist already-seen event IDs into new events that lack IDs * Push utf decoding earlier into event parsing process * Enable 'headers' and 'last-event-id' capabilities
1 parent fd8fd2e commit 5a34795

File tree

7 files changed

+221
-182
lines changed

7 files changed

+221
-182
lines changed

Makefile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
TEMP_TEST_OUTPUT=/tmp/contract-test-service.log
2-
SKIPFLAGS = -skip 'reconnection' -skip 'basic parsing/ID field is ignored if it contains a null' -skip 'basic parsing/last ID persists if not overridden by later event' \
3-
-skip 'HTTP behavior/client follows 301 redirect' -skip 'HTTP behavior/client follows 307 redirect'
2+
SKIPFLAGS = -skip 'reconnection' -skip 'HTTP behavior/client follows 301 redirect' -skip 'HTTP behavior/client follows 307 redirect'
43

54

65
build-contract-tests:

contract-tests/src/bin/sse-test-api/main.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,11 @@ impl From<es::SSE> for EventType {
6161
es::SSE::Event(evt) => Self::Event {
6262
event: Event {
6363
event_type: evt.event_type,
64-
data: String::from_utf8(evt.data.to_vec()).unwrap(),
65-
id: String::from_utf8(evt.id.to_vec()).unwrap(),
64+
data: evt.data,
65+
id: evt.id,
6666
},
6767
},
68-
es::SSE::Comment(comment) => Self::Comment {
69-
comment: String::from_utf8(comment).unwrap(),
70-
},
68+
es::SSE::Comment(comment) => Self::Comment { comment },
7169
}
7270
}
7371
}
@@ -77,7 +75,7 @@ struct Event {
7775
#[serde(rename = "type")]
7876
event_type: String,
7977
data: String,
80-
id: String,
78+
id: Option<String>,
8179
}
8280

8381
async fn status() -> impl Responder {
@@ -86,8 +84,8 @@ async fn status() -> impl Responder {
8684
// "comments".to_string(),
8785
// "post".to_string(),
8886
// "report".to_string(),
89-
// "headers".to_string(),
90-
// "last-event-id".to_string(),
87+
"headers".to_string(),
88+
"last-event-id".to_string(),
9189
],
9290
})
9391
}

eventsource-client/examples/tail.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use es::Client;
22
use futures::{Stream, TryStreamExt};
3-
use std::{env, process, str::from_utf8, time::Duration};
3+
use std::{env, process, time::Duration};
44

55
use eventsource_client as es;
66

@@ -42,17 +42,10 @@ fn tail_events(client: impl Client) -> impl Stream<Item = Result<(), ()>> {
4242
.stream()
4343
.map_ok(|event| match event {
4444
es::SSE::Event(ev) => {
45-
println!(
46-
"got an event: {}\n{}",
47-
ev.event_type,
48-
from_utf8(&ev.data).unwrap_or_default()
49-
)
45+
println!("got an event: {}\n{}", ev.event_type, ev.data)
5046
}
5147
es::SSE::Comment(comment) => {
52-
println!(
53-
"got a comment: \n{}",
54-
from_utf8(&comment).unwrap_or_default()
55-
)
48+
println!("got a comment: \n{}", comment)
5649
}
5750
})
5851
.map_err(|err| eprintln!("error streaming events: {:?}", err))

eventsource-client/src/client.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub struct ClientBuilder {
6565
headers: HeaderMap,
6666
reconnect_opts: ReconnectOptions,
6767
read_timeout: Option<Duration>,
68-
last_event_id: String,
68+
last_event_id: Option<String>,
6969
method: String,
7070
body: Option<String>,
7171
}
@@ -86,7 +86,7 @@ impl ClientBuilder {
8686
headers: header_map,
8787
reconnect_opts: ReconnectOptions::default(),
8888
read_timeout: None,
89-
last_event_id: String::new(),
89+
last_event_id: None,
9090
method: String::from("GET"),
9191
body: None,
9292
})
@@ -107,7 +107,7 @@ impl ClientBuilder {
107107
/// Set the last event id for a stream when it is created. If it is set, it will be sent to the
108108
/// server in case it can replay missed events.
109109
pub fn last_event_id(mut self, last_event_id: String) -> ClientBuilder {
110-
self.last_event_id = last_event_id;
110+
self.last_event_id = Some(last_event_id);
111111
self
112112
}
113113

@@ -209,7 +209,7 @@ struct RequestProps {
209209
struct ClientImpl<C> {
210210
http: hyper::Client<C>,
211211
request_props: RequestProps,
212-
last_event_id: String,
212+
last_event_id: Option<String>,
213213
}
214214

215215
impl<C> Client for ClientImpl<C>
@@ -274,14 +274,14 @@ pub struct ReconnectingRequest<C> {
274274
state: State,
275275
next_reconnect_delay: Duration,
276276
event_parser: EventParser,
277-
last_event_id: String,
277+
last_event_id: Option<String>,
278278
}
279279

280280
impl<C> ReconnectingRequest<C> {
281281
fn new(
282282
http: hyper::Client<C>,
283283
props: RequestProps,
284-
last_event_id: String,
284+
last_event_id: Option<String>,
285285
) -> ReconnectingRequest<C> {
286286
let reconnect_delay = props.reconnect_opts.delay;
287287
ReconnectingRequest {
@@ -306,11 +306,11 @@ impl<C> ReconnectingRequest<C> {
306306
request_builder = request_builder.header(name, value);
307307
}
308308

309-
if !self.last_event_id.is_empty() {
310-
request_builder = request_builder.header(
311-
"last-event-id",
312-
HeaderValue::from_str(&self.last_event_id.clone()).unwrap(),
313-
);
309+
if self.last_event_id.is_some() {
310+
let id_as_header = HeaderValue::from_str(self.last_event_id.as_ref().unwrap())
311+
.map_err(|e| Error::InvalidParameter(Box::new(e)))?;
312+
313+
request_builder = request_builder.header("last-event-id", id_as_header);
314314
}
315315

316316
let body = match &self.props.body {
@@ -357,8 +357,8 @@ where
357357
if let Some(event) = this.event_parser.get_event() {
358358
return match event {
359359
SSE::Event(ref evt) => {
360-
if !evt.id.is_empty() {
361-
*this.last_event_id = String::from_utf8(evt.id.clone()).unwrap();
360+
if evt.id.is_some() {
361+
*this.last_event_id = evt.id.clone();
362362
}
363363

364364
if let Some(retry) = evt.retry {

eventsource-client/src/error.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ pub enum Error {
1818
UnexpectedEof,
1919
/// Encountered a line not conforming to the SSE protocol.
2020
InvalidLine(String),
21-
/// Encountered an event type that is not a valid UTF-8 byte sequence.
22-
InvalidEventType(std::str::Utf8Error),
2321
InvalidEvent,
2422
/// An unexpected failure occurred.
2523
Unexpected(Box<dyn std::error::Error + Send + 'static>),

0 commit comments

Comments
 (0)