
Commit 0b70019

Request idling containers exit when the container limit is reached
We let WebSocket connections keep {stable,beta,nightly} containers running after the execution finishes. These idle containers still count against the enforced container limits even though they aren't doing useful work. This change lets the owner of those idle containers know that someone else wants their slot, so they can make the containers exit earlier than they otherwise would have.

This is easiest to demonstrate by setting the container limit very low (e.g. 2) and then opening one more browser tab than that (e.g. 3). Before this change, the last tab would have to wait for the idle timeout (defaults to 60 seconds) before it could have a chance to run its code.

= Known issues

There's a race that can cause containers to spuriously exit, but that should only occur when we are near the cap of containers anyway. More details are in comments.

= Future thoughts / limitations

One WebSocket connection could have *both* an active container and an idle one (e.g. stable and nightly). This case is not handled, but it may also be comparatively rare.

The non-WebSocket endpoints can request other containers to exit, but they won't ever decrement the exit request counter themselves. This means that if you hit the server many times at these endpoints, then future WebSocket connections will exit earlier than they need to. Hopefully this is also comparatively rare.
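The shape of the change, reduced to a minimal self-contained sketch (illustrative only, not the playground's code: the `Limits` type and its simple try-then-fallback acquisition stand in for the real `Global` limits and the `biased` select in the diff below): a second semaphore starts with zero permits, an acquirer that cannot get a container slot immediately adds a permit to it, and an idle slot holder waits on that semaphore alongside its idle timeout.

    use std::{sync::Arc, time::Duration};
    use tokio::sync::Semaphore;

    #[derive(Clone)]
    struct Limits {
        containers: Arc<Semaphore>, // one permit per allowed container
        requests: Arc<Semaphore>,   // starts at zero; each permit is a "please exit" signal
    }

    impl Limits {
        fn new(container_limit: usize) -> Self {
            Self {
                containers: Arc::new(Semaphore::new(container_limit)),
                requests: Arc::new(Semaphore::new(0)),
            }
        }

        // Acquire a container slot; if none is free right now, signal the
        // request semaphore so an idle holder knows someone is waiting.
        async fn next_container(&self) -> tokio::sync::OwnedSemaphorePermit {
            match self.containers.clone().try_acquire_owned() {
                Ok(permit) => permit,
                Err(_) => {
                    self.requests.add_permits(1);
                    self.containers.clone().acquire_owned().await.unwrap()
                }
            }
        }

        // Wait until someone asks for a container back, consuming the signal
        // so only one idle holder reacts to it.
        async fn container_requested(&self) {
            self.requests.acquire().await.unwrap().forget();
        }
    }

    #[tokio::main]
    async fn main() {
        let limits = Limits::new(2);

        // An "idle" holder: it already owns a slot and keeps it until asked
        // for it back or until its idle timeout expires.
        let idle_slot = limits.next_container().await;
        let idle = {
            let limits = limits.clone();
            tokio::spawn(async move {
                let _slot = idle_slot;
                tokio::select! {
                    _ = limits.container_requested() => println!("asked to give the slot back"),
                    _ = tokio::time::sleep(Duration::from_secs(60)) => println!("idle timeout"),
                }
                // `_slot` is dropped here, returning the container permit.
            })
        };

        let _busy = limits.next_container().await;  // takes the second slot
        let _third = limits.next_container().await; // over the limit: nudges the idle holder
        idle.await.unwrap();
    }

Because handling a request consumes the permit with `forget()`, each "please exit" signal wakes at most one idle holder.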
1 parent 6330754 commit 0b70019

File tree

  compiler/base/orchestrator/src/coordinator.rs
  compiler/base/orchestrator/src/coordinator/limits.rs
  ui/src/server_axum/websocket.rs

3 files changed: +83 / -6 lines changed


compiler/base/orchestrator/src/coordinator.rs

Lines changed: 7 additions & 0 deletions
@@ -852,6 +852,9 @@ type ResourceResult<T, E = ResourceError> = std::result::Result<T, E>;
 pub trait ResourceLimits: Send + Sync + fmt::Debug + 'static {
     /// Block until resources for a container are available.
     fn next_container(&self) -> BoxFuture<'static, ResourceResult<Box<dyn ContainerPermit>>>;
+
+    /// Block until someone requests that you return an in-use container.
+    fn container_requested(&self) -> BoxFuture<'static, ()>;
 }
 
 /// Represents one allowed Docker container (or equivalent).
@@ -884,6 +887,10 @@ impl CoordinatorFactory {
 
         Coordinator::new(limits, backend)
     }
+
+    pub async fn container_requested(&self) {
+        self.limits.container_requested().await
+    }
 }
 
 #[derive(Debug)]
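The new trait method's contract is just "resolve when someone wants a container back". For contrast, the simplest conceivable policy never asks for a container back and can return a future that never resolves. This is a hypothetical sketch, not part of this commit, and `NeverAsksBack` is an invented name:

    use futures::future::{BoxFuture, FutureExt};

    #[derive(Debug)]
    struct NeverAsksBack;

    impl NeverAsksBack {
        // Mirrors the signature of ResourceLimits::container_requested.
        fn container_requested(&self) -> BoxFuture<'static, ()> {
            std::future::pending().boxed()
        }
    }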

compiler/base/orchestrator/src/coordinator/limits.rs

Lines changed: 52 additions & 1 deletion
@@ -94,6 +94,7 @@ pub struct Global<L = NoOpLifecycle> {
     lifecycle: L,
     container_semaphore: Arc<Semaphore>,
     process_semaphore: Arc<Semaphore>,
+    container_request_semaphore: Arc<Semaphore>,
     start: u64,
     id: AtomicU64,
 }
@@ -136,6 +137,7 @@
     pub fn with_lifecycle(container_limit: usize, process_limit: usize, lifecycle: L) -> Self {
         let container_semaphore = Arc::new(Semaphore::new(container_limit));
         let process_semaphore = Arc::new(Semaphore::new(process_limit));
+        let container_request_semaphore = Arc::new(Semaphore::new(0));
 
         let now = std::time::SystemTime::now();
         let start = now
@@ -149,6 +151,7 @@
             lifecycle,
             container_semaphore,
             process_semaphore,
+            container_request_semaphore,
             start,
             id,
         }
@@ -163,13 +166,44 @@
         let lifecycle = self.lifecycle.clone();
         let container_semaphore = self.container_semaphore.clone();
         let process_semaphore = self.process_semaphore.clone();
+        let container_request_semaphore = self.container_request_semaphore.clone();
         let start = self.start;
         let id = self.id.fetch_add(1, Ordering::SeqCst);
 
         async move {
             let guard = ContainerAcquireGuard::start(&lifecycle);
 
-            let container_permit = container_semaphore.acquire_owned().await;
+            // Attempt to acquire the container semaphore. If we don't
+            // immediately get it, notify the container request
+            // semaphore. Any idle-but-not-yet-exited connections
+            // should watch that semaphore to see if they should give
+            // up their container to allow someone else in.
+            //
+            // There *is* a race here: a container might naturally
+            // exit after we attempt to acquire the first time. In
+            // that case, we'd spuriously notify the request semaphore
+            // and a container might exit earlier than it needed
+            // to. However, this should be a transient issue and only
+            // occur when we are already at the upper bounds of our
+            // limits. In those cases, freeing an extra container or
+            // two shouldn't be the worst thing.
+            let container_permit = {
+                let fallback = {
+                    let container_semaphore = container_semaphore.clone();
+                    async {
+                        container_request_semaphore.add_permits(1);
+                        container_semaphore.acquire_owned().await
+                    }
+                };
+
+                tokio::select! {
+                    biased;
+
+                    permit = container_semaphore.acquire_owned() => permit,
+                    permit = fallback => permit,
+                }
+            };
+
             let container_permit = guard.complete(container_permit)?;
 
             let token = TrackContainer {
@@ -183,6 +217,23 @@
         }
         .boxed()
     }
+
+    fn container_requested(&self) -> BoxFuture<'static, ()> {
+        let container_request_semaphore = self.container_request_semaphore.clone();
+
+        async move {
+            let permit = container_request_semaphore
+                .acquire()
+                .await
+                .expect("The semaphore is never closed");
+
+            // We're now dealing with the request to return a
+            // container so we discard the permit to prevent anyone
+            // else from trying to handle it.
+            permit.forget();
+        }
+        .boxed()
+    }
 }
 
 impl<L> fmt::Display for TrackContainer<L>
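One detail worth calling out from `container_requested` above: with Tokio semaphores, dropping a `SemaphorePermit` returns the permit to the semaphore, so another idle connection waiting on `container_request_semaphore` would be woken for the same request; calling `forget()` consumes the permit, so each exit request is handled at most once. A tiny standalone illustration (not playground code):

    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let requests = Semaphore::new(0);
        requests.add_permits(1);

        // Dropping a permit hands it back; a later acquirer sees the same request again.
        drop(requests.acquire().await.unwrap());
        assert_eq!(requests.available_permits(), 1);

        // Forgetting a permit consumes it; the request is now fully handled.
        requests.acquire().await.unwrap().forget();
        assert_eq!(requests.available_permits(), 0);
    }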

ui/src/server_axum/websocket.rs

Lines changed: 24 additions & 5 deletions
@@ -16,6 +16,7 @@ use std::{
     collections::BTreeMap,
     convert::TryFrom,
     mem,
+    ops::ControlFlow,
     pin::pin,
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -444,13 +445,16 @@ async fn handle_core(
             },
 
             _ = &mut idle_timeout, if manager.is_empty() => {
-                let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
+                }
+            },
 
-                let Err(error) = idled else { continue };
+            _ = factory.container_requested(), if manager.is_empty() => {
+                info!("Container requested to idle");
 
-                if tx.send(Err((error, None))).await.is_err() {
-                    // We can't send a response
-                    break;
+                if handle_idle(&mut manager, &tx).await.is_break() {
+                    break
                 }
             },
 
@@ -506,6 +510,21 @@ fn response_to_message(response: MessageResponse) -> Message {
     Message::Text(resp)
 }
 
+async fn handle_idle(manager: &mut CoordinatorManager, tx: &ResponseTx) -> ControlFlow<()> {
+    let idled = manager.idle().await.context(StreamingCoordinatorIdleSnafu);
+
+    let Err(error) = idled else {
+        return ControlFlow::Continue(());
+    };
+
+    if tx.send(Err((error, None))).await.is_err() {
+        // We can't send a response
+        return ControlFlow::Break(());
+    }
+
+    ControlFlow::Continue(())
+}
+
 type ActiveExecutionInfo = (CancellationToken, Option<mpsc::Sender<String>>);
 
 async fn handle_msg(