Skip to content

Commit ffabc2d

Browse files
committed
add error handling for worker
1 parent 32f86c4 commit ffabc2d

File tree

5 files changed

+115
-167
lines changed

5 files changed

+115
-167
lines changed

ui/Cargo.lock

Lines changed: 9 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

worker-message/Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

worker-message/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ tracing = "0.1"
1818
tracing-subscriber = "0.3"
1919
tracing-appender = "0.2"
2020
tempdir = "0.3"
21+
anyhow = "1.0.69"
2122

worker-message/src/message.rs

Lines changed: 25 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
use serde::{Deserialize, Serialize};
2-
use std::{collections::HashMap, fmt};
2+
use std::collections::HashMap;
33

44
pub type Pid = u32;
55
pub type JobId = u128;
66
pub type Path = String;
77
pub type ResponseError = String;
8-
pub type Result<T> = std::result::Result<T, ResponseError>;
9-
108

119
#[derive(Debug, Serialize, Deserialize)]
1210
pub struct Job {
13-
pub reqs: Vec<Request>
11+
pub reqs: Vec<Request>,
1412
}
1513

1614
impl Job {
@@ -25,20 +23,19 @@ impl Job {
2523

2624
#[derive(Debug, Serialize, Deserialize)]
2725
pub struct JobReport {
28-
pub resps: Vec<Response>
26+
pub resps: Vec<ResponseResult>,
2927
}
3028

3129
impl JobReport {
3230
pub fn is_ok(&self) -> bool {
3331
if let Some(resp) = self.resps.last() {
34-
resp.is_ok()
32+
resp.0.is_ok()
3533
} else {
3634
false
3735
}
3836
}
3937
}
4038

41-
4239
#[derive(Debug, Serialize, Deserialize)]
4340
pub enum CoordinatorMessage {
4441
Request(JobId, Job),
@@ -50,53 +47,54 @@ pub enum WorkerMessage {
5047
Response(JobId, JobReport),
5148
StdoutPacket(String),
5249
StderrPacket(String),
50+
StreamFailure,
5351
}
5452

5553
#[derive(Debug, Serialize, Deserialize)]
5654
pub struct WriteFileRequest {
5755
pub path: Path,
58-
pub content: Vec<u8>
56+
pub content: Vec<u8>,
5957
}
6058

6159
#[derive(Debug, Serialize, Deserialize)]
62-
pub struct WriteFileResponse(pub Result<()>);
60+
pub struct WriteFileResponse(pub ());
6361

6462
#[derive(Debug, Serialize, Deserialize)]
6563
pub struct ReadFileRequest {
66-
pub path: Path
64+
pub path: Path,
6765
}
6866

6967
#[derive(Debug, Serialize, Deserialize)]
70-
pub struct ReadFileResponse(pub Result<Vec<u8>>);
68+
pub struct ReadFileResponse(pub Vec<u8>);
7169

7270
#[derive(Debug, Serialize, Deserialize)]
7371
pub struct ExecuteCommandRequest {
74-
pub cmd: String,
75-
pub args: Vec<String>,
76-
pub envs: HashMap<String, String>,
77-
pub cwd: Option<String>, // None means in project direcotry.
72+
pub cmd: String,
73+
pub args: Vec<String>,
74+
pub envs: HashMap<String, String>,
75+
pub cwd: Option<String>, // None means in project direcotry.
7876
}
7977

8078
#[derive(Debug, Serialize, Deserialize)]
81-
pub struct ExecuteCommandResponse(pub Result<ExecuteOutput>);
79+
pub struct ExecuteCommandResponse(pub ExecuteOutput);
8280

8381
#[derive(Debug, Serialize, Deserialize)]
8482
pub struct StreamCommandRequest {
85-
pub cmd: String,
86-
pub args: Vec<String>,
87-
pub envs: HashMap<String, String>,
88-
pub cwd: Option<String>, // None means in project direcotry.
83+
pub cmd: String,
84+
pub args: Vec<String>,
85+
pub envs: HashMap<String, String>,
86+
pub cwd: Option<String>, // None means in project direcotry.
8987
}
9088

9189
#[derive(Debug, Serialize, Deserialize)]
92-
pub struct StreamCommandResponse(pub Result<()>);
90+
pub struct StreamCommandResponse(pub ());
9391
// Relative path is resolved in project direcotry.
9492
#[derive(Debug, Serialize, Deserialize)]
9593
pub enum Request {
9694
WriteFile(WriteFileRequest),
9795
ReadFile(ReadFileRequest),
9896
ExecuteCommand(ExecuteCommandRequest),
99-
StreamCommand(StreamCommandRequest)
97+
StreamCommand(StreamCommandRequest),
10098
}
10199

102100
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -106,18 +104,6 @@ pub struct ExecuteOutput {
106104
pub stdout: Vec<u8>,
107105
}
108106

109-
impl fmt::Display for ExecuteOutput {
110-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111-
write!(
112-
f,
113-
"ExecuteOutput(\nstatus: {:?}\nstderr: {}\nstdout: {}\n)",
114-
self.status,
115-
std::str::from_utf8(&self.stderr).unwrap(),
116-
std::str::from_utf8(&self.stdout).unwrap()
117-
)
118-
}
119-
}
120-
121107
#[derive(Debug, Serialize, Deserialize)]
122108
pub enum Response {
123109
WriteFile(WriteFileResponse),
@@ -126,39 +112,11 @@ pub enum Response {
126112
StreamCommand(StreamCommandResponse),
127113
}
128114

129-
impl fmt::Display for Response {
130-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131-
match self {
132-
Response::ReadFile(res) => {
133-
if let Ok(content) = &res.0 {
134-
write!(
135-
f,
136-
"ReadResult.content:\n{}",
137-
std::str::from_utf8(&content).unwrap()
138-
)
139-
} else {
140-
write!(f, "{:#?}", res)
141-
}
142-
}
143-
Response::ExecuteCommand(res) => {
144-
if let Ok(output) = &res.0 {
145-
write!(f, "ReadResult.output:\n{}", output)
146-
} else {
147-
write!(f, "{:#?}", res)
148-
}
149-
}
150-
_ => write!(f, "{:#?}", self),
151-
}
152-
}
153-
}
115+
#[derive(Debug, Serialize, Deserialize)]
116+
pub struct ResponseResult(pub Result<Response, ResponseError>);
154117

155-
impl Response {
156-
pub fn is_ok(&self) -> bool {
157-
match self {
158-
Response::WriteFile(res) => res.0.is_ok(),
159-
Response::ReadFile(res) => res.0.is_ok(),
160-
Response::ExecuteCommand(res) => res.0.is_ok(),
161-
Response::StreamCommand(res) => res.0.is_ok(),
162-
}
118+
impl From<Result<Response, anyhow::Error>> for ResponseResult {
119+
fn from(value: Result<Response, anyhow::Error>) -> Self {
120+
ResponseResult(value.map_err(|e| format!("{:#}", e)))
163121
}
164122
}

0 commit comments

Comments
 (0)