Skip to content

Commit b24d32e

Browse files
committed
clenaer
1 parent d20f205 commit b24d32e

File tree

2 files changed

+67
-70
lines changed

2 files changed

+67
-70
lines changed

orchestrator/src/coordinator.rs

Lines changed: 66 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
CoordinatorMessage, JobId, Multiplexed, OneToOneResponse, ReadFileRequest,
2727
ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest,
2828
},
29-
sandbox::{CompileRequest, CompileResponse, CompileResponseWithOutput, Channel},
29+
sandbox::{Channel, CompileRequest, CompileResponse, CompileResponseWithOutput},
3030
DropErrorDetailsExt,
3131
};
3232

@@ -37,14 +37,34 @@ enum DemultiplexCommand {
3737
}
3838

3939
#[derive(Debug)]
40-
pub struct Container {
41-
stable: ContainerCore,
42-
beta: ContainerCore,
43-
nightly: ContainerCore,
40+
pub struct Coordinator<B> {
41+
backend: B,
42+
// Consider making these lazily-created and/or idly time out
43+
stable: Container,
44+
beta: Container,
45+
nightly: Container,
4446
token: CancellationToken,
4547
}
4648

47-
impl Container {
49+
impl<B> Coordinator<B>
50+
where
51+
B: Backend,
52+
{
53+
pub fn new(backend: B) -> Result<Self, Error> {
54+
let token = CancellationToken::new();
55+
56+
let [stable, beta, nightly] =
57+
Channel::ALL.map(|channel| Container::new(channel, token.clone(), &backend));
58+
59+
Ok(Self {
60+
backend,
61+
stable: stable?,
62+
beta: beta?,
63+
nightly: nightly?,
64+
token,
65+
})
66+
}
67+
4868
pub async fn compile(
4969
&self,
5070
request: CompileRequest,
@@ -56,11 +76,14 @@ impl Container {
5676
&self,
5777
request: CompileRequest,
5878
) -> Result<ActiveCompilation, CompileError> {
59-
self.select_channel(request.channel).begin_compile(request).await
79+
self.select_channel(request.channel)
80+
.begin_compile(request)
81+
.await
6082
}
6183

62-
pub async fn shutdown(self) -> Result<()> {
84+
pub async fn shutdown(self) -> Result<B> {
6385
let Self {
86+
backend,
6487
stable,
6588
beta,
6689
nightly,
@@ -70,16 +93,14 @@ impl Container {
7093

7194
let (stable, beta, nightly) = join!(stable.shutdown(), beta.shutdown(), nightly.shutdown());
7295

73-
stable.unwrap();
96+
stable.unwrap(); // ContainerTaskPanickedSnafu
7497
beta.unwrap();
7598
nightly.unwrap();
7699

77-
Ok(())
78-
// task.await.context(ContainerTaskPanickedSnafu)?
100+
Ok(backend)
79101
}
80102

81-
82-
fn select_channel(&self, channel: Channel) -> &ContainerCore {
103+
fn select_channel(&self, channel: Channel) -> &Container {
83104
match channel {
84105
Channel::Stable => &self.stable,
85106
Channel::Beta => &self.beta,
@@ -88,13 +109,19 @@ impl Container {
88109
}
89110
}
90111

112+
impl Coordinator<DockerBackend> {
113+
pub fn new_docker() -> Result<Self, Error> {
114+
Self::new(DockerBackend(()))
115+
}
116+
}
117+
91118
#[derive(Debug)]
92-
pub struct ContainerCore {
119+
pub struct Container {
93120
task: JoinHandle<Result<()>>,
94121
commander: Commander,
95122
}
96123

97-
impl ContainerCore {
124+
impl Container {
98125
fn new(channel: Channel, token: CancellationToken, backend: &impl Backend) -> Result<Self> {
99126
let (mut child, stdin, stdout) = backend.run_worker_in_background(channel)?;
100127
let IoQueue {
@@ -124,11 +151,7 @@ impl ContainerCore {
124151
id: Default::default(),
125152
};
126153

127-
Ok(ContainerCore {
128-
task,
129-
commander,
130-
})
131-
154+
Ok(Container { task, commander })
132155
}
133156

134157
pub async fn compile(
@@ -234,10 +257,7 @@ impl ContainerCore {
234257
}
235258

236259
pub async fn shutdown(self) -> Result<()> {
237-
let Self {
238-
task,
239-
commander,
240-
} = self;
260+
let Self { task, commander } = self;
241261
drop(commander);
242262
task.await.context(ContainerTaskPanickedSnafu)?
243263
}
@@ -495,40 +515,11 @@ pub enum CommanderError {
495515
WorkerOperationFailed { text: String },
496516
}
497517

498-
#[derive(Debug)]
499-
pub struct Coordinator<B> {
500-
backend: B,
501-
}
502-
503-
impl<B: Backend> Coordinator<B> {
504-
pub fn new(backend: B) -> Self {
505-
Self { backend }
506-
}
507-
508-
pub fn allocate(&mut self) -> Result<Container, Error> {
509-
let token = CancellationToken::new();
510-
511-
let [stable, beta, nightly] = Channel::ALL.map(|channel| {
512-
ContainerCore::new(channel, token.clone(), &self.backend)
513-
});
514-
515-
Ok(Container {
516-
stable: stable?,
517-
beta: beta?,
518-
nightly: nightly?,
519-
token,
520-
})
521-
}
522-
}
523-
524-
impl Coordinator<DockerBackend> {
525-
pub fn new_docker() -> Self {
526-
Self::new(DockerBackend(()))
527-
}
528-
}
529-
530518
pub trait Backend {
531-
fn run_worker_in_background(&self, channel: Channel) -> Result<(Child, ChildStdin, ChildStdout)> {
519+
fn run_worker_in_background(
520+
&self,
521+
channel: Channel,
522+
) -> Result<(Child, ChildStdin, ChildStdout)> {
532523
let mut child = self
533524
.prepare_worker_command(channel)
534525
.stdin(Stdio::piped())
@@ -544,6 +535,15 @@ pub trait Backend {
544535
fn prepare_worker_command(&self, channel: Channel) -> Command;
545536
}
546537

538+
impl<B> Backend for &B
539+
where
540+
B: Backend,
541+
{
542+
fn prepare_worker_command(&self, channel: Channel) -> Command {
543+
B::prepare_worker_command(self, channel)
544+
}
545+
}
546+
547547
macro_rules! docker_command {
548548
($($arg:expr),* $(,)?) => ({
549549
let mut cmd = Command::new("docker");
@@ -721,6 +721,7 @@ mod tests {
721721

722722
use super::*;
723723

724+
#[derive(Debug)]
724725
struct TestBackend {
725726
project_dir: TempDir,
726727
}
@@ -754,10 +755,7 @@ mod tests {
754755

755756
impl Backend for TestBackend {
756757
fn prepare_worker_command(&self, channel: Channel) -> Command {
757-
let toolchain_file = format!(r#"
758-
[toolchain]
759-
channel = "{}"
760-
"#, channel.to_str());
758+
let toolchain_file = format!(r#"[toolchain]\nchannel = "{}""#, channel.to_str());
761759
let path = self.project_dir.path().join("rust-toolchain.toml");
762760
std::fs::write(path, toolchain_file).expect("Couldn't write toolchain file");
763761

@@ -767,7 +765,7 @@ mod tests {
767765
}
768766
}
769767

770-
fn new_coordinator() -> Coordinator<impl Backend> {
768+
fn new_coordinator() -> Result<Coordinator<impl Backend>> {
771769
Coordinator::new(TestBackend::new())
772770
// Coordinator::new_docker()
773771
}
@@ -788,35 +786,33 @@ mod tests {
788786
#[tokio::test]
789787
#[snafu::report]
790788
async fn test_compile_response() -> Result<()> {
791-
let mut coordinator = new_coordinator();
789+
let coordinator = new_coordinator()?;
792790

793-
let container = coordinator.allocate()?;
794791
let response = tokio::time::timeout(
795792
Duration::from_millis(5000),
796-
container.compile(new_compile_request()),
793+
coordinator.compile(new_compile_request()),
797794
)
798795
.await
799796
.expect("Failed to receive streaming from container in time")
800797
.unwrap();
801798

802799
assert!(response.success);
803800

804-
container.shutdown().await?;
801+
coordinator.shutdown().await?;
805802

806803
Ok(())
807804
}
808805

809806
#[tokio::test]
810807
#[snafu::report]
811808
async fn test_compile_streaming() -> Result<()> {
812-
let mut coordinator = new_coordinator();
809+
let coordinator = new_coordinator()?;
813810

814-
let container = coordinator.allocate()?;
815811
let ActiveCompilation {
816812
task,
817813
stdout_rx,
818814
stderr_rx,
819-
} = container
815+
} = coordinator
820816
.begin_compile(new_compile_request())
821817
.await
822818
.unwrap();
@@ -841,7 +837,7 @@ mod tests {
841837
);
842838
assert!(stderr.contains("Finished"), "Missing `Finished`: {stderr}");
843839

844-
container.shutdown().await?;
840+
coordinator.shutdown().await?;
845841

846842
Ok(())
847843
}

orchestrator/src/sandbox.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub enum Channel {
3838
impl Channel {
3939
pub(crate) const ALL: [Self; 3] = [Self::Stable, Self::Beta, Self::Nightly];
4040

41+
#[cfg(test)]
4142
pub(crate) fn to_str(&self) -> &'static str {
4243
match self {
4344
Channel::Stable => "stable",

0 commit comments

Comments
 (0)