Skip to content

Commit 1ea430d

Browse files
authored
Merge pull request #15 from launchdarkly/cw/sc-132539/test-harness
Implement minimal SSE test harness.
2 parents 1a62a96 + 941b7bb commit 1ea430d

22 files changed

+481
-87
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
**/*.rs.bk
33
Cargo.lock
44
.idea
5-
.DS_Store
5+
.DS_Store

.ldrelease/build.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
3+
set -ue
4+
5+
cd eventsource-client
6+
cargo package

.ldrelease/publish-dry-run.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
3+
set -ue
4+
5+
echo "DRY RUN: not publishing to crates.io, only copying crate"
6+
cp ./target/package/*.crate "${LD_RELEASE_ARTIFACTS_DIR}"

.ldrelease/publish.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/bin/bash
2+
3+
set -ue
4+
5+
export CARGO_REGISTRY_TOKEN="$(cat "${LD_RELEASE_SECRETS_DIR}/rust_cratesio_api_token")"
6+
7+
cd eventsource-client
8+
cargo publish

.ldrelease/update-version.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
2+
3+
set -ue
4+
5+
sed -i "/^version\b/c version = \"${LD_RELEASE_VERSION}\"" ./eventsource-client/Cargo.toml

Cargo.toml

Lines changed: 5 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,6 @@
1-
[package]
2-
name = "eventsource-client"
3-
version = "0.8.2"
4-
description = "Client for the Server-Sent Events protocol (aka EventSource)"
5-
repository = "https://github.com/launchdarkly/rust-eventsource-client"
6-
authors = ["LaunchDarkly"]
7-
edition = "2018"
8-
license = "Apache-2.0"
9-
keywords = ["launchdarkly", "feature-flags", "feature-toggles", "eventsource", "server-sent-events"]
10-
exclude = [
11-
".circleci",
12-
".ldrelease"
13-
]
14-
15-
[dependencies]
16-
futures = "0.3.12"
17-
hyper = { version = "0.14.4", features = ["client", "http1", "tcp"] }
18-
hyper-rustls = { version = "0.22.1", optional = true }
19-
log = "0.4.6"
20-
pin-project = "1.0.5"
21-
tokio = { version = "1.2.0", features = ["time"] }
22-
23-
[dev-dependencies]
24-
env_logger = "0.7.1"
25-
maplit = "1.0.1"
26-
simplelog = "0.5.3"
27-
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
1+
[workspace]
282

29-
[features]
30-
default = ["rustls"]
31-
rustls = ["hyper-rustls", "hyper/http2"]
32-
33-
[[example]]
34-
name = "tail"
35-
required-features = ["rustls"]
3+
members = [
4+
"contract-tests",
5+
"eventsource-client"
6+
]

README.md

Lines changed: 0 additions & 46 deletions
This file was deleted.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
eventsource-client/README.md

contract-tests/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "contract-tests"
3+
version = "0.1.0"
4+
edition = "2018"
5+
license = "Apache-2.0"
6+
7+
[dependencies]
8+
futures = { version = "0.3.21" }
9+
serde = { version = "1.0", features = ["derive"] }
10+
eventsource-client = { path = "../eventsource-client" }
11+
serde_json = { version = "1.0.39"}
12+
actix = { version = "0.12.0"}
13+
actix-web = { version = "4.0.0-beta.10"}
14+
reqwest = { version = "0.11.6", default_features = false, features = ["json", "rustls-tls"] }
15+
env_logger = { version = "0.7.1" }
16+
hyper = { version = "0.14.4", features = ["client", "http1", "tcp"] }
17+
log = "0.4.6"
18+
19+
[[bin]]
20+
name = "sse-test-api"
21+
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
mod stream_entity;
2+
3+
use actix_web::{guard, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
4+
use eventsource_client as es;
5+
use futures::executor;
6+
use serde::{self, Deserialize, Serialize};
7+
use std::collections::HashMap;
8+
use std::sync::{mpsc, Mutex};
9+
use std::thread;
10+
use stream_entity::StreamEntity;
11+
12+
#[derive(Serialize)]
13+
struct Status {
14+
capabilities: Vec<String>,
15+
}
16+
17+
#[derive(Deserialize, Debug)]
18+
#[serde(rename_all = "camelCase")]
19+
struct Config {
20+
/// The URL of an SSE endpoint created by the test harness.
21+
stream_url: String,
22+
/// The URL of a callback endpoint created by the test harness .
23+
callback_url: String,
24+
/// An optional integer specifying the initial reconnection delay parameter, in
25+
/// milliseconds. Not all SSE client implementations allow this to be configured, but the
26+
/// test harness will send a value anyway in an attempt to avoid having reconnection tests
27+
/// run unnecessarily slowly.
28+
initial_delay_ms: Option<u64>,
29+
/// A JSON object containing additional HTTP header names and string values. The SSE
30+
/// client should be configured to add these headers to its HTTP requests; the test harness
31+
/// will then verify that it receives those headers. The test harness will only set this
32+
/// property if the test service has the "headers" capability. Header names can be assumed
33+
/// to all be lowercase.
34+
headers: Option<HashMap<String, String>>,
35+
}
36+
37+
#[derive(Serialize, Debug)]
38+
#[serde(tag = "kind")]
39+
enum EventType {
40+
#[serde(rename = "event")]
41+
Event { event: Event },
42+
#[serde(rename = "error")]
43+
Error { error: String },
44+
}
45+
46+
impl From<es::Event> for EventType {
47+
fn from(event: es::Event) -> Self {
48+
Self::Event {
49+
event: Event {
50+
event_type: event.event_type.clone(),
51+
data: String::from_utf8(event.field("data").unwrap_or_default().to_vec()).unwrap(),
52+
id: String::from_utf8(event.field("id").unwrap_or_default().to_vec()).unwrap(),
53+
},
54+
}
55+
}
56+
}
57+
58+
#[derive(Serialize, Debug)]
59+
struct Event {
60+
#[serde(rename = "type")]
61+
event_type: String,
62+
data: String,
63+
id: String,
64+
}
65+
66+
async fn status() -> impl Responder {
67+
web::Json(Status {
68+
capabilities: vec![
69+
"comments".to_string(),
70+
"post".to_string(),
71+
"report".to_string(),
72+
"headers".to_string(),
73+
"last-event-id".to_string(),
74+
],
75+
})
76+
}
77+
78+
async fn stream(
79+
req: HttpRequest,
80+
config: web::Json<Config>,
81+
app_state: web::Data<AppState>,
82+
) -> HttpResponse {
83+
let mut stream_entity = match StreamEntity::new(config.into_inner()) {
84+
Ok(se) => se,
85+
Err(e) => return HttpResponse::InternalServerError().body(e),
86+
};
87+
88+
let mut counter = match app_state.counter.lock() {
89+
Ok(c) => c,
90+
Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve counter"),
91+
};
92+
93+
let mut entities = match app_state.stream_entities.lock() {
94+
Ok(h) => h,
95+
Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve handles"),
96+
};
97+
98+
let stream_resource = match req.url_for("stop_stream", &[counter.to_string()]) {
99+
Ok(sr) => sr,
100+
Err(_) => {
101+
return HttpResponse::InternalServerError()
102+
.body("Unable to generate stream response URL")
103+
}
104+
};
105+
106+
*counter += 1;
107+
stream_entity.start();
108+
entities.insert(*counter, stream_entity);
109+
110+
let mut response = HttpResponse::Ok();
111+
response.insert_header(("Location", stream_resource.to_string()));
112+
response.finish()
113+
}
114+
115+
async fn shutdown(stopper: web::Data<mpsc::Sender<()>>) -> HttpResponse {
116+
match stopper.send(()) {
117+
Ok(_) => HttpResponse::NoContent().finish(),
118+
Err(_) => HttpResponse::InternalServerError().body("Unable to send shutdown signal"),
119+
}
120+
}
121+
122+
async fn stop_stream(req: HttpRequest, app_state: web::Data<AppState>) -> HttpResponse {
123+
if let Some(stream_id) = req.match_info().get("id") {
124+
let stream_id: u32 = match stream_id.parse() {
125+
Ok(id) => id,
126+
Err(_) => return HttpResponse::BadRequest().body("Unable to parse stream id"),
127+
};
128+
129+
match app_state.stream_entities.lock() {
130+
Ok(mut entities) => {
131+
if let Some(mut entity) = entities.remove(&stream_id) {
132+
entity.stop();
133+
}
134+
}
135+
Err(_) => {
136+
return HttpResponse::InternalServerError().body("Unable to retrieve handles")
137+
}
138+
};
139+
140+
HttpResponse::NoContent().finish()
141+
} else {
142+
HttpResponse::BadRequest().body("No stream id was provided in the URL")
143+
}
144+
}
145+
146+
struct AppState {
147+
counter: Mutex<u32>,
148+
stream_entities: Mutex<HashMap<u32, StreamEntity>>,
149+
}
150+
151+
#[actix_web::main]
152+
async fn main() -> std::io::Result<()> {
153+
env_logger::init();
154+
155+
let (tx, rx) = mpsc::channel::<()>();
156+
157+
let state = web::Data::new(AppState {
158+
counter: Mutex::new(0),
159+
stream_entities: Mutex::new(HashMap::new()),
160+
});
161+
162+
let server = HttpServer::new(move || {
163+
App::new()
164+
.app_data(web::Data::new(tx.clone()))
165+
.app_data(state.clone())
166+
.route("/", web::get().to(status))
167+
.route("/", web::post().to(stream))
168+
.route("/", web::delete().to(shutdown))
169+
.service(
170+
web::resource("/stream/{id}")
171+
.name("stop_stream")
172+
.guard(guard::Delete())
173+
.to(stop_stream),
174+
)
175+
})
176+
.bind("127.0.0.1:8080")?
177+
.run();
178+
179+
let handle = server.handle();
180+
181+
thread::spawn(move || {
182+
// wait for shutdown signal
183+
if let Ok(()) = rx.recv() {
184+
executor::block_on(handle.stop(true))
185+
}
186+
});
187+
188+
// run server
189+
server.await
190+
}

0 commit comments

Comments
 (0)