Skip to content

Commit c77764a

Browse files
committed
added error handling for coordinator
1 parent da69616 commit c77764a

File tree

5 files changed

+376
-425
lines changed

5 files changed

+376
-425
lines changed

ui/frontend/websocketMiddleware.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,6 @@ export const websocketMiddleware =
8888

8989
socket.addEventListener('open', () => {
9090
store.dispatch(websocketConnected());
91-
// const params = new URLSearchParams(window.location.search);
92-
if (socket != null) {
93-
// if (params.has("pool")) {
94-
socket.send("use container pool");
95-
// } else {
96-
// socket.send("use no pool");
97-
// }
98-
}
9991

10092
wasConnected = true;
10193
});

ui/src/coordinator.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{collections::VecDeque, sync::{Arc, Mutex}};
22

3-
use crate::worker::{setup_worker, Container};
3+
use crate::worker::{spawn_worker, Container, self};
44

55
use lazy_static::lazy_static;
66

@@ -20,8 +20,12 @@ impl Coordinator {
2020
}
2121
}
2222

23-
pub fn allocate(&mut self) -> Container {
24-
self.free_containers.pop_front().unwrap_or_else(setup_worker)
23+
pub fn allocate(&mut self) -> Result<Container, worker::Error> {
24+
if let Some(container) = self.free_containers.pop_front() {
25+
Ok(container)
26+
} else {
27+
Ok(spawn_worker()?)
28+
}
2529
}
2630

2731
pub fn recycle(&mut self, container: Container) -> bool {

ui/src/server_axum/websocket.rs

Lines changed: 30 additions & 230 deletions
Original file line numberDiff line numberDiff line change
@@ -1,265 +1,65 @@
11
use crate::{
2+
coordinator::COORDINATOR,
23
metrics::{DURATION_WS, LIVE_WS},
3-
parse_channel, parse_crate_type, parse_edition, parse_mode,
4-
sandbox::{self, Sandbox},
5-
Error, ExecutionSnafu, Result, SandboxCreationSnafu, WebSocketTaskPanicSnafu, coordinator::{COORDINATOR, Coordinator},
64
};
75

86
use axum::extract::ws::{Message, WebSocket};
9-
use snafu::prelude::*;
10-
use std::{
11-
convert::{TryFrom, TryInto},
12-
time::Instant,
13-
};
14-
use tokio::{sync::mpsc, task::JoinSet};
15-
16-
#[derive(serde::Deserialize)]
17-
#[serde(tag = "type")]
18-
enum WSMessageRequest {
19-
#[serde(rename = "WS_EXECUTE_REQUEST")]
20-
WSExecuteRequest(WSExecuteRequest),
21-
}
22-
23-
#[derive(serde::Deserialize)]
24-
#[serde(rename_all = "camelCase")]
25-
struct WSExecuteRequest {
26-
channel: String,
27-
mode: String,
28-
edition: String,
29-
crate_type: String,
30-
tests: bool,
31-
code: String,
32-
backtrace: bool,
33-
extra: serde_json::Value,
34-
}
35-
36-
impl TryFrom<WSExecuteRequest> for (sandbox::ExecuteRequest, serde_json::Value) {
37-
type Error = Error;
38-
39-
fn try_from(value: WSExecuteRequest) -> Result<Self, Self::Error> {
40-
let WSExecuteRequest {
41-
channel,
42-
mode,
43-
edition,
44-
crate_type,
45-
tests,
46-
code,
47-
backtrace,
48-
extra,
49-
} = value;
50-
51-
let req = sandbox::ExecuteRequest {
52-
channel: parse_channel(&channel)?,
53-
mode: parse_mode(&mode)?,
54-
edition: parse_edition(&edition)?,
55-
crate_type: parse_crate_type(&crate_type)?,
56-
tests,
57-
backtrace,
58-
code,
59-
};
607

61-
Ok((req, extra))
62-
}
63-
}
64-
65-
#[derive(Debug, serde::Serialize)]
66-
#[serde(tag = "type")]
67-
enum WSMessageResponse {
68-
#[serde(rename = "WEBSOCKET_ERROR")]
69-
Error(WSError),
70-
#[serde(rename = "WS_EXECUTE_RESPONSE")]
71-
WSExecuteResponse(WSExecuteResponse),
72-
}
73-
74-
#[derive(Debug, serde::Serialize)]
75-
#[serde(rename_all = "camelCase")]
76-
struct WSError {
77-
error: String,
78-
}
79-
80-
#[derive(Debug, serde::Serialize)]
81-
#[serde(rename_all = "camelCase")]
82-
struct WSExecuteResponse {
83-
success: bool,
84-
stdout: String,
85-
stderr: String,
86-
extra: serde_json::Value,
87-
}
88-
89-
impl From<(sandbox::ExecuteResponse, serde_json::Value)> for WSExecuteResponse {
90-
fn from(value: (sandbox::ExecuteResponse, serde_json::Value)) -> Self {
91-
let sandbox::ExecuteResponse {
92-
success,
93-
stdout,
94-
stderr,
95-
} = value.0;
96-
let extra = value.1;
97-
98-
WSExecuteResponse {
99-
success,
100-
stdout,
101-
stderr,
102-
extra,
103-
}
104-
}
105-
}
8+
use std::time::Instant;
1069

10710
pub async fn handle(mut socket: WebSocket) {
10811
LIVE_WS.inc();
10912
let start = Instant::now();
11013

111-
let mut use_pool = false;
112-
if let Some(Ok(Message::Text(txt))) = socket.recv().await {
113-
dbg!(&txt);
114-
if txt == *"use container pool" {
115-
use_pool = true;
116-
}
117-
};
118-
if use_pool {
119-
// let mut coordinator = Coordinator::new();
120-
// Avoid task gets cancelled and make sure recycling happen.
121-
tokio::spawn(async move {
122-
let (ws_sender, mut ws_receiver) = COORDINATOR.lock().unwrap().allocate();
123-
// let (ws_sender, mut ws_receiver) = coordinator.allocate();
124-
loop {
125-
tokio::select! {
126-
client_msg = socket.recv() => {
127-
match client_msg {
128-
None => {
129-
// browser disconnected
130-
break;
131-
}
132-
Some(Ok(Message::Text(txt))) => ws_sender.send(txt).await.unwrap(),
133-
Some(Ok(_)) => {
134-
// unknown message type
135-
continue;
136-
}
137-
Some(Err(e)) => super::record_websocket_error(e.to_string()),
138-
}
139-
},
140-
worker_msg = ws_receiver.recv() => {
141-
if let Some(txt) = worker_msg {
142-
socket.send(Message::Text(txt)).await.unwrap();
143-
} else {
144-
// Worker message channel closed.
145-
// Maybe it panicked or got killed.
146-
// TODO: send a notification to user and clean the worker process.
147-
break;
148-
}
149-
}
150-
}
151-
}
152-
COORDINATOR.lock().unwrap().recycle((ws_sender, ws_receiver));
153-
// coordinator.recycle((ws_sender, ws_receiver));
154-
});
155-
} else {
156-
let (tx, mut rx) = mpsc::channel(3);
157-
let mut tasks = JoinSet::new();
158-
159-
// TODO: Implement some kind of timeout to shutdown running work?
160-
14+
let allocated = COORDINATOR
15+
.lock()
16+
.unwrap_or_else(|e| e.into_inner())
17+
.allocate();
18+
// TODO: notify the user that container allocation failed.
19+
if let Ok((ws_sender, mut ws_receiver)) = allocated {
16120
loop {
16221
tokio::select! {
163-
request = socket.recv() => {
164-
match request {
22+
client_msg = socket.recv() => {
23+
match client_msg {
16524
None => {
16625
// browser disconnected
16726
break;
16827
}
169-
Some(Ok(Message::Text(txt))) => handle_msg(txt, &tx, &mut tasks).await,
28+
Some(Ok(Message::Text(txt))) => {
29+
if ws_sender.send(txt).await.is_err() {
30+
// Worker exited.
31+
// TODO: reallocate a worker to this WebSocket.
32+
break;
33+
}
34+
}
17035
Some(Ok(_)) => {
17136
// unknown message type
17237
continue;
17338
}
17439
Some(Err(e)) => super::record_websocket_error(e.to_string()),
17540
}
17641
},
177-
resp = rx.recv() => {
178-
let resp = resp.expect("The rx should never close as we have a tx");
179-
let resp = resp.unwrap_or_else(error_to_response);
180-
let resp = response_to_message(resp);
181-
182-
if let Err(_) = socket.send(resp).await {
183-
// We can't send a response
184-
break;
185-
}
186-
},
187-
// We don't care if there are no running tasks
188-
Some(task) = tasks.join_next() => {
189-
let Err(error) = task else { continue };
190-
// The task was cancelled; no need to report
191-
let Ok(panic) = error.try_into_panic() else { continue };
192-
193-
let text = match panic.downcast::<String>() {
194-
Ok(text) => *text,
195-
Err(panic) => match panic.downcast::<&str>() {
196-
Ok(text) => text.to_string(),
197-
_ => "An unknown panic occurred".into(),
42+
worker_msg = ws_receiver.recv() => {
43+
if let Some(txt) = worker_msg {
44+
if (socket.send(Message::Text(txt)).await).is_err() {
45+
// We can't send a message to user.
46+
break;
19847
}
199-
};
200-
let error = WebSocketTaskPanicSnafu { text }.build();
201-
202-
let resp = error_to_response(error);
203-
let resp = response_to_message(resp);
204-
205-
if let Err(_) = socket.send(resp).await {
206-
// We can't send a response
48+
} else {
49+
// Worker exited.
50+
// TODO: reallocate a worker to this WebSocket.
20751
break;
20852
}
209-
},
53+
}
21054
}
21155
}
212-
213-
drop((tx, rx, socket));
214-
tasks.shutdown().await;
56+
COORDINATOR
57+
.lock()
58+
.unwrap_or_else(|e| e.into_inner())
59+
.recycle((ws_sender, ws_receiver));
21560
}
21661

21762
LIVE_WS.dec();
21863
let elapsed = start.elapsed();
21964
DURATION_WS.observe(elapsed.as_secs_f64());
22065
}
221-
222-
fn error_to_response(error: Error) -> WSMessageResponse {
223-
let error = error.to_string();
224-
WSMessageResponse::Error(WSError { error })
225-
}
226-
227-
fn response_to_message(response: WSMessageResponse) -> Message {
228-
const LAST_CHANCE_ERROR: &str =
229-
r#"{ "type": "WEBSOCKET_ERROR", "error": "Unable to serialize JSON" }"#;
230-
let resp = serde_json::to_string(&response).unwrap_or_else(|_| LAST_CHANCE_ERROR.into());
231-
Message::Text(resp)
232-
}
233-
234-
async fn handle_msg(
235-
txt: String,
236-
tx: &mpsc::Sender<Result<WSMessageResponse>>,
237-
tasks: &mut JoinSet<Result<()>>,
238-
) {
239-
use WSMessageRequest::*;
240-
241-
let msg = serde_json::from_str(&txt).context(crate::DeserializationSnafu);
242-
243-
match msg {
244-
Ok(WSExecuteRequest(req)) => {
245-
let tx = tx.clone();
246-
tasks.spawn(async move {
247-
let resp = handle_execute(req).await;
248-
tx.send(resp).await.ok(/* We don't care if the channel is closed */);
249-
Ok(())
250-
});
251-
}
252-
Err(e) => {
253-
let resp = Err(e);
254-
tx.send(resp).await.ok(/* We don't care if the channel is closed */);
255-
}
256-
}
257-
}
258-
259-
async fn handle_execute(req: WSExecuteRequest) -> Result<WSMessageResponse> {
260-
let sb = Sandbox::new().await.context(SandboxCreationSnafu)?;
261-
262-
let (req, extra) = req.try_into()?;
263-
let resp = sb.execute(&req).await.context(ExecutionSnafu)?;
264-
Ok(WSMessageResponse::WSExecuteResponse((resp, extra).into()))
265-
}

0 commit comments

Comments
 (0)