Skip to content

Commit 58201a4

Browse files
committed
Limit concurrency of active backends, not Coordinators
Each `Coordinator` can have 0-3 backends running as it abstracts over {stable,beta,nightly} and each backend is lazily created and idled. The "good" failure case was that we could be off by a factor of 3 in the upper bound. The catastrophic failure case was for the WebSocket connections where there are thousands of `Coordinator`s idling with very few of them actively compiling at a time. Setting a low limit would prevent connections, setting a high limit was basically pointless.
1 parent 23144f4 commit 58201a4

File tree

2 files changed

+31
-48
lines changed

2 files changed

+31
-48
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 29 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -861,60 +861,23 @@ impl CoordinatorFactory {
861861
CoordinatorId { start, id }
862862
}
863863

864-
pub async fn build<B>(&self) -> LimitedCoordinator<B>
864+
pub async fn build<B>(&self) -> Coordinator<B>
865865
where
866866
B: Backend + From<CoordinatorId>,
867867
{
868868
let semaphore = self.semaphore.clone();
869-
let permit = semaphore
870-
.acquire_owned()
871-
.await
872-
.expect("Unable to acquire permit");
873869

874870
let id = self.next_id();
875871
let backend = B::from(id);
876872

877-
let coordinator = Coordinator::new(backend);
878-
879-
LimitedCoordinator {
880-
coordinator,
881-
_permit: permit,
882-
}
883-
}
884-
}
885-
886-
pub struct LimitedCoordinator<T> {
887-
coordinator: Coordinator<T>,
888-
_permit: OwnedSemaphorePermit,
889-
}
890-
891-
impl<T> LimitedCoordinator<T>
892-
where
893-
T: Backend,
894-
{
895-
pub async fn shutdown(self) -> Result<T> {
896-
self.coordinator.shutdown().await
897-
}
898-
}
899-
900-
impl<T> ops::Deref for LimitedCoordinator<T> {
901-
type Target = Coordinator<T>;
902-
903-
fn deref(&self) -> &Self::Target {
904-
&self.coordinator
905-
}
906-
}
907-
908-
impl<T> ops::DerefMut for LimitedCoordinator<T> {
909-
fn deref_mut(&mut self) -> &mut Self::Target {
910-
&mut self.coordinator
873+
Coordinator::new(semaphore, backend)
911874
}
912875
}
913876

914877
#[derive(Debug)]
915878
pub struct Coordinator<B> {
879+
semaphore: Arc<Semaphore>,
916880
backend: B,
917-
// Consider making these lazily-created and/or idly time out
918881
stable: OnceCell<Container>,
919882
beta: OnceCell<Container>,
920883
nightly: OnceCell<Container>,
@@ -934,10 +897,11 @@ impl<B> Coordinator<B>
934897
where
935898
B: Backend,
936899
{
937-
pub fn new(backend: B) -> Self {
900+
pub fn new(semaphore: Arc<Semaphore>, backend: B) -> Self {
938901
let token = CancellationToken::new();
939902

940903
Self {
904+
semaphore,
941905
backend,
942906
stable: OnceCell::new(),
943907
beta: OnceCell::new(),
@@ -1174,13 +1138,18 @@ where
11741138
};
11751139

11761140
container
1177-
.get_or_try_init(|| Container::new(channel, self.token.clone(), &self.backend))
1141+
.get_or_try_init(|| {
1142+
let semaphore = self.semaphore.clone();
1143+
let token = self.token.clone();
1144+
Container::new(channel, semaphore, token, &self.backend)
1145+
})
11781146
.await
11791147
}
11801148
}
11811149

11821150
#[derive(Debug)]
11831151
struct Container {
1152+
permit: OwnedSemaphorePermit,
11841153
task: JoinHandle<Result<()>>,
11851154
kill_child: Option<Command>,
11861155
modify_cargo_toml: ModifyCargoToml,
@@ -1190,9 +1159,15 @@ struct Container {
11901159
impl Container {
11911160
async fn new(
11921161
channel: Channel,
1162+
semaphore: Arc<Semaphore>,
11931163
token: CancellationToken,
11941164
backend: &impl Backend,
11951165
) -> Result<Self> {
1166+
let permit = semaphore
1167+
.acquire_owned()
1168+
.await
1169+
.context(AcquirePermitSnafu)?;
1170+
11961171
let (mut child, kill_child, stdin, stdout) = backend.run_worker_in_background(channel)?;
11971172
let IoQueue {
11981173
mut tasks,
@@ -1231,6 +1206,7 @@ impl Container {
12311206
.context(CouldNotLoadCargoTomlSnafu)?;
12321207

12331208
Ok(Container {
1209+
permit,
12341210
task,
12351211
kill_child,
12361212
modify_cargo_toml,
@@ -1907,6 +1883,7 @@ impl Container {
19071883

19081884
async fn shutdown(self) -> Result<()> {
19091885
let Self {
1886+
permit,
19101887
task,
19111888
kill_child,
19121889
modify_cargo_toml,
@@ -1927,7 +1904,10 @@ impl Container {
19271904
.context(KillWorkerSnafu)?;
19281905
}
19291906

1930-
task.await.context(ContainerTaskPanickedSnafu)?
1907+
let r = task.await;
1908+
drop(permit);
1909+
1910+
r.context(ContainerTaskPanickedSnafu)?
19311911
}
19321912
}
19331913

@@ -2705,6 +2685,9 @@ pub enum Error {
27052685

27062686
#[snafu(display("Unable to load original Cargo.toml"))]
27072687
CouldNotLoadCargoToml { source: ModifyCargoTomlError },
2688+
2689+
#[snafu(display("Could not acquire a semaphore permit"))]
2690+
AcquirePermit { source: tokio::sync::AcquireError },
27082691
}
27092692

27102693
struct IoQueue {
@@ -2866,15 +2849,15 @@ mod tests {
28662849
static TEST_COORDINATOR_FACTORY: Lazy<CoordinatorFactory> =
28672850
Lazy::new(|| CoordinatorFactory::new(*MAX_CONCURRENT_TESTS));
28682851

2869-
async fn new_coordinator_test() -> LimitedCoordinator<TestBackend> {
2852+
async fn new_coordinator_test() -> Coordinator<TestBackend> {
28702853
TEST_COORDINATOR_FACTORY.build().await
28712854
}
28722855

2873-
async fn new_coordinator_docker() -> LimitedCoordinator<DockerBackend> {
2856+
async fn new_coordinator_docker() -> Coordinator<DockerBackend> {
28742857
TEST_COORDINATOR_FACTORY.build().await
28752858
}
28762859

2877-
async fn new_coordinator() -> LimitedCoordinator<impl Backend> {
2860+
async fn new_coordinator() -> Coordinator<impl Backend> {
28782861
#[cfg(not(force_docker))]
28792862
{
28802863
new_coordinator_test().await

ui/src/server_axum/websocket.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
use axum::extract::ws::{Message, WebSocket};
1010
use futures::{future::Fuse, Future, FutureExt, StreamExt, TryFutureExt};
1111
use orchestrator::{
12-
coordinator::{self, CoordinatorFactory, DockerBackend, LimitedCoordinator},
12+
coordinator::{self, Coordinator, CoordinatorFactory, DockerBackend},
1313
DropErrorDetailsExt,
1414
};
1515
use snafu::prelude::*;
@@ -222,7 +222,7 @@ pub(crate) async fn handle(
222222

223223
type TaggedError = (Error, Option<Meta>);
224224
type ResponseTx = mpsc::Sender<Result<MessageResponse, TaggedError>>;
225-
type SharedCoordinator = Arc<LimitedCoordinator<DockerBackend>>;
225+
type SharedCoordinator = Arc<Coordinator<DockerBackend>>;
226226

227227
/// Manages a limited amount of access to the `Coordinator`.
228228
///

0 commit comments

Comments
 (0)