Skip to content

Commit 3cef47a

Browse files
committed
still half work on switching to async
1 parent 0361f02 commit 3cef47a

File tree

6 files changed

+364
-126
lines changed

6 files changed

+364
-126
lines changed

ui/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ui/src/coordinator.rs

Lines changed: 247 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use std::collections::HashMap;
2+
use std::convert::TryFrom;
23
use std::io::{Read, Write};
3-
use std::os::unix::net::UnixStream;
44

55
use bincode::{deserialize_from, serialize_into};
66
use log::info;
77

8+
use crate::{Error, parse_channel, parse_mode, parse_edition, parse_crate_type, parse_target};
89
use crate::sandbox::{
9-
Channel, CompileRequest, CompileTarget, CrateType, Edition, ExecuteRequest, Mode,
10+
Channel, CompileRequest, CompileTarget, CrateType, Edition, ExecuteRequest, Mode, self,
1011
};
1112

1213
type JobBatch = Vec<worker_message::Request>;
@@ -138,42 +139,135 @@ fn execute_request_to_batch(req: ExecuteRequest) -> JobBatch {
138139
batch
139140
}
140141

141-
fn process_batch(stream: &mut UnixStream, batch: JobBatch) -> Option<worker_message::Response> {
142-
let mut response = None;
143-
for job in batch {
144-
let res = work(stream, job);
145-
dbg!(&res);
146-
if !res.is_ok() {
147-
return Some(res);
148-
} else {
149-
response = Some(res);
150-
}
142+
// fn process_batch(stream: &mut UnixStream, batch: JobBatch) -> Option<worker_message::Response> {
143+
// let mut response = None;
144+
// for job in batch {
145+
// let res = work(stream, job);
146+
// dbg!(&res);
147+
// if !res.is_ok() {
148+
// return Some(res);
149+
// } else {
150+
// response = Some(res);
151+
// }
152+
// }
153+
// response
154+
// }
155+
156+
// fn work(stream: &mut UnixStream, req: worker_message::Request) -> worker_message::Response {
157+
// serialize_into(&mut *stream, &req).unwrap();
158+
// deserialize_from(&mut *stream).unwrap()
159+
// }
160+
//
161+
#[derive(serde::Deserialize)]
162+
#[serde(rename_all = "camelCase")]
163+
struct WSExecuteRequest {
164+
channel: String,
165+
mode: String,
166+
edition: String,
167+
crate_type: String,
168+
tests: bool,
169+
code: String,
170+
backtrace: bool,
171+
extra: serde_json::Value,
172+
}
173+
174+
impl TryFrom<WSExecuteRequest> for (sandbox::ExecuteRequest, serde_json::Value) {
175+
type Error = Error;
176+
177+
fn try_from(value: WSExecuteRequest) -> Result<Self, Self::Error> {
178+
let WSExecuteRequest {
179+
channel,
180+
mode,
181+
edition,
182+
crate_type,
183+
tests,
184+
code,
185+
backtrace,
186+
extra,
187+
} = value;
188+
189+
let req = sandbox::ExecuteRequest {
190+
channel: parse_channel(&channel)?,
191+
mode: parse_mode(&mode)?,
192+
edition: parse_edition(&edition)?,
193+
crate_type: parse_crate_type(&crate_type)?,
194+
tests,
195+
backtrace,
196+
code,
197+
};
198+
199+
Ok((req, extra))
151200
}
152-
response
153201
}
154202

155-
fn work(stream: &mut UnixStream, req: worker_message::Request) -> worker_message::Response {
156-
serialize_into(&mut *stream, &req).unwrap();
157-
deserialize_from(&mut *stream).unwrap()
203+
#[derive(serde::Serialize, serde::Deserialize)]
204+
#[serde(rename_all = "camelCase")]
205+
struct WSCompileRequest {
206+
target: String,
207+
channel: String,
208+
mode: String,
209+
edition: String,
210+
crate_type: String,
211+
tests: bool,
212+
code: String,
213+
backtrace: bool,
214+
extra: serde_json::Value,
215+
}
216+
217+
impl TryFrom<WSCompileRequest> for (sandbox::CompileRequest, serde_json::Value) {
218+
type Error = Error;
219+
220+
fn try_from(value: WSCompileRequest) -> Result<Self, Self::Error> {
221+
let WSCompileRequest {
222+
target,
223+
channel,
224+
mode,
225+
edition,
226+
crate_type,
227+
tests,
228+
code,
229+
backtrace,
230+
extra,
231+
} = value;
232+
233+
let req = sandbox::CompileRequest {
234+
target: parse_target(&target)?,
235+
channel: parse_channel(&channel)?,
236+
mode: parse_mode(&mode)?,
237+
edition: parse_edition(&edition)?,
238+
crate_type: parse_crate_type(&crate_type)?,
239+
tests,
240+
backtrace,
241+
code,
242+
};
243+
244+
Ok((req, extra))
245+
}
158246
}
159247

160248
#[cfg(test)]
161249
mod tests {
162250
use std::io::{Read, Write};
163-
use std::path::Path;
251+
use std::process::Stdio;
164252
use std::str::from_utf8;
165253
use std::time::SystemTime;
166-
use std::{fs, os::unix::net::UnixStream, thread, time::Duration};
254+
use std::{fs, thread, time::Duration};
167255

256+
const WORKER_FILEPATH: &str = "../worker_message/target/debug/worker";
168257
use crate::{
169258
coordinator::{compile_request_to_batch, execute_request_to_batch},
170259
sandbox::{
171260
Channel, CompileRequest, CompileTarget, CrateType, Edition, ExecuteRequest, Mode,
172261
},
173262
};
174-
use worker_message::worker;
263+
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
264+
use tokio::process::{ChildStdin, ChildStdout, Command};
265+
use tokio::sync::mpsc;
266+
use worker_message::{worker, CoordinatorMessage, WorkerMessage};
267+
268+
use super::WSCompileRequest;
175269

176-
use super::process_batch;
270+
// use super::process_batch;
177271
fn new_execute_request() -> ExecuteRequest {
178272
ExecuteRequest {
179273
channel: Channel::Stable,
@@ -186,85 +280,155 @@ mod tests {
186280
}
187281
}
188282

189-
fn new_compile_request() -> CompileRequest {
190-
CompileRequest {
191-
target: CompileTarget::Mir,
192-
channel: Channel::Stable,
193-
crate_type: CrateType::Binary,
194-
mode: Mode::Release,
195-
edition: Some(Edition::Rust2021),
283+
fn new_compile_request() -> WSCompileRequest {
284+
WSCompileRequest {
285+
target: "mir".to_owned(),
286+
channel: "stable".to_owned(),
287+
crate_type: "binary".to_owned(),
288+
mode: "release".to_owned(),
289+
edition: "2021".to_owned(),
196290
tests: false,
197291
backtrace: true,
198292
code: r#"fn main() { println!("Hello World!"); }"#.to_owned(),
293+
extra: serde_json::json!({})
199294
}
200295
}
201296

202-
fn generate_temporary_socket() -> String {
203-
let timestamp = SystemTime::now()
204-
.duration_since(SystemTime::UNIX_EPOCH)
205-
.unwrap()
206-
.as_nanos()
207-
.to_string();
208-
format!("/tmp/playground_test_socket_{timestamp}")
297+
async fn run_worker_in_background() -> (ChildStdin, ChildStdout) {
298+
let mut child = Command::new(WORKER_FILEPATH)
299+
.stdin(Stdio::piped())
300+
.stdout(Stdio::piped())
301+
.spawn()
302+
.unwrap();
303+
(child.stdin.take().unwrap(), child.stdout.take().unwrap())
209304
}
210305

211-
fn run_worker_in_background() -> (UnixStream, UnixStream) {
212-
let test_request_socket = generate_temporary_socket();
213-
let test_stdio_socket = generate_temporary_socket();
214-
if Path::new(&test_request_socket).exists() {
215-
fs::remove_file(&test_request_socket).unwrap();
216-
}
217-
if Path::new(&test_stdio_socket).exists() {
218-
fs::remove_file(&test_stdio_socket).unwrap();
219-
}
220-
{
221-
let test_request_socket = test_request_socket.clone();
222-
let test_stdio_socket = test_stdio_socket.clone();
223-
thread::spawn(move || {
224-
worker::listen(&test_request_socket, &test_stdio_socket);
225-
});
226-
}
227-
// Let listener get ready.
228-
thread::sleep(Duration::from_secs(1));
229-
let request_socket = UnixStream::connect(test_request_socket).unwrap();
230-
let stdio_socket = UnixStream::connect(test_stdio_socket).unwrap();
231-
(request_socket, stdio_socket)
306+
async fn spawn_io_queue(
307+
mut sender: ChildStdin,
308+
mut receiver: ChildStdout,
309+
) -> (
310+
mpsc::Sender<CoordinatorMessage>,
311+
mpsc::Receiver<WorkerMessage>,
312+
) {
313+
let (tx, worker_rx) = mpsc::channel(32);
314+
tokio::spawn(async move {
315+
let mut buffer = Vec::new();
316+
loop {
317+
let payload_len = receiver.read_u64().await.unwrap();
318+
buffer.resize(payload_len as usize, 0);
319+
receiver.read_exact(&mut buffer).await.unwrap();
320+
let worker_msg: WorkerMessage = bincode::deserialize(&buffer).unwrap();
321+
tx.send(worker_msg).await.unwrap();
322+
}
323+
});
324+
let (worker_tx, mut rx) = mpsc::channel(32);
325+
tokio::spawn(async move {
326+
loop {
327+
let worker_msg = rx.recv().await.unwrap();
328+
let encoded = bincode::serialize(&worker_msg).unwrap();
329+
sender.write_u64(encoded.len() as u64).await.unwrap();
330+
sender.write_all(&encoded).await.unwrap();
331+
}
332+
});
333+
(worker_tx, worker_rx)
232334
}
233335

234-
#[test]
235-
fn compile_request() {
236-
let (mut request_socket, stdio_socket) = run_worker_in_background();
237-
238-
let request = new_compile_request();
239-
let batch = compile_request_to_batch(request);
240-
let result = process_batch(&mut request_socket, batch).unwrap();
241-
println!("{result}");
242-
assert!(result.is_ok());
336+
async fn make_worker_channel() -> (
337+
mpsc::Sender<CoordinatorMessage>,
338+
mpsc::Receiver<WorkerMessage>,
339+
) {
340+
let (stdin, stdout) = run_worker_in_background().await;
341+
spawn_io_queue(stdin, stdout).await
243342
}
244343

245-
#[test]
246-
fn execute_request() {
247-
let (mut request_socket, mut stdio_socket) = run_worker_in_background();
344+
async fn handle_websocket_message(
345+
ws_msg: String,
346+
worker_sender: &mpsc::Sender<CoordinatorMessage>,
347+
) {
348+
}
349+
async fn handle_worker_message(msg: WorkerMessage, websocket_sender: &mpsc::Sender<String>) {}
248350

249-
let request = new_execute_request();
250-
let batch = execute_request_to_batch(request);
251-
let result = process_batch(&mut request_socket, batch).unwrap();
252-
let handle = thread::spawn(move || {
253-
let mut buffer = vec![0; 1024];
254-
loop {
255-
let n = stdio_socket.read(&mut buffer).unwrap();
256-
if n != 0 {
257-
println!("From stdio {} - {}", n, from_utf8(&buffer[..n]).unwrap());
351+
async fn pair_websocket_worker(ws_sender: mpsc::Sender<String>, mut ws_receiver: mpsc::Receiver<String>, worker_sender: mpsc::Sender<CoordinatorMessage>, mut worker_receiver: mpsc::Receiver<WorkerMessage>) {
352+
loop {
353+
tokio::select! {
354+
ws_request = ws_receiver.recv() => {
355+
match ws_request {
356+
None => {
357+
break;
358+
}
359+
Some(txt) => {
360+
handle_websocket_message(txt, &worker_sender).await;
361+
}
362+
}
363+
},
364+
worker_msg = worker_receiver.recv() => {
365+
match worker_msg {
366+
None => {
367+
break;
368+
}
369+
Some(msg) => {
370+
handle_worker_message(msg, &ws_sender).await;
371+
}
372+
}
258373
}
259374
}
260-
});
261-
println!("{result}");
262-
assert!(result.is_ok());
263-
// handle.join().unwrap();
264-
thread::sleep(Duration::from_secs(1));
375+
}
265376
}
266377

267-
#[test]
268-
fn consecutive_compile_execute() {
378+
#[tokio::test]
379+
async fn compile_request() {
380+
let (worker_sender, worker_receiver) = make_worker_channel().await;
381+
let (tx, ws_receiver) = mpsc::channel(32);
382+
let (ws_sender, mut rx) = mpsc::channel(32);
383+
tokio::spawn(async move {
384+
pair_websocket_worker(ws_sender, ws_receiver, worker_sender, worker_receiver).await;
385+
});
386+
387+
// Simulate user input.
388+
tx.send(serde_json::to_string(&new_compile_request()).unwrap()).await.unwrap();
389+
loop {
390+
let server_msg = rx.recv().await;
391+
match server_msg {
392+
None => {
393+
panic!("server disconnected");
394+
}
395+
Some(msg) => {
396+
}
397+
}
398+
}
399+
400+
401+
402+
// let request = new_compile_request();
403+
// let batch = compile_request_to_batch(request);
404+
// let result = process_batch(&mut request_socket, batch).unwrap();
405+
406+
// println!("{result}");
407+
// assert!(result.is_ok());
269408
}
409+
410+
// #[test]
411+
// fn execute_request() {
412+
// let (mut request_socket, mut stdio_socket) = run_worker_in_background();
413+
414+
// let request = new_execute_request();
415+
// let batch = execute_request_to_batch(request);
416+
// let result = process_batch(&mut request_socket, batch).unwrap();
417+
// let handle = thread::spawn(move || {
418+
// let mut buffer = vec![0; 1024];
419+
// loop {
420+
// let n = stdio_socket.read(&mut buffer).unwrap();
421+
// if n != 0 {
422+
// println!("From stdio {} - {}", n, from_utf8(&buffer[..n]).unwrap());
423+
// }
424+
// }
425+
// });
426+
// println!("{result}");
427+
// assert!(result.is_ok());
428+
// // handle.join().unwrap();
429+
// thread::sleep(Duration::from_secs(1));
430+
// }
431+
432+
// #[test]
433+
// fn consecutive_compile_execute() {}
270434
}

0 commit comments

Comments
 (0)