Skip to content

Commit 91d3507

Browse files
committed
passed coordinator tests with Docker
1 parent d85b973 commit 91d3507

File tree

3 files changed

+21
-21
lines changed

3 files changed

+21
-21
lines changed

ui/src/coordinator.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,29 @@ use crate::{parse_channel, parse_crate_type, parse_edition, parse_mode, parse_ta
1111

1212
use worker_message::{CoordinatorMessage, Job, JobReport, WorkerMessage};
1313

14+
use lazy_static::lazy_static;
1415
use snafu::prelude::*;
1516
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
1617
use tokio::process::{ChildStdin, ChildStdout, Command};
1718
use tokio::sync::{mpsc, oneshot};
1819
use tokio::task::{JoinHandle, JoinSet};
19-
use lazy_static::lazy_static;
2020

2121
pub type Container = (mpsc::Sender<String>, mpsc::Receiver<String>);
2222

2323
lazy_static! {
24-
pub(crate) static ref COORDINATOR: Arc<Mutex<Coordinator>> = Arc::new(Mutex::new(Coordinator::new()));
24+
pub(crate) static ref COORDINATOR: Arc<Mutex<Coordinator>> =
25+
Arc::new(Mutex::new(Coordinator::new()));
2526
}
2627

2728
#[derive(Debug)]
2829
pub struct Coordinator {
29-
free_containers: VecDeque<Container>
30+
free_containers: VecDeque<Container>,
3031
}
3132

3233
impl Coordinator {
3334
pub fn new() -> Self {
3435
Coordinator {
35-
free_containers: VecDeque::new()
36+
free_containers: VecDeque::new(),
3637
}
3738
}
3839

@@ -47,7 +48,7 @@ impl Coordinator {
4748
pub fn recycle(&mut self, container: Container) -> bool {
4849
// Seems not rigorous.
4950
let closed = container.0.is_closed();
50-
if !closed {
51+
if !closed {
5152
self.free_containers.push_back(container);
5253
}
5354

@@ -56,7 +57,6 @@ impl Coordinator {
5657
}
5758
}
5859

59-
6060
#[derive(Debug, Snafu)]
6161
pub enum Error {
6262
#[snafu(display("Reached system process limit"))]
@@ -690,16 +690,16 @@ fn basic_secure_docker_command() -> Command {
690690
)
691691
}
692692

693-
694693
fn run_worker_in_background() -> Result<(ChildStdin, ChildStdout)> {
695694
// For local development.
696-
// const WORKER_FILEPATH: &str = "../worker-message/target/debug/worker";
695+
// const WORKER_FILEPATH: &str = "../worker-message/target/release/worker";
697696

698697
// No need to track Child.
699698
// We know whether it exits via channels.
700699
// let mut child = Command::new(WORKER_FILEPATH)
701700
let mut child = basic_secure_docker_command()
702701
.arg("-i")
702+
.args(["-a", "stdin", "-a", "stdout", "-a", "stderr"])
703703
.arg("adwinw/rust-playground-worker")
704704
.stdin(Stdio::piped())
705705
.stdout(Stdio::piped())
@@ -836,7 +836,7 @@ async fn pair_websocket_worker(
836836
WorkerMessage::StdoutPacket(packet) => {
837837
let Ok(_) = ws_sender.send(
838838
serde_json::to_string(&PlaygroundMessage::Stdout {packet}).context(PlaygroundMessageSerializationSnafu)?).await
839-
.context(UnableToSendPlaygroundMessageToWebSocketSnafu)
839+
.context(UnableToSendPlaygroundMessageToWebSocketSnafu)
840840
else {
841841
// TODO: log the failure.
842842
continue;
@@ -845,7 +845,7 @@ async fn pair_websocket_worker(
845845
WorkerMessage::StderrPacket(packet) => {
846846
let Ok(_) = ws_sender.send(
847847
serde_json::to_string(&PlaygroundMessage::Stderr {packet}).context(PlaygroundMessageSerializationSnafu)?).await
848-
.context(UnableToSendPlaygroundMessageToWebSocketSnafu)
848+
.context(UnableToSendPlaygroundMessageToWebSocketSnafu)
849849
else {
850850
// TODO: log the failure.
851851
continue;
@@ -854,7 +854,7 @@ async fn pair_websocket_worker(
854854
WorkerMessage::StreamFailure => {
855855
let Ok(_) = ws_sender.send(
856856
serde_json::to_string(&PlaygroundMessage::StreamFailure).context(PlaygroundMessageSerializationSnafu)?).await
857-
.context(UnableToSendPlaygroundMessageToWebSocketSnafu)
857+
.context(UnableToSendPlaygroundMessageToWebSocketSnafu)
858858
else {
859859
// TODO: log the failure.
860860
continue;
@@ -896,12 +896,11 @@ pub fn spawn_worker() -> Result<Container> {
896896
#[cfg(test)]
897897
mod tests {
898898

899-
use crate::coordinator::{PlaygroundMessage, WSMessage, WSRequest, WSResponse, spawn_worker};
899+
use crate::coordinator::{spawn_worker, PlaygroundMessage, WSMessage, WSRequest, WSResponse};
900900
use std::sync::atomic::AtomicU64;
901901
use std::sync::Arc;
902902
use std::time::Duration;
903903

904-
905904
use serde_json::json;
906905
use tokio::time::error::Elapsed;
907906

@@ -1023,7 +1022,7 @@ mod tests {
10231022
let mut client = MockClient::new();
10241023
let compile_resp = check_websocket_request_response(
10251024
WSRequest::Compile(client.new_compile_request()),
1026-
5000,
1025+
15000,
10271026
)
10281027
.await
10291028
.expect("Failed to receive response from coordinator in time");
@@ -1038,7 +1037,7 @@ mod tests {
10381037
async fn format_request() {
10391038
let mut client = MockClient::new();
10401039
let compile_resp =
1041-
check_websocket_request_response(WSRequest::Format(client.new_format_request()), 5000)
1040+
check_websocket_request_response(WSRequest::Format(client.new_format_request()), 15000)
10421041
.await
10431042
.expect("Failed to receive response from coordinator in time");
10441043
if let WSResponse::Format(resp) = compile_resp {
@@ -1064,7 +1063,7 @@ mod tests {
10641063
.expect("Failed to send request to coordinator");
10651064

10661065
let user_input = "Hello Playground!\n";
1067-
tokio::time::timeout(Duration::from_millis(5000), async move {
1066+
tokio::time::timeout(Duration::from_millis(15000), async move {
10681067
// Check execute response and send user input.
10691068
if let Some(msg) = client_receiver.recv().await {
10701069
let playground_msg = serde_json::from_str(&msg)
@@ -1091,7 +1090,7 @@ mod tests {
10911090
match playground_msg {
10921091
PlaygroundMessage::Stdout { packet } => {
10931092
assert!(
1094-
packet == user_input,
1093+
dbg!(packet) == dbg!(user_input),
10951094
"Running program has incorrect standard output."
10961095
);
10971096
break
@@ -1124,7 +1123,7 @@ mod tests {
11241123
)
11251124
.await
11261125
.expect("Failed to send request to coordinator");
1127-
tokio::time::timeout(Duration::from_millis(5000), async move {
1126+
tokio::time::timeout(Duration::from_millis(15000), async move {
11281127
// Check format response and send compilation request.
11291128
if let Some(msg) = client_receiver.recv().await {
11301129
let playground_msg = serde_json::from_str(&msg)
@@ -1190,7 +1189,7 @@ mod tests {
11901189
.await
11911190
.expect("Failed to send request to coordinator");
11921191
let latest_seqno = 1;
1193-
tokio::time::timeout(Duration::from_millis(5000), async move {
1192+
tokio::time::timeout(Duration::from_millis(15000), async move {
11941193
loop {
11951194
let server_msg = client_receiver.recv().await;
11961195
match server_msg {
@@ -1248,7 +1247,7 @@ mod tests {
12481247
.await
12491248
.expect("Failed to send request to coordinator");
12501249
let latest_seqno = 1;
1251-
tokio::time::timeout(Duration::from_millis(5000), async move {
1250+
tokio::time::timeout(Duration::from_millis(15000), async move {
12521251
loop {
12531252
let server_msg = client_receiver.recv().await;
12541253
match server_msg {

worker-message/.dockerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
target/

worker-message/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ async fn stream_stdio(
310310
let n = stderr_buf.read_line(&mut buffer).await?;
311311
if n != 0 {
312312
coordinator_tx_err
313-
.send(WorkerMessage::StdoutPacket(buffer))
313+
.send(WorkerMessage::StderrPacket(buffer))
314314
.await?;
315315
} else {
316316
break;

0 commit comments

Comments
 (0)