Skip to content

Commit f3cbcc9

Browse files
committed
Copy web server to branch & add Cargo dependencies
1 parent 1a62a96 commit f3cbcc9

File tree

3 files changed

+387
-0
lines changed

3 files changed

+387
-0
lines changed

Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ hyper-rustls = { version = "0.22.1", optional = true }
1919
log = "0.4.6"
2020
pin-project = "1.0.5"
2121
tokio = { version = "1.2.0", features = ["time"] }
22+
serde = { version = "1.0", features = ["derive"], optional = true }
23+
serde_json = { version = "1.0.39", optional = true }
24+
actix = { version = "0.12.0", optional = true }
25+
actix-web = { version = "4.0.0-beta.10", optional = true }
26+
reqwest = { version = "0.11.6", features = ["json"], optional = true }
27+
env_logger = { version = "0.7.1", optional = true }
2228

2329
[dev-dependencies]
2430
env_logger = "0.7.1"
@@ -29,6 +35,12 @@ tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
2935
[features]
3036
default = ["rustls"]
3137
rustls = ["hyper-rustls", "hyper/http2"]
38+
build-binary = ["serde", "serde_json", "actix", "actix-web", "reqwest", "env_logger"]
39+
40+
41+
[[bin]]
42+
name = "sse-test-api"
43+
required-features = ["build-binary", "rustls"]
3244

3345
[[example]]
3446
name = "tail"

src/bin/sse-test-api/main.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
mod stream_entity;
2+
3+
use actix_web::{guard, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
4+
use eventsource_client as es;
5+
use futures::executor;
6+
use serde::{self, Deserialize, Serialize};
7+
use std::collections::HashMap;
8+
use std::sync::{mpsc, Mutex};
9+
use std::thread;
10+
use stream_entity::StreamEntity;
11+
12+
#[derive(Serialize)]
13+
struct Status {
14+
capabilities: Vec<String>,
15+
}
16+
17+
#[derive(Deserialize, Debug)]
18+
#[serde(rename_all = "camelCase")]
19+
struct Config {
20+
/// The URL of an SSE endpoint created by the test harness.
21+
stream_url: String,
22+
/// The URL of a callback endpoint created by the test harness .
23+
callback_url: String,
24+
/// A string describing the current test, if desired for logging.
25+
tag: Option<String>,
26+
/// An optional integer specifying the initial reconnection delay parameter, in
27+
/// milliseconds. Not all SSE client implementations allow this to be configured, but the
28+
/// test harness will send a value anyway in an attempt to avoid having reconnection tests
29+
/// run unnecessarily slowly.
30+
initial_delay_ms: Option<u64>,
31+
/// An optional integer specifying the read timeout for the connection, in
32+
/// milliseconds.
33+
read_timeout_ms: Option<u64>,
34+
/// An optional string which should be sent as the Last-Event-Id header in the initial
35+
/// HTTP request. The test harness will only set this property if the test service has the
36+
/// "last-event-id" capability.
37+
last_event_id: Option<String>,
38+
/// A JSON object containing additional HTTP header names and string values. The SSE
39+
/// client should be configured to add these headers to its HTTP requests; the test harness
40+
/// will then verify that it receives those headers. The test harness will only set this
41+
/// property if the test service has the "headers" capability. Header names can be assumed
42+
/// to all be lowercase.
43+
headers: Option<HashMap<String, String>>,
44+
/// A string specifying an HTTP method to use instead of GET. The test harness will only
45+
/// set this property if the test service has the "post" or "report" capability.
46+
method: Option<String>,
47+
/// A string specifying data to be sent in the HTTP request body. The test harness will
48+
/// only set this property if the test service has the "post" or "report" capability.
49+
body: Option<String>,
50+
}
51+
52+
#[derive(Serialize, Debug)]
53+
#[serde(tag = "kind")]
54+
enum EventType {
55+
#[serde(rename = "event")]
56+
Event { event: Event },
57+
#[serde(rename = "comment")]
58+
Comment { comment: String },
59+
#[serde(rename = "error")]
60+
Error { error: String },
61+
}
62+
63+
impl From<es::SSE> for EventType {
64+
fn from(event: es::SSE) -> Self {
65+
match event {
66+
es::SSE::Event(evt) => Self::Event {
67+
event: Event {
68+
event_type: evt.event_type,
69+
data: String::from_utf8(evt.data.to_vec()).unwrap(),
70+
id: String::from_utf8(evt.id.to_vec()).unwrap(),
71+
},
72+
},
73+
es::SSE::Comment(comment) => Self::Comment {
74+
comment: String::from_utf8(comment).unwrap(),
75+
},
76+
}
77+
}
78+
}
79+
80+
#[derive(Serialize, Debug)]
81+
struct Event {
82+
#[serde(rename = "type")]
83+
event_type: String,
84+
data: String,
85+
id: String,
86+
}
87+
88+
async fn status() -> impl Responder {
89+
web::Json(Status {
90+
capabilities: vec![
91+
"comments".to_string(),
92+
"retry".to_string(),
93+
"post".to_string(),
94+
"report".to_string(),
95+
"read-timeout".to_string(),
96+
"headers".to_string(),
97+
"last-event-id".to_string(),
98+
],
99+
})
100+
}
101+
102+
async fn stream(
103+
req: HttpRequest,
104+
config: web::Json<Config>,
105+
app_state: web::Data<AppState>,
106+
) -> HttpResponse {
107+
let mut stream_entity = match StreamEntity::new(config.into_inner()) {
108+
Ok(se) => se,
109+
Err(e) => return HttpResponse::InternalServerError().body(e),
110+
};
111+
112+
let mut counter = match app_state.counter.lock() {
113+
Ok(c) => c,
114+
Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve counter"),
115+
};
116+
117+
let mut entities = match app_state.stream_entities.lock() {
118+
Ok(h) => h,
119+
Err(_) => return HttpResponse::InternalServerError().body("Unable to retrieve handles"),
120+
};
121+
122+
*counter += 1;
123+
stream_entity.start();
124+
entities.insert(*counter, stream_entity);
125+
126+
let stream_resource = match req.url_for("stop_stream", &[counter.to_string()]) {
127+
Ok(sr) => sr,
128+
Err(_) => {
129+
return HttpResponse::InternalServerError()
130+
.body("Unable to generate stream response URL")
131+
}
132+
};
133+
134+
let mut response = HttpResponse::Ok();
135+
response.insert_header(("Location", stream_resource.to_string()));
136+
response.finish()
137+
}
138+
139+
async fn shutdown(stopper: web::Data<mpsc::Sender<()>>) -> HttpResponse {
140+
match stopper.send(()) {
141+
Ok(_) => HttpResponse::NoContent().finish(),
142+
Err(_) => HttpResponse::InternalServerError().body("Unable to send shutdown signal"),
143+
}
144+
}
145+
146+
async fn stop_stream(req: HttpRequest, app_state: web::Data<AppState>) -> HttpResponse {
147+
if let Some(stream_id) = req.match_info().get("id") {
148+
let stream_id: u32 = match stream_id.parse() {
149+
Ok(id) => id,
150+
Err(_) => return HttpResponse::BadRequest().body("Unable to parse stream id"),
151+
};
152+
153+
match app_state.stream_entities.lock() {
154+
Ok(mut entities) => {
155+
if let Some(mut entity) = entities.remove(&stream_id) {
156+
entity.stop();
157+
}
158+
}
159+
Err(_) => {
160+
return HttpResponse::InternalServerError().body("Unable to retrieve handles")
161+
}
162+
};
163+
164+
HttpResponse::NoContent().finish()
165+
} else {
166+
HttpResponse::BadRequest().body("No stream id was provided in the URL")
167+
}
168+
}
169+
170+
struct AppState {
171+
counter: Mutex<u32>,
172+
stream_entities: Mutex<HashMap<u32, StreamEntity>>,
173+
}
174+
175+
#[actix_web::main]
176+
async fn main() -> std::io::Result<()> {
177+
env_logger::init();
178+
179+
let (tx, rx) = mpsc::channel::<()>();
180+
181+
let state = web::Data::new(AppState {
182+
counter: Mutex::new(0),
183+
stream_entities: Mutex::new(HashMap::new()),
184+
});
185+
186+
let server = HttpServer::new(move || {
187+
App::new()
188+
.app_data(web::Data::new(tx.clone()))
189+
.app_data(state.clone())
190+
.route("/", web::get().to(status))
191+
.route("/", web::post().to(stream))
192+
.route("/", web::delete().to(shutdown))
193+
.service(
194+
web::resource("/stream/{id}")
195+
.name("stop_stream")
196+
.guard(guard::Delete())
197+
.to(stop_stream),
198+
)
199+
})
200+
.bind("127.0.0.1:8080")?
201+
.run();
202+
203+
let handle = server.handle();
204+
205+
thread::spawn(move || {
206+
// wait for shutdown signal
207+
if let Ok(()) = rx.recv() {
208+
executor::block_on(handle.stop(true))
209+
}
210+
});
211+
212+
// run server
213+
server.await
214+
}

0 commit comments

Comments
 (0)