Skip to content

Commit eaf899e

Browse files
committed
Combine starting a process and submitting stdin into one channel
1 parent 7835da8 commit eaf899e

File tree

1 file changed

+44
-44
lines changed

1 file changed

+44
-44
lines changed

compiler/base/orchestrator/src/worker.rs

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,25 +57,21 @@ use crate::{
5757
DropErrorDetailsExt,
5858
};
5959

60-
type CommandRequest = (Multiplexed<ExecuteCommandRequest>, MultiplexingSender);
61-
6260
pub async fn listen(project_dir: impl Into<PathBuf>) -> Result<(), Error> {
6361
let project_dir = project_dir.into();
6462

6563
let (coordinator_msg_tx, coordinator_msg_rx) = mpsc::channel(8);
6664
let (worker_msg_tx, worker_msg_rx) = mpsc::channel(8);
6765
let mut io_tasks = spawn_io_queue(coordinator_msg_tx, worker_msg_rx);
6866

69-
let (cmd_tx, cmd_rx) = mpsc::channel(8);
70-
let (stdin_tx, stdin_rx) = mpsc::channel(8);
71-
let process_task = tokio::spawn(manage_processes(stdin_rx, cmd_rx, project_dir.clone()));
67+
let (process_tx, process_rx) = mpsc::channel(8);
68+
let process_task = tokio::spawn(manage_processes(process_rx, project_dir.clone()));
7269

7370
let handler_task = tokio::spawn(handle_coordinator_message(
7471
coordinator_msg_rx,
7572
worker_msg_tx,
7673
project_dir,
77-
cmd_tx,
78-
stdin_tx,
74+
process_tx,
7975
));
8076

8177
select! {
@@ -122,8 +118,7 @@ async fn handle_coordinator_message(
122118
mut coordinator_msg_rx: mpsc::Receiver<Multiplexed<CoordinatorMessage>>,
123119
worker_msg_tx: mpsc::Sender<Multiplexed<WorkerMessage>>,
124120
project_dir: PathBuf,
125-
cmd_tx: mpsc::Sender<CommandRequest>,
126-
stdin_tx: mpsc::Sender<Multiplexed<String>>,
121+
process_tx: mpsc::Sender<Multiplexed<ProcessCommand>>,
127122
) -> Result<(), HandleCoordinatorMessageError> {
128123
use handle_coordinator_message_error::*;
129124

@@ -177,16 +172,16 @@ async fn handle_coordinator_message(
177172
}
178173

179174
CoordinatorMessage::ExecuteCommand(req) => {
180-
cmd_tx
181-
.send((Multiplexed(job_id, req), worker_msg_tx()))
175+
process_tx
176+
.send(Multiplexed(job_id, ProcessCommand::Start(req, worker_msg_tx())))
182177
.await
183178
.drop_error_details()
184179
.context(UnableToSendCommandExecutionRequestSnafu)?;
185180
}
186181

187182
CoordinatorMessage::StdinPacket(data) => {
188-
stdin_tx
189-
.send(Multiplexed(job_id, data))
183+
process_tx
184+
.send(Multiplexed(job_id, ProcessCommand::Stdin(data)))
190185
.await
191186
.drop_error_details()
192187
.context(UnableToSendStdinPacketSnafu)?;
@@ -373,9 +368,13 @@ fn parse_working_dir(cwd: Option<String>, project_path: impl Into<PathBuf>) -> P
373368
final_path
374369
}
375370

371+
enum ProcessCommand {
372+
Start(ExecuteCommandRequest, MultiplexingSender),
373+
Stdin(String),
374+
}
375+
376376
async fn manage_processes(
377-
mut stdin_rx: mpsc::Receiver<Multiplexed<String>>,
378-
mut cmd_rx: mpsc::Receiver<CommandRequest>,
377+
mut rx: mpsc::Receiver<Multiplexed<ProcessCommand>>,
379378
project_path: PathBuf,
380379
) -> Result<(), ProcessError> {
381380
use process_error::*;
@@ -386,39 +385,40 @@ async fn manage_processes(
386385

387386
loop {
388387
select! {
389-
cmd_req = cmd_rx.recv() => {
390-
let Some((Multiplexed(job_id, req), worker_msg_tx)) = cmd_req else { break };
391-
392-
let RunningChild { child, stdin_rx, stdin, stdout, stderr } = match process_begin(req, &project_path, &mut stdin_senders, job_id) {
393-
Ok(v) => v,
394-
Err(e) => {
395-
// Should we add a message for process started
396-
// in addition to the current message which
397-
// indicates that the process has ended?
398-
worker_msg_tx.send_err(e).await.context(UnableToSendExecuteCommandStartedResponseSnafu)?;
399-
continue;
388+
cmd = rx.recv() => {
389+
let Some(Multiplexed(job_id, cmd)) = cmd else { break };
390+
match cmd {
391+
ProcessCommand::Start(req, worker_msg_tx) => {
392+
let RunningChild { child, stdin_rx, stdin, stdout, stderr } = match process_begin(req, &project_path, &mut stdin_senders, job_id) {
393+
Ok(v) => v,
394+
Err(e) => {
395+
// Should we add a message for process started
396+
// in addition to the current message which
397+
// indicates that the process has ended?
398+
worker_msg_tx.send_err(e).await.context(UnableToSendExecuteCommandStartedResponseSnafu)?;
399+
continue;
400+
}
401+
};
402+
403+
let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
404+
405+
processes.spawn({
406+
let stdin_shutdown_tx = stdin_shutdown_tx.clone();
407+
async move {
408+
worker_msg_tx
409+
.send(process_end(child, task_set, stdin_shutdown_tx, job_id).await)
410+
.await
411+
.context(UnableToSendExecuteCommandResponseSnafu)
412+
}
413+
});
400414
}
401-
};
402415

403-
let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
416+
ProcessCommand::Stdin(packet) => {
417+
if let Some(stdin_tx) = stdin_senders.get(&job_id) {
418+
stdin_tx.send(packet).await.drop_error_details().context(UnableToSendStdinDataSnafu)?;
419+
}
404420

405-
processes.spawn({
406-
let stdin_shutdown_tx = stdin_shutdown_tx.clone();
407-
async move {
408-
worker_msg_tx
409-
.send(process_end(child, task_set, stdin_shutdown_tx, job_id).await)
410-
.await
411-
.context(UnableToSendExecuteCommandResponseSnafu)
412421
}
413-
});
414-
}
415-
416-
stdin_packet = stdin_rx.recv() => {
417-
// Dispatch stdin packet to different child by attached command id.
418-
let Some(Multiplexed(job_id, packet)) = stdin_packet else { break };
419-
420-
if let Some(stdin_tx) = stdin_senders.get(&job_id) {
421-
stdin_tx.send(packet).await.drop_error_details().context(UnableToSendStdinDataSnafu)?;
422422
}
423423
}
424424

0 commit comments

Comments
 (0)