Skip to content

Commit 1746619

Browse files
committed
add naive worker and coordinator; output filename not corrected
1 parent 764833e commit 1746619

File tree

11 files changed

+733
-0
lines changed

11 files changed

+733
-0
lines changed

playground-worker/Cargo.lock

Lines changed: 98 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

playground-worker/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "playground_worker"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
worker_message = { path = "../worker-message" }
10+
bincode = "1.3"
11+
log = "0.4"

playground-worker/src/lib.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use bincode::deserialize_from;
2+
use std::collections::HashMap;
3+
use std::fs;
4+
use std::io::{Read, Write};
5+
use std::os::unix::net::{UnixListener, UnixStream};
6+
use std::path::Path;
7+
use std::process::Child;
8+
use std::process::Command;
9+
use std::process::Stdio;
10+
use std::str::from_utf8;
11+
use std::{io, thread};
12+
use worker_message::{ExecuteOutput, Pid, Request, Response};
13+
14+
pub fn listen(request_socket: &str, stdio_socket: &str) {
15+
let request_listener = UnixListener::bind(request_socket).unwrap();
16+
17+
let stdio_listener = UnixListener::bind(stdio_socket).unwrap();
18+
19+
let (request_stream, _) = request_listener.accept().unwrap();
20+
let (stdio_stream, _) = stdio_listener.accept().unwrap();
21+
handle_socket(request_stream, stdio_stream);
22+
}
23+
24+
fn handle_socket(request_stream: UnixStream, stdio_stream: UnixStream) {
25+
let mut children = HashMap::<Pid, Child>::new();
26+
loop {
27+
let req: Request = deserialize_from(&request_stream).unwrap();
28+
match req {
29+
Request::WriteFile { path, content } => {
30+
let path = Path::new(&path);
31+
let parent_dir = path.parent().unwrap();
32+
fs::create_dir_all(parent_dir).unwrap();
33+
let result = fs::write(path, content);
34+
let response = Response::WriteResult(result.map_err(|e| e.to_string()));
35+
bincode::serialize_into(&request_stream, &response).unwrap();
36+
}
37+
Request::ReadFile { path } => {
38+
let result = fs::read(path);
39+
let response = Response::ReadResult(result.map_err(|e| e.to_string()));
40+
bincode::serialize_into(&request_stream, &response).unwrap();
41+
}
42+
Request::ExecuteCommand {
43+
cmd,
44+
args,
45+
envs,
46+
cwd,
47+
} => {
48+
let result = Command::new(cmd)
49+
.args(args)
50+
.envs(envs)
51+
.current_dir(cwd)
52+
.output();
53+
let response = Response::ExecuteResult(
54+
result
55+
.map(|output| ExecuteOutput {
56+
status: output.status.code(),
57+
stderr: output.stderr,
58+
stdout: output.stdout,
59+
})
60+
.map_err(|e| e.to_string()),
61+
);
62+
bincode::serialize_into(&request_stream, &response).unwrap();
63+
}
64+
Request::StreamCommand {
65+
cmd,
66+
args,
67+
envs,
68+
cwd,
69+
} => {
70+
let result = Command::new(cmd)
71+
.args(args)
72+
.envs(envs)
73+
.current_dir(cwd)
74+
.stdin(Stdio::piped())
75+
.stdout(Stdio::piped())
76+
.stderr(Stdio::piped())
77+
.spawn();
78+
let result = match result {
79+
Ok(mut child) => {
80+
let pid = child.id();
81+
stream_stdio(&stdio_stream, &mut child);
82+
children.insert(pid, child);
83+
Ok(pid)
84+
}
85+
Err(err) => Err(err.to_string()),
86+
};
87+
let response = Response::StreamResult(result);
88+
bincode::serialize_into(&request_stream, &response).unwrap();
89+
}
90+
Request::KillProcess { pid } => {
91+
let result = match children.remove(&pid) {
92+
Some(mut child) => match child.kill() {
93+
Ok(()) => Ok(()),
94+
Err(err) => {
95+
if err.kind() == io::ErrorKind::InvalidInput {
96+
Ok(())
97+
} else {
98+
Err(err.to_string())
99+
}
100+
}
101+
},
102+
None => Ok(()),
103+
};
104+
let response = Response::KillResult(result);
105+
bincode::serialize_into(&request_stream, &response).unwrap();
106+
}
107+
}
108+
}
109+
}
110+
111+
fn stream_stdio(stream: &UnixStream, child: &mut Child) {
112+
let mut stdin = child.stdin.take().unwrap();
113+
let mut stdout = child.stdout.take().unwrap();
114+
let mut stderr = child.stderr.take().unwrap();
115+
116+
let mut stream_in = stream.try_clone().unwrap();
117+
let mut stream_out = stream.try_clone().unwrap();
118+
let mut stream_err = stream.try_clone().unwrap();
119+
thread::spawn(move || {
120+
let mut buffer = [0; 1024];
121+
loop {
122+
let n = stream_in.read(&mut buffer).unwrap();
123+
if n != 0 {
124+
stdin.write_all(&buffer[..n]).unwrap();
125+
println!("To stdin {} - {}", n, from_utf8(&buffer[..n]).unwrap());
126+
}
127+
}
128+
});
129+
thread::spawn(move || {
130+
let mut buffer = [0; 1024];
131+
loop {
132+
let n = stdout.read(&mut buffer).unwrap();
133+
if n != 0 {
134+
stream_out.write_all(&buffer[..n]).unwrap();
135+
println!("From stdout {} - {}", n, from_utf8(&buffer[..n]).unwrap());
136+
}
137+
}
138+
});
139+
thread::spawn(move || {
140+
let mut buffer = [0; 1024];
141+
loop {
142+
let n = stderr.read(&mut buffer).unwrap();
143+
if n != 0 {
144+
stream_err.write_all(&buffer[..n]).unwrap();
145+
println!("From stderr {} - {}", n, from_utf8(&buffer[..n]).unwrap());
146+
}
147+
}
148+
});
149+
}

playground-worker/src/main.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
use playground_worker::listen;
2+
3+
4+
fn main() {
5+
let request_socket_path = "/tmp/request_sock";
6+
let stdio_socket_path = "/tmp/stdio_sock";
7+
listen(request_socket_path, stdio_socket_path);
8+
}
9+

ui/Cargo.lock

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ui/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,6 @@ tokio = { version = "1.9", features = ["macros", "time", "process", "rt-multi-th
3030
tower-http = { version = "0.4", features = ["cors", "fs", "set-header", "trace"] }
3131
tracing = "0.1.37"
3232
tracing-subscriber = "0.3.16"
33+
worker_message = { path = "../worker-message" }
34+
playground_worker = { path = "../playground-worker" }
35+
bincode = "1.3"

0 commit comments

Comments
 (0)