Skip to content

Commit f35061a

Browse files
committed
Refactor to make compatible with existing client
1 parent f3cbcc9 commit f35061a

File tree

6 files changed

+54
-52
lines changed

6 files changed

+54
-52
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ actix = { version = "0.12.0", optional = true }
2525
actix-web = { version = "4.0.0-beta.10", optional = true }
2626
reqwest = { version = "0.11.6", features = ["json"], optional = true }
2727
env_logger = { version = "0.7.1", optional = true }
28+
hyper-timeout = "0.4.1"
2829

2930
[dev-dependencies]
3031
env_logger = "0.7.1"

examples/tail.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ async fn main() -> Result<(), es::Error> {
3636
Ok(())
3737
}
3838

39-
fn tail_events(client: es::Client<es::HttpsConnector>) -> impl Stream<Item = Result<(), ()>> {
39+
fn tail_events(
40+
client: es::Client<es::TimeoutConnector<es::HttpsConnector>>,
41+
) -> impl Stream<Item = Result<(), ()>> {
4042
client
4143
.stream()
4244
.map_ok(|event| {

src/bin/sse-test-api/main.rs

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ struct Config {
2121
stream_url: String,
2222
/// The URL of a callback endpoint created by the test harness .
2323
callback_url: String,
24-
/// A string describing the current test, if desired for logging.
25-
tag: Option<String>,
2624
/// An optional integer specifying the initial reconnection delay parameter, in
2725
/// milliseconds. Not all SSE client implementations allow this to be configured, but the
2826
/// test harness will send a value anyway in an attempt to avoid having reconnection tests
@@ -31,47 +29,30 @@ struct Config {
3129
/// An optional integer specifying the read timeout for the connection, in
3230
/// milliseconds.
3331
read_timeout_ms: Option<u64>,
34-
/// An optional string which should be sent as the Last-Event-Id header in the initial
35-
/// HTTP request. The test harness will only set this property if the test service has the
36-
/// "last-event-id" capability.
37-
last_event_id: Option<String>,
3832
/// A JSON object containing additional HTTP header names and string values. The SSE
3933
/// client should be configured to add these headers to its HTTP requests; the test harness
4034
/// will then verify that it receives those headers. The test harness will only set this
4135
/// property if the test service has the "headers" capability. Header names can be assumed
4236
/// to all be lowercase.
4337
headers: Option<HashMap<String, String>>,
44-
/// A string specifying an HTTP method to use instead of GET. The test harness will only
45-
/// set this property if the test service has the "post" or "report" capability.
46-
method: Option<String>,
47-
/// A string specifying data to be sent in the HTTP request body. The test harness will
48-
/// only set this property if the test service has the "post" or "report" capability.
49-
body: Option<String>,
5038
}
5139

5240
#[derive(Serialize, Debug)]
5341
#[serde(tag = "kind")]
5442
enum EventType {
5543
#[serde(rename = "event")]
5644
Event { event: Event },
57-
#[serde(rename = "comment")]
58-
Comment { comment: String },
5945
#[serde(rename = "error")]
6046
Error { error: String },
6147
}
6248

63-
impl From<es::SSE> for EventType {
64-
fn from(event: es::SSE) -> Self {
65-
match event {
66-
es::SSE::Event(evt) => Self::Event {
67-
event: Event {
68-
event_type: evt.event_type,
69-
data: String::from_utf8(evt.data.to_vec()).unwrap(),
70-
id: String::from_utf8(evt.id.to_vec()).unwrap(),
71-
},
72-
},
73-
es::SSE::Comment(comment) => Self::Comment {
74-
comment: String::from_utf8(comment).unwrap(),
49+
impl From<es::Event> for EventType {
50+
fn from(event: es::Event) -> Self {
51+
Self::Event {
52+
event: Event {
53+
event_type: event.event_type.clone(),
54+
data: String::from_utf8(event.field("data").unwrap_or_default().to_vec()).unwrap(),
55+
id: String::from_utf8(event.field("id").unwrap_or_default().to_vec()).unwrap(),
7556
},
7657
}
7758
}

src/bin/sse-test-api/stream_entity.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,18 +105,6 @@ impl Inner {
105105
client_builder = client_builder.read_timeout(Duration::from_millis(read_timeout_ms));
106106
}
107107

108-
if let Some(last_event_id) = &config.last_event_id {
109-
client_builder = client_builder.last_event_id(last_event_id.clone());
110-
}
111-
112-
if let Some(method) = &config.method {
113-
client_builder = client_builder.method(method.to_string());
114-
}
115-
116-
if let Some(body) = &config.body {
117-
client_builder = client_builder.body(body.to_string());
118-
}
119-
120108
if let Some(headers) = &config.headers {
121109
for (name, value) in headers {
122110
client_builder = match client_builder.header(name, value) {

src/client.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,41 @@ use std::{
33
future::Future,
44
mem,
55
pin::Pin,
6+
str::FromStr,
67
task::{Context, Poll},
78
time::Duration,
89
};
910

1011
use futures::{ready, Stream};
1112
use hyper::{
1213
body::{Bytes, HttpBody},
13-
client::{connect::Connect, ResponseFuture},
14-
header::HeaderMap,
14+
client::{
15+
connect::{Connect, Connection},
16+
ResponseFuture,
17+
},
18+
header::{HeaderMap, HeaderName, HeaderValue},
19+
service::Service,
1520
Body, Request, StatusCode, Uri,
1621
};
1722
#[cfg(feature = "rustls")]
1823
use hyper_rustls::HttpsConnector as RustlsConnector;
1924
use log::{debug, info, trace, warn};
2025
use pin_project::pin_project;
21-
use tokio::time::Sleep;
26+
27+
use tokio::{
28+
io::{AsyncRead, AsyncWrite},
29+
time::Sleep,
30+
};
2231

2332
use super::config::ReconnectOptions;
2433
use super::decode::Decoded;
2534
use super::error::{Error, Result};
2635

2736
pub use hyper::client::HttpConnector;
37+
2838
#[cfg(feature = "rustls")]
2939
pub type HttpsConnector = RustlsConnector<HttpConnector>;
40+
pub use hyper_timeout::TimeoutConnector;
3041

3142
/*
3243
* TODO remove debug output
@@ -37,18 +48,28 @@ pub struct ClientBuilder {
3748
url: Uri,
3849
headers: HeaderMap,
3950
reconnect_opts: ReconnectOptions,
51+
read_timeout: Option<Duration>,
4052
}
4153

4254
impl ClientBuilder {
4355
/// Set a HTTP header on the SSE request.
44-
pub fn header(mut self, key: &'static str, value: &str) -> Result<ClientBuilder> {
45-
let value = value
46-
.parse()
47-
.map_err(|e| Error::InvalidParameter(Box::new(e)))?;
48-
self.headers.insert(key, value);
56+
pub fn header(mut self, name: &str, value: &str) -> Result<ClientBuilder> {
57+
let name =
58+
HeaderName::from_str(name).map_err(|_| Error::HttpRequest(StatusCode::BAD_REQUEST))?;
59+
60+
let value = HeaderValue::from_str(value)
61+
.map_err(|_| Error::HttpRequest(StatusCode::BAD_REQUEST))?;
62+
63+
self.headers.insert(name, value);
4964
Ok(self)
5065
}
5166

67+
/// Set a read timeout for the underlying connection. There is no read timeout by default.
68+
pub fn read_timeout(mut self, read_timeout: Duration) -> ClientBuilder {
69+
self.read_timeout = Some(read_timeout);
70+
self
71+
}
72+
5273
/// Configure the client's reconnect behaviour according to the supplied
5374
/// [`ReconnectOptions`].
5475
///
@@ -58,12 +79,19 @@ impl ClientBuilder {
5879
self
5980
}
6081

61-
pub fn build_with_conn<C>(self, conn: C) -> Client<C>
82+
pub fn build_with_conn<C>(self, conn: C) -> Client<TimeoutConnector<C>>
6283
where
63-
C: Connect + Clone,
84+
C: Service<Uri> + Send + 'static + std::clone::Clone,
85+
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
86+
C::Future: Unpin + Send,
87+
C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send,
6488
{
89+
let mut connector = TimeoutConnector::new(conn);
90+
connector.set_read_timeout(self.read_timeout);
91+
let client = hyper::Client::builder().build::<_, hyper::Body>(connector);
92+
6593
Client {
66-
http: hyper::Client::builder().build(conn),
94+
http: client,
6795
request_props: RequestProps {
6896
url: self.url,
6997
headers: self.headers,
@@ -72,12 +100,12 @@ impl ClientBuilder {
72100
}
73101
}
74102

75-
pub fn build_http(self) -> Client<HttpConnector> {
103+
pub fn build_http(self) -> Client<TimeoutConnector<HttpConnector>> {
76104
self.build_with_conn(HttpConnector::new())
77105
}
78106

79107
#[cfg(feature = "rustls")]
80-
pub fn build(self) -> Client<HttpsConnector> {
108+
pub fn build(self) -> Client<TimeoutConnector<HttpsConnector>> {
81109
let conn = HttpsConnector::with_native_roots();
82110
self.build_with_conn(conn)
83111
}
@@ -122,6 +150,7 @@ impl Client<()> {
122150
url,
123151
headers: HeaderMap::new(),
124152
reconnect_opts: ReconnectOptions::default(),
153+
read_timeout: None,
125154
})
126155
}
127156
}

src/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use hyper::StatusCode;
33
/// Error type returned from this library's functions.
44
#[derive(Debug)]
55
pub enum Error {
6+
StreamClosed,
67
/// An invalid request parameter
78
InvalidParameter(Box<dyn std::error::Error + Send + 'static>),
89
/// The HTTP request failed.

0 commit comments

Comments
 (0)