Skip to content

Commit 9c3f5a6

Browse files
authored
Add read timeout via TimeoutConnector (#18)
This commit doesn't update the state machine to handle read timeouts. It only adds the new API for users to set a read timeout.
1 parent 2109a90 commit 9c3f5a6

File tree

1 file changed

+31
-4
lines changed

1 file changed

+31
-4
lines changed

eventsource-client/src/client.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
use futures::{ready, Stream};
22
use hyper::{
33
body::{Bytes, HttpBody},
4-
client::{connect::Connect, ResponseFuture},
4+
client::{
5+
connect::{Connect, Connection},
6+
ResponseFuture,
7+
},
58
header::{HeaderMap, HeaderName, HeaderValue},
9+
service::Service,
610
Body, Request, StatusCode, Uri,
711
};
812
#[cfg(feature = "rustls")]
@@ -19,18 +23,25 @@ use std::{
1923
task::{Context, Poll},
2024
time::Duration,
2125
};
22-
use tokio::time::Sleep;
26+
27+
use tokio::{
28+
io::{AsyncRead, AsyncWrite},
29+
time::Sleep,
30+
};
2331

2432
use super::config::ReconnectOptions;
2533
use super::decode::Decoded;
2634
use super::error::{Error, Result};
2735

2836
use crate::Event;
2937
pub use hyper::client::HttpConnector;
38+
use hyper_timeout::TimeoutConnector;
3039

3140
#[cfg(feature = "rustls")]
3241
pub type HttpsConnector = RustlsConnector<HttpConnector>;
3342

43+
type BoxError = Box<dyn std::error::Error + Send + Sync>;
44+
3445
/// Represents a [`Pin`]'d [`Send`] + [`Sync`] stream, returned by [`Client`]'s stream method.
3546
pub type BoxStream<T> = Pin<boxed::Box<dyn Stream<Item = T> + Send + Sync>>;
3647

@@ -51,6 +62,7 @@ pub struct ClientBuilder {
5162
url: Uri,
5263
headers: HeaderMap,
5364
reconnect_opts: ReconnectOptions,
65+
read_timeout: Option<Duration>,
5466
}
5567

5668
impl ClientBuilder {
@@ -63,6 +75,7 @@ impl ClientBuilder {
6375
url,
6476
headers: HeaderMap::new(),
6577
reconnect_opts: ReconnectOptions::default(),
78+
read_timeout: None,
6679
})
6780
}
6881

@@ -78,6 +91,12 @@ impl ClientBuilder {
7891
Ok(self)
7992
}
8093

94+
/// Set a read timeout for the underlying connection. There is no read timeout by default.
95+
pub fn read_timeout(mut self, read_timeout: Duration) -> ClientBuilder {
96+
self.read_timeout = Some(read_timeout);
97+
self
98+
}
99+
81100
/// Configure the client's reconnect behaviour according to the supplied
82101
/// [`ReconnectOptions`].
83102
///
@@ -90,10 +109,18 @@ impl ClientBuilder {
90109
/// Build with a specific client connector.
91110
pub fn build_with_conn<C>(self, conn: C) -> impl Client
92111
where
93-
C: Connect + Clone + Send + Sync + 'static,
112+
C: Service<Uri> + Clone + Send + Sync + 'static,
113+
C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
114+
C::Future: Send + 'static,
115+
C::Error: Into<BoxError>,
94116
{
117+
let mut connector = TimeoutConnector::new(conn);
118+
connector.set_read_timeout(self.read_timeout);
119+
120+
let client = hyper::Client::builder().build::<_, hyper::Body>(connector);
121+
95122
ClientImpl {
96-
http: hyper::Client::builder().build(conn),
123+
http: client,
97124
request_props: RequestProps {
98125
url: self.url,
99126
headers: self.headers,

0 commit comments

Comments
 (0)