Skip to content

Commit 291b6d4

Browse files
committed
Extract a type and methods from manage_processes
1 parent eaf899e commit 291b6d4

File tree

1 file changed

+94
-36
lines changed

1 file changed

+94
-36
lines changed

compiler/base/orchestrator/src/worker.rs

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -373,63 +373,121 @@ enum ProcessCommand {
373373
Stdin(String),
374374
}
375375

376+
struct ProcessState {
377+
project_path: PathBuf,
378+
processes: JoinSet<Result<(), ProcessError>>,
379+
stdin_senders: HashMap<JobId, mpsc::Sender<String>>,
380+
stdin_shutdown_tx: mpsc::Sender<JobId>,
381+
}
382+
383+
impl ProcessState {
384+
fn new(project_path: PathBuf, stdin_shutdown_tx: mpsc::Sender<JobId>) -> Self {
385+
Self {
386+
project_path,
387+
processes: Default::default(),
388+
stdin_senders: Default::default(),
389+
stdin_shutdown_tx,
390+
}
391+
}
392+
393+
async fn start(
394+
&mut self,
395+
job_id: JobId,
396+
req: ExecuteCommandRequest,
397+
worker_msg_tx: MultiplexingSender,
398+
) -> Result<(), ProcessError> {
399+
use process_error::*;
400+
401+
let RunningChild {
402+
child,
403+
stdin_rx,
404+
stdin,
405+
stdout,
406+
stderr,
407+
} = match process_begin(req, &self.project_path, &mut self.stdin_senders, job_id) {
408+
Ok(v) => v,
409+
Err(e) => {
410+
// Should we add a message for process started
411+
// in addition to the current message which
412+
// indicates that the process has ended?
413+
worker_msg_tx
414+
.send_err(e)
415+
.await
416+
.context(UnableToSendExecuteCommandStartedResponseSnafu)?;
417+
return Ok(());
418+
}
419+
};
420+
421+
let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
422+
423+
self.processes.spawn({
424+
let stdin_shutdown_tx = self.stdin_shutdown_tx.clone();
425+
async move {
426+
worker_msg_tx
427+
.send(process_end(child, task_set, stdin_shutdown_tx, job_id).await)
428+
.await
429+
.context(UnableToSendExecuteCommandResponseSnafu)
430+
}
431+
});
432+
433+
Ok(())
434+
}
435+
436+
async fn stdin(&mut self, job_id: JobId, packet: String) -> Result<(), ProcessError> {
437+
use process_error::*;
438+
439+
if let Some(stdin_tx) = self.stdin_senders.get(&job_id) {
440+
stdin_tx
441+
.send(packet)
442+
.await
443+
.drop_error_details()
444+
.context(UnableToSendStdinDataSnafu)?;
445+
}
446+
447+
Ok(())
448+
}
449+
450+
fn stdin_close(&mut self, job_id: JobId) {
451+
self.stdin_senders.remove(&job_id);
452+
// Should we care if we remove a sender that's already removed?
453+
}
454+
455+
async fn join_process(&mut self) -> Option<Result<(), ProcessError>> {
456+
use process_error::*;
457+
458+
let process = self.processes.join_next().await?;
459+
Some(process.context(ProcessTaskPanickedSnafu).and_then(|e| e))
460+
}
461+
}
462+
376463
async fn manage_processes(
377464
mut rx: mpsc::Receiver<Multiplexed<ProcessCommand>>,
378465
project_path: PathBuf,
379466
) -> Result<(), ProcessError> {
380467
use process_error::*;
381468

382-
let mut processes = JoinSet::new();
383-
let mut stdin_senders = HashMap::new();
384469
let (stdin_shutdown_tx, mut stdin_shutdown_rx) = mpsc::channel(8);
470+
let mut state = ProcessState::new(project_path, stdin_shutdown_tx);
385471

386472
loop {
387473
select! {
388474
cmd = rx.recv() => {
389475
let Some(Multiplexed(job_id, cmd)) = cmd else { break };
390-
match cmd {
391-
ProcessCommand::Start(req, worker_msg_tx) => {
392-
let RunningChild { child, stdin_rx, stdin, stdout, stderr } = match process_begin(req, &project_path, &mut stdin_senders, job_id) {
393-
Ok(v) => v,
394-
Err(e) => {
395-
// Should we add a message for process started
396-
// in addition to the current message which
397-
// indicates that the process has ended?
398-
worker_msg_tx.send_err(e).await.context(UnableToSendExecuteCommandStartedResponseSnafu)?;
399-
continue;
400-
}
401-
};
402-
403-
let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr);
404-
405-
processes.spawn({
406-
let stdin_shutdown_tx = stdin_shutdown_tx.clone();
407-
async move {
408-
worker_msg_tx
409-
.send(process_end(child, task_set, stdin_shutdown_tx, job_id).await)
410-
.await
411-
.context(UnableToSendExecuteCommandResponseSnafu)
412-
}
413-
});
414-
}
415476

416-
ProcessCommand::Stdin(packet) => {
417-
if let Some(stdin_tx) = stdin_senders.get(&job_id) {
418-
stdin_tx.send(packet).await.drop_error_details().context(UnableToSendStdinDataSnafu)?;
419-
}
477+
match cmd {
478+
ProcessCommand::Start(req, worker_msg_tx) => state.start(job_id, req, worker_msg_tx).await?,
420479

421-
}
480+
ProcessCommand::Stdin(packet) => state.stdin(job_id, packet).await?,
422481
}
423482
}
424483

425484
job_id = stdin_shutdown_rx.recv() => {
426485
let job_id = job_id.context(StdinShutdownReceiverEndedSnafu)?;
427-
stdin_senders.remove(&job_id);
428-
// Should we care if we remove a sender that's already removed?
486+
state.stdin_close(job_id);
429487
}
430488

431-
Some(process) = processes.join_next() => {
432-
process.context(ProcessTaskPanickedSnafu)??;
489+
Some(process) = state.join_process() => {
490+
process?;
433491
}
434492
}
435493
}

0 commit comments

Comments
 (0)