Skip to content

Commit 2109a90

Browse files
authored
Refactor Client to a trait (#17)
* Refactor Client to be a trait rather than a parameterized struct This should allow us to change client's implementation details without it being a breaking change for downstream users. Currently, the Client trait only requires a single stream() method.
1 parent 442a418 commit 2109a90

File tree

6 files changed

+115
-97
lines changed

6 files changed

+115
-97
lines changed

contract-tests/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ actix = { version = "0.12.0"}
1313
actix-web = { version = "4.0.0-beta.10"}
1414
reqwest = { version = "0.11.6", default_features = false, features = ["json", "rustls-tls"] }
1515
env_logger = { version = "0.7.1" }
16-
hyper = { version = "0.14.4", features = ["client", "http1", "tcp"] }
16+
hyper = { version = "0.14.17", features = ["client", "http1", "tcp"] }
1717
log = "0.4.6"
1818

1919
[[bin]]

contract-tests/src/bin/sse-test-api/stream_entity.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@ use eventsource_client as es;
1010

1111
use crate::{Config, EventType};
1212

13-
type Connector = es::HttpsConnector;
14-
1513
pub(crate) struct Inner {
1614
callback_counter: Mutex<i32>,
1715
callback_url: String,
18-
client: es::Client<Connector>,
16+
client: Box<dyn es::Client>,
1917
}
2018

2119
impl Inner {
@@ -30,9 +28,8 @@ impl Inner {
3028
}
3129

3230
pub(crate) async fn start(&self) {
33-
let stream = self.client.stream();
31+
let mut stream = self.client.stream();
3432

35-
let mut stream = Box::pin(stream);
3633
let client = reqwest::Client::new();
3734

3835
loop {
@@ -89,8 +86,8 @@ impl Inner {
8986
true
9087
}
9188

92-
fn build_client(config: &Config) -> Result<es::Client<Connector>, String> {
93-
let mut client_builder = match es::Client::for_url(&config.stream_url) {
89+
fn build_client(config: &Config) -> Result<Box<dyn es::Client>, String> {
90+
let mut client_builder = match es::ClientBuilder::for_url(&config.stream_url) {
9491
Ok(cb) => cb,
9592
Err(e) => return Err(format!("Failed to create client builder {:?}", e)),
9693
};
@@ -110,7 +107,9 @@ impl Inner {
110107
}
111108
}
112109

113-
Ok(client_builder.reconnect(reconnect_options.build()).build())
110+
Ok(Box::new(
111+
client_builder.reconnect(reconnect_options.build()).build(),
112+
))
114113
}
115114
}
116115

eventsource-client/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ exclude = [
1313
]
1414

1515
[dependencies]
16-
futures = "0.3.12"
17-
hyper = { version = "0.14.4", features = ["client", "http1", "tcp"] }
16+
futures = "0.3.21"
17+
hyper = { version = "0.14.17", features = ["client", "http1", "tcp"] }
1818
hyper-rustls = { version = "0.22.1", optional = true }
1919
log = "0.4.6"
20-
pin-project = "1.0.5"
21-
tokio = { version = "1.2.0", features = ["time"] }
20+
pin-project = "1.0.10"
21+
tokio = { version = "1.17.0", features = ["time"] }
2222
hyper-timeout = "0.4.1"
2323

2424
[dev-dependencies]

eventsource-client/examples/tail.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use std::{env, process, str::from_utf8, time::Duration};
2-
1+
use es::Client;
32
use futures::{Stream, TryStreamExt};
3+
use std::{env, process, str::from_utf8, time::Duration};
44

55
use eventsource_client as es;
66

@@ -18,7 +18,7 @@ async fn main() -> Result<(), es::Error> {
1818
let url = &args[1];
1919
let auth_header = &args[2];
2020

21-
let client = es::Client::for_url(url)?
21+
let client = es::ClientBuilder::for_url(url)?
2222
.header("Authorization", auth_header)?
2323
.reconnect(
2424
es::ReconnectOptions::reconnect(true)
@@ -30,13 +30,14 @@ async fn main() -> Result<(), es::Error> {
3030
)
3131
.build();
3232

33-
let mut stream = Box::pin(tail_events(client));
33+
let mut stream = tail_events(client);
34+
3435
while let Ok(Some(_)) = stream.try_next().await {}
3536

3637
Ok(())
3738
}
3839

39-
fn tail_events(client: es::Client<es::HttpsConnector>) -> impl Stream<Item = Result<(), ()>> {
40+
fn tail_events(client: impl Client) -> impl Stream<Item = Result<(), ()>> {
4041
client
4142
.stream()
4243
.map_ok(|event| {

eventsource-client/src/client.rs

Lines changed: 94 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
1-
use std::{
2-
fmt::{self, Debug, Display, Formatter},
3-
future::Future,
4-
mem,
5-
pin::Pin,
6-
str::FromStr,
7-
task::{Context, Poll},
8-
time::Duration,
9-
};
10-
111
use futures::{ready, Stream};
122
use hyper::{
133
body::{Bytes, HttpBody},
@@ -19,28 +9,63 @@ use hyper::{
199
use hyper_rustls::HttpsConnector as RustlsConnector;
2010
use log::{debug, info, trace, warn};
2111
use pin_project::pin_project;
12+
use std::{
13+
boxed,
14+
fmt::{self, Debug, Display, Formatter},
15+
future::Future,
16+
mem,
17+
pin::Pin,
18+
str::FromStr,
19+
task::{Context, Poll},
20+
time::Duration,
21+
};
2222
use tokio::time::Sleep;
2323

2424
use super::config::ReconnectOptions;
2525
use super::decode::Decoded;
2626
use super::error::{Error, Result};
2727

28+
use crate::Event;
2829
pub use hyper::client::HttpConnector;
30+
2931
#[cfg(feature = "rustls")]
3032
pub type HttpsConnector = RustlsConnector<HttpConnector>;
3133

34+
/// Represents a [`Pin`]'d [`Send`] + [`Sync`] stream, returned by [`Client`]'s stream method.
35+
pub type BoxStream<T> = Pin<boxed::Box<dyn Stream<Item = T> + Send + Sync>>;
36+
37+
/// Client is the Server-Sent-Events interface.
38+
/// This trait is sealed and cannot be implemented for types outside this crate.
39+
pub trait Client: Send + Sync + private::Sealed {
40+
/// Returns a stream of [`Event`]s.
41+
fn stream(&self) -> BoxStream<Result<Event>>;
42+
}
43+
3244
/*
3345
* TODO remove debug output
3446
* TODO specify list of stati to not retry (e.g. 204)
3547
*/
3648

49+
/// ClientBuilder provides a series of builder methods to easily construct a [`Client`].
3750
pub struct ClientBuilder {
3851
url: Uri,
3952
headers: HeaderMap,
4053
reconnect_opts: ReconnectOptions,
4154
}
4255

4356
impl ClientBuilder {
57+
/// Create a builder for a given URL.
58+
pub fn for_url(url: &str) -> Result<ClientBuilder> {
59+
let url = url
60+
.parse()
61+
.map_err(|e| Error::InvalidParameter(Box::new(e)))?;
62+
Ok(ClientBuilder {
63+
url,
64+
headers: HeaderMap::new(),
65+
reconnect_opts: ReconnectOptions::default(),
66+
})
67+
}
68+
4469
/// Set a HTTP header on the SSE request.
4570
pub fn header(mut self, name: &str, value: &str) -> Result<ClientBuilder> {
4671
let name =
@@ -62,11 +87,12 @@ impl ClientBuilder {
6287
self
6388
}
6489

65-
pub fn build_with_conn<C>(self, conn: C) -> Client<C>
90+
/// Build with a specific client connector.
91+
pub fn build_with_conn<C>(self, conn: C) -> impl Client
6692
where
67-
C: Connect + Clone,
93+
C: Connect + Clone + Send + Sync + 'static,
6894
{
69-
Client {
95+
ClientImpl {
7096
http: hyper::Client::builder().build(conn),
7197
request_props: RequestProps {
7298
url: self.url,
@@ -76,18 +102,24 @@ impl ClientBuilder {
76102
}
77103
}
78104

79-
pub fn build_http(self) -> Client<HttpConnector> {
105+
/// Build with an HTTP client connector.
106+
pub fn build_http(self) -> impl Client {
80107
self.build_with_conn(HttpConnector::new())
81108
}
82109

83110
#[cfg(feature = "rustls")]
84-
pub fn build(self) -> Client<HttpsConnector> {
111+
/// Build with an HTTPS client connector, using the OS root certificate store.
112+
pub fn build(self) -> impl Client {
85113
let conn = HttpsConnector::with_native_roots();
86114
self.build_with_conn(conn)
87115
}
88116

89-
pub fn build_with_http_client<C>(self, http: hyper::Client<C>) -> Client<C> {
90-
Client {
117+
/// Build with the given [`hyper::client::Client`].
118+
pub fn build_with_http_client<C>(self, http: hyper::Client<C>) -> impl Client
119+
where
120+
C: Connect + Clone + Send + Sync + 'static,
121+
{
122+
ClientImpl {
91123
http,
92124
request_props: RequestProps {
93125
url: self.url,
@@ -105,62 +137,33 @@ struct RequestProps {
105137
reconnect_opts: ReconnectOptions,
106138
}
107139

108-
/// Client that connects to a server using the Server-Sent Events protocol
140+
/// A client implementation that connects to a server using the Server-Sent Events protocol
109141
/// and consumes the event stream indefinitely.
110-
pub struct Client<C> {
142+
/// Can be parameterized with different hyper Connectors, such as HTTP or HTTPS.
143+
struct ClientImpl<C> {
111144
http: hyper::Client<C>,
112145
request_props: RequestProps,
113146
}
114147

115-
impl Client<()> {
116-
/// Construct a new `Client` (via a [`ClientBuilder`]). This will not
117-
/// perform any network activity until [`.stream()`] is called.
118-
///
119-
/// [`ClientBuilder`]: struct.ClientBuilder.html
120-
/// [`.stream()`]: #method.stream
121-
pub fn for_url(url: &str) -> Result<ClientBuilder> {
122-
let url = url
123-
.parse()
124-
.map_err(|e| Error::InvalidParameter(Box::new(e)))?;
125-
Ok(ClientBuilder {
126-
url,
127-
headers: HeaderMap::new(),
128-
reconnect_opts: ReconnectOptions::default(),
129-
})
130-
}
131-
}
132-
133-
pub type EventStream<C> = Decoded<ReconnectingRequest<C>>;
134-
135-
impl<C> Client<C> {
148+
impl<C> Client for ClientImpl<C>
149+
where
150+
C: Connect + Clone + Send + Sync + 'static,
151+
{
136152
/// Connect to the server and begin consuming the stream. Produces a
137153
/// [`Stream`] of [`Event`](crate::Event)s wrapped in [`Result`].
138154
///
139155
/// Do not use the stream after it returned an error!
140156
///
141157
/// After the first successful connection, the stream will
142158
/// reconnect for retryable errors.
143-
pub fn stream(&self) -> EventStream<C>
144-
where
145-
C: Connect + Clone + Send + Sync + 'static,
146-
{
147-
Decoded::new(ReconnectingRequest::new(
159+
fn stream(&self) -> BoxStream<Result<Event>> {
160+
Box::pin(Decoded::new(ReconnectingRequest::new(
148161
self.http.clone(),
149162
self.request_props.clone(),
150-
))
163+
)))
151164
}
152165
}
153166

154-
#[must_use = "streams do nothing unless polled"]
155-
#[pin_project]
156-
pub struct ReconnectingRequest<C> {
157-
http: hyper::Client<C>,
158-
props: RequestProps,
159-
#[pin]
160-
state: State,
161-
next_reconnect_delay: Duration,
162-
}
163-
164167
#[allow(clippy::large_enum_variant)] // false positive
165168
#[pin_project(project = StateProj)]
166169
enum State {
@@ -192,6 +195,16 @@ impl Debug for State {
192195
}
193196
}
194197

198+
#[must_use = "streams do nothing unless polled"]
199+
#[pin_project]
200+
pub struct ReconnectingRequest<C> {
201+
http: hyper::Client<C>,
202+
props: RequestProps,
203+
#[pin]
204+
state: State,
205+
next_reconnect_delay: Duration,
206+
}
207+
195208
impl<C> ReconnectingRequest<C> {
196209
fn new(http: hyper::Client<C>, props: RequestProps) -> ReconnectingRequest<C> {
197210
let reconnect_delay = props.reconnect_opts.delay;
@@ -202,9 +215,7 @@ impl<C> ReconnectingRequest<C> {
202215
next_reconnect_delay: reconnect_delay,
203216
}
204217
}
205-
}
206218

207-
impl<C> ReconnectingRequest<C> {
208219
fn send_request(&self) -> Result<ResponseFuture>
209220
where
210221
C: Connect + Clone + Send + Sync + 'static,
@@ -235,24 +246,6 @@ impl<C> ReconnectingRequest<C> {
235246
}
236247
}
237248

238-
fn delay(dur: Duration, description: &str) -> Sleep {
239-
info!("Waiting {:?} before {}", dur, description);
240-
tokio::time::sleep(dur)
241-
}
242-
243-
#[derive(Debug)]
244-
struct StatusError {
245-
status: StatusCode,
246-
}
247-
248-
impl Display for StatusError {
249-
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
250-
write!(f, "Invalid status code: {}", self.status)
251-
}
252-
}
253-
254-
impl std::error::Error for StatusError {}
255-
256249
impl<C> Stream for ReconnectingRequest<C>
257250
where
258251
C: Connect + Clone + Send + Sync + 'static,
@@ -343,3 +336,28 @@ where
343336
}
344337
}
345338
}
339+
340+
fn delay(dur: Duration, description: &str) -> Sleep {
341+
info!("Waiting {:?} before {}", dur, description);
342+
tokio::time::sleep(dur)
343+
}
344+
345+
#[derive(Debug)]
346+
struct StatusError {
347+
status: StatusCode,
348+
}
349+
350+
impl Display for StatusError {
351+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
352+
write!(f, "Invalid status code: {}", self.status)
353+
}
354+
}
355+
356+
impl std::error::Error for StatusError {}
357+
358+
mod private {
359+
use crate::client::ClientImpl;
360+
361+
pub trait Sealed {}
362+
impl<C> Sealed for ClientImpl<C> {}
363+
}

0 commit comments

Comments
 (0)