Skip to content

Commit d20f205

Browse files
committed
one child for each channel
1 parent e0fdafc commit d20f205

File tree

2 files changed

+130
-39
lines changed

2 files changed

+130
-39
lines changed

orchestrator/src/coordinator.rs

Lines changed: 118 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
CoordinatorMessage, JobId, Multiplexed, OneToOneResponse, ReadFileRequest,
2727
ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest,
2828
},
29-
sandbox::{CompileRequest, CompileResponse, CompileResponseWithOutput},
29+
sandbox::{CompileRequest, CompileResponse, CompileResponseWithOutput, Channel},
3030
DropErrorDetailsExt,
3131
};
3232

@@ -38,12 +38,99 @@ enum DemultiplexCommand {
3838

3939
#[derive(Debug)]
4040
pub struct Container {
41-
task: JoinHandle<Result<()>>,
42-
commander: Commander,
41+
stable: ContainerCore,
42+
beta: ContainerCore,
43+
nightly: ContainerCore,
4344
token: CancellationToken,
4445
}
4546

4647
impl Container {
48+
pub async fn compile(
49+
&self,
50+
request: CompileRequest,
51+
) -> Result<CompileResponseWithOutput, CompileError> {
52+
self.select_channel(request.channel).compile(request).await
53+
}
54+
55+
pub async fn begin_compile(
56+
&self,
57+
request: CompileRequest,
58+
) -> Result<ActiveCompilation, CompileError> {
59+
self.select_channel(request.channel).begin_compile(request).await
60+
}
61+
62+
pub async fn shutdown(self) -> Result<()> {
63+
let Self {
64+
stable,
65+
beta,
66+
nightly,
67+
token,
68+
} = self;
69+
token.cancel();
70+
71+
let (stable, beta, nightly) = join!(stable.shutdown(), beta.shutdown(), nightly.shutdown());
72+
73+
stable.unwrap();
74+
beta.unwrap();
75+
nightly.unwrap();
76+
77+
Ok(())
78+
// task.await.context(ContainerTaskPanickedSnafu)?
79+
}
80+
81+
82+
fn select_channel(&self, channel: Channel) -> &ContainerCore {
83+
match channel {
84+
Channel::Stable => &self.stable,
85+
Channel::Beta => &self.beta,
86+
Channel::Nightly => &self.nightly,
87+
}
88+
}
89+
}
90+
91+
#[derive(Debug)]
92+
pub struct ContainerCore {
93+
task: JoinHandle<Result<()>>,
94+
commander: Commander,
95+
}
96+
97+
impl ContainerCore {
98+
fn new(channel: Channel, token: CancellationToken, backend: &impl Backend) -> Result<Self> {
99+
let (mut child, stdin, stdout) = backend.run_worker_in_background(channel)?;
100+
let IoQueue {
101+
mut tasks,
102+
to_worker_tx,
103+
from_worker_rx,
104+
} = spawn_io_queue(stdin, stdout, token);
105+
106+
let (command_tx, command_rx) = mpsc::channel(8);
107+
let demultiplex_task = tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx));
108+
109+
let task = tokio::spawn(async move {
110+
let (c, d, t) = join!(child.wait(), demultiplex_task, tasks.join_next());
111+
c.context(JoinWorkerSnafu)?;
112+
d.context(DemultiplexerTaskPanickedSnafu)?
113+
.context(DemultiplexerTaskFailedSnafu)?;
114+
if let Some(t) = t {
115+
t.context(IoQueuePanickedSnafu)??;
116+
}
117+
118+
Ok(())
119+
});
120+
121+
let commander = Commander {
122+
to_worker_tx,
123+
to_demultiplexer_tx: command_tx,
124+
id: Default::default(),
125+
};
126+
127+
Ok(ContainerCore {
128+
task,
129+
commander,
130+
})
131+
132+
}
133+
47134
pub async fn compile(
48135
&self,
49136
request: CompileRequest,
@@ -150,10 +237,8 @@ impl Container {
150237
let Self {
151238
task,
152239
commander,
153-
token,
154240
} = self;
155241
drop(commander);
156-
token.cancel();
157242
task.await.context(ContainerTaskPanickedSnafu)?
158243
}
159244
}
@@ -423,37 +508,14 @@ impl<B: Backend> Coordinator<B> {
423508
pub fn allocate(&mut self) -> Result<Container, Error> {
424509
let token = CancellationToken::new();
425510

426-
let (mut child, stdin, stdout) = self.backend.run_worker_in_background()?;
427-
let IoQueue {
428-
mut tasks,
429-
to_worker_tx,
430-
from_worker_rx,
431-
} = spawn_io_queue(stdin, stdout, token.clone());
432-
433-
let (command_tx, command_rx) = mpsc::channel(8);
434-
let demultiplex_task = tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx));
435-
436-
let task = tokio::spawn(async move {
437-
let (c, d, t) = join!(child.wait(), demultiplex_task, tasks.join_next());
438-
c.context(JoinWorkerSnafu)?;
439-
d.context(DemultiplexerTaskPanickedSnafu)?
440-
.context(DemultiplexerTaskFailedSnafu)?;
441-
if let Some(t) = t {
442-
t.context(IoQueuePanickedSnafu)??;
443-
}
444-
445-
Ok(())
511+
let [stable, beta, nightly] = Channel::ALL.map(|channel| {
512+
ContainerCore::new(channel, token.clone(), &self.backend)
446513
});
447514

448-
let commander = Commander {
449-
to_worker_tx,
450-
to_demultiplexer_tx: command_tx,
451-
id: Default::default(),
452-
};
453-
454515
Ok(Container {
455-
task,
456-
commander,
516+
stable: stable?,
517+
beta: beta?,
518+
nightly: nightly?,
457519
token,
458520
})
459521
}
@@ -466,9 +528,9 @@ impl Coordinator<DockerBackend> {
466528
}
467529

468530
pub trait Backend {
469-
fn run_worker_in_background(&self) -> Result<(Child, ChildStdin, ChildStdout)> {
531+
fn run_worker_in_background(&self, channel: Channel) -> Result<(Child, ChildStdin, ChildStdout)> {
470532
let mut child = self
471-
.prepare_worker_command()
533+
.prepare_worker_command(channel)
472534
.stdin(Stdio::piped())
473535
.stdout(Stdio::piped())
474536
.stderr(Stdio::inherit())
@@ -479,7 +541,7 @@ pub trait Backend {
479541
Ok((child, stdin, stdout))
480542
}
481543

482-
fn prepare_worker_command(&self) -> Command;
544+
fn prepare_worker_command(&self, channel: Channel) -> Command;
483545
}
484546

485547
macro_rules! docker_command {
@@ -516,19 +578,29 @@ fn basic_secure_docker_command() -> Command {
516578
pub struct DockerBackend(());
517579

518580
impl Backend for DockerBackend {
519-
fn prepare_worker_command(&self) -> Command {
581+
fn prepare_worker_command(&self, channel: Channel) -> Command {
520582
let mut command = basic_secure_docker_command();
521583
command
522584
.arg("-i")
523585
.args(["-a", "stdin", "-a", "stdout", "-a", "stderr"])
524586
.arg("--rm")
525-
.arg("orchestrator")
587+
.arg(channel.to_container_name())
526588
.arg("worker")
527589
.arg("/playground");
528590
command
529591
}
530592
}
531593

594+
impl Channel {
595+
fn to_container_name(&self) -> &'static str {
596+
match self {
597+
Channel::Stable => "orchestrator-stable",
598+
Channel::Beta => "orchestrator-beta",
599+
Channel::Nightly => "orchestrator-nightly",
600+
}
601+
}
602+
}
603+
532604
pub type Result<T, E = Error> = ::std::result::Result<T, E>;
533605

534606
#[derive(Debug, Snafu)]
@@ -681,7 +753,14 @@ mod tests {
681753
}
682754

683755
impl Backend for TestBackend {
684-
fn prepare_worker_command(&self) -> Command {
756+
fn prepare_worker_command(&self, channel: Channel) -> Command {
757+
let toolchain_file = format!(r#"
758+
[toolchain]
759+
channel = "{}"
760+
"#, channel.to_str());
761+
let path = self.project_dir.path().join("rust-toolchain.toml");
762+
std::fs::write(path, toolchain_file).expect("Couldn't write toolchain file");
763+
685764
let mut command = Command::new("./target/debug/worker");
686765
command.arg(self.project_dir.path());
687766
command

orchestrator/src/sandbox.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ pub enum Channel {
3535
Nightly,
3636
}
3737

38+
impl Channel {
39+
pub(crate) const ALL: [Self; 3] = [Self::Stable, Self::Beta, Self::Nightly];
40+
41+
pub(crate) fn to_str(&self) -> &'static str {
42+
match self {
43+
Channel::Stable => "stable",
44+
Channel::Beta => "beta",
45+
Channel::Nightly => "nightly",
46+
}
47+
}
48+
}
49+
3850
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
3951
pub enum Mode {
4052
Debug,

0 commit comments

Comments
 (0)