Minor cleanups and additional logging from re-reading code #1166

Merged · 9 commits · Jul 8, 2025

73 changes: 37 additions & 36 deletions compiler/base/orchestrator/src/coordinator.rs
@@ -930,7 +930,7 @@ impl<B> Coordinator<B>
where
B: Backend,
{
pub fn new(limits: Arc<dyn ResourceLimits>, backend: B) -> Self {
fn new(limits: Arc<dyn ResourceLimits>, backend: B) -> Self {
let token = CancelOnDrop(CancellationToken::new());

Self {
@@ -1221,9 +1221,8 @@ impl Container {
} = spawn_io_queue(stdin, stdout, token);

let (command_tx, command_rx) = mpsc::channel(8);
let demultiplex_task =
tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx).in_current_span())
.abort_on_drop();
let demultiplex = Commander::demultiplex(command_rx, from_worker_rx);
let demultiplex_task = tokio::spawn(demultiplex.in_current_span()).abort_on_drop();

let task = tokio::spawn(
async move {
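
The change above only binds the `Commander::demultiplex(..)` future to a local before spawning it; behaviour is unchanged. For context, `in_current_span()` comes from the `tracing::Instrument` trait and keeps the spawned task's events attached to the caller's span, while `abort_on_drop()` appears to be a project-local helper that cancels the task when its handle is dropped. A sketch of that combination, with a hypothetical `AbortOnDrop` standing in for the real helper:

```rust
use tokio::task::JoinHandle;
use tracing::Instrument as _;

/// Hypothetical stand-in for the project's `abort_on_drop()` helper:
/// abort the spawned task when this handle is dropped.
struct AbortOnDrop<T>(JoinHandle<T>);

impl<T> Drop for AbortOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

fn spawn_in_span<F>(fut: F) -> AbortOnDrop<F::Output>
where
    F: std::future::Future + Send + 'static,
    F::Output: Send + 'static,
{
    // `in_current_span()` instruments the future with the span that is
    // active here, so tracing events from the task stay correlated.
    AbortOnDrop(tokio::spawn(fut.in_current_span()))
}
```
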
@@ -1842,18 +1841,33 @@ impl Container {
let mut stdin_open = true;

loop {
select! {
() = &mut cancelled => {
enum Event {
Cancelled,
Stdin(Option<String>),
FromWorker(WorkerMessage),
}
use Event::*;

let event = select! {
() = &mut cancelled => Cancelled,

stdin = stdin_rx.recv(), if stdin_open => Stdin(stdin),

Some(container_msg) = from_worker_rx.recv() => FromWorker(container_msg),

else => return UnexpectedEndOfMessagesSnafu.fail(),
};

match event {
Cancelled => {
let msg = CoordinatorMessage::Kill;
trace!(msg_name = msg.as_ref(), "processing");
to_worker_tx.send(msg).await.context(KillSnafu)?;
},
}

stdin = stdin_rx.recv(), if stdin_open => {
Stdin(stdin) => {
let msg = match stdin {
Some(stdin) => {
CoordinatorMessage::StdinPacket(stdin)
}
Some(stdin) => CoordinatorMessage::StdinPacket(stdin),

None => {
stdin_open = false;
@@ -1863,9 +1877,9 @@ impl Container {

trace!(msg_name = msg.as_ref(), "processing");
to_worker_tx.send(msg).await.context(StdinSnafu)?;
},
}

Some(container_msg) = from_worker_rx.recv() => {
FromWorker(container_msg) => {
trace!(msg_name = container_msg.as_ref(), "processing");

match container_msg {
@@ -1885,20 +1899,18 @@ impl Container {
status_tx.send(stats).await.ok(/* Receiver gone, that's OK */);
}

WorkerMessage::Error(e) =>
return Err(SerializedError2::adapt(e)).context(WorkerSnafu),
WorkerMessage::Error(e) => {
return Err(SerializedError2::adapt(e)).context(WorkerSnafu);
}

WorkerMessage::Error2(e) =>
return Err(e).context(WorkerSnafu),
WorkerMessage::Error2(e) => return Err(e).context(WorkerSnafu),

_ => {
let message = container_msg.as_ref();
return UnexpectedMessageSnafu { message }.fail()
},
return UnexpectedMessageSnafu { message }.fail();
}
}
},

else => return UnexpectedEndOfMessagesSnafu.fail(),
}
}
}
}
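
The reshaped loop is a common `select!` pattern: each arm only records which event fired as a small enum, and the real handling happens in an ordinary `match` afterwards, which keeps the arms one line each and lets the bodies be formatted normally. A condensed sketch of the shape, with simplified types standing in for the real coordinator/worker messages:

```rust
use tokio::{select, sync::mpsc};
use tokio_util::sync::CancellationToken;

enum Event {
    Cancelled,
    Stdin(Option<String>),
    FromWorker(String), // stand-in for the real `WorkerMessage`
}

async fn drive(
    token: CancellationToken,
    mut stdin_rx: mpsc::Receiver<String>,
    mut from_worker_rx: mpsc::Receiver<String>,
) {
    use Event::*;
    let mut stdin_open = true;

    loop {
        // Each arm just tags which source produced a value...
        let event = select! {
            () = token.cancelled() => Cancelled,
            stdin = stdin_rx.recv(), if stdin_open => Stdin(stdin),
            Some(msg) = from_worker_rx.recv() => FromWorker(msg),
            else => break,
        };

        // ...and the actual handling lives in a flat `match`.
        match event {
            Cancelled => break,
            Stdin(Some(line)) => println!("forward stdin: {line}"),
            Stdin(None) => stdin_open = false, // sender dropped; disable that arm
            FromWorker(msg) => println!("dispatch worker message: {msg}"),
        }
    }
}
```
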
@@ -2426,19 +2438,12 @@ impl Commander {
continue;
}

warn!(job_id, "no listener to notify");
warn!(job_id, msg_name = msg.as_ref(), "no listener to notify");
}

Gc => {
waiting = mem::take(&mut waiting)
.into_iter()
.filter(|(_job_id, tx)| !tx.is_closed())
.collect();

waiting_once = mem::take(&mut waiting_once)
.into_iter()
.filter(|(_job_id, tx)| !tx.is_closed())
.collect();
waiting.retain(|_job_id, tx| !tx.is_closed());
waiting_once.retain(|_job_id, tx| !tx.is_closed());
}
}
}
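
`retain` performs the same garbage collection in place, replacing the previous `mem::take` / `filter` / `collect` round trip that rebuilt each map. A small illustration, assuming the waiting maps are `HashMap`s from job id to an `mpsc::Sender`:

```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

fn gc(waiting: &mut HashMap<u64, mpsc::Sender<String>>) {
    // Keep only entries whose receiving side is still alive.
    waiting.retain(|_job_id, tx| !tx.is_closed());
}
```
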
@@ -2756,10 +2761,6 @@ impl Backend for DockerBackend {
.args(["--name", &name])
.arg("-i")
.args(["-a", "stdin", "-a", "stdout", "-a", "stderr"])
// PLAYGROUND_ORCHESTRATOR is vestigial; I'm leaving it
// for a bit to allow new containers to get built and
// distributed.
.args(["-e", "PLAYGROUND_ORCHESTRATOR=1"])
.arg("--rm")
.arg(channel.to_container_name())
.arg("worker")
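
The deleted `-e PLAYGROUND_ORCHESTRATOR=1` arguments drop an environment variable whose own comment described it as vestigial. For orientation only, a rough sketch of how a `docker run` invocation with the remaining flags could be assembled; the builder below is illustrative, not the orchestrator's actual code:

```rust
use std::process::Command;

// Illustrative assembly of the `docker run` line shown above.
fn docker_run(name: &str, image: &str) -> Command {
    let mut cmd = Command::new("docker");
    cmd.arg("run")
        .args(["--name", name])
        .arg("-i") // keep stdin open
        .args(["-a", "stdin", "-a", "stdout", "-a", "stderr"]) // attach streams
        .arg("--rm") // remove the container once it exits
        .arg(image)
        .arg("worker");
    cmd
}
```
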
16 changes: 12 additions & 4 deletions ui/src/request_database.rs
@@ -6,7 +6,7 @@ use tokio::{
sync::{mpsc, oneshot},
task,
};
use tracing::warn;
use tracing::{info, warn, Instrument as _};

pub struct Database {
db: Connection,
@@ -172,7 +172,11 @@ impl Handle {
.drop_error_details()
.context(SendStartRequestSnafu)?;

rx.await.context(RecvStartRequestSnafu)?.map_err(Into::into)
let id = rx.await.context(RecvStartRequestSnafu)??;

info!(request_id = id.0, "Started request");

Ok(id)
}

async fn attempt_start_request(
@@ -198,7 +202,11 @@ impl Handle {
.drop_error_details()
.context(SendEndRequestSnafu)?;

rx.await.context(RecvEndRequestSnafu)?.map_err(Into::into)
rx.await.context(RecvEndRequestSnafu)??;

info!(request_id = id.0, "Ended request");

Ok(())
}

async fn attempt_end_request(&self, id: Id, how: How) {
@@ -245,7 +253,7 @@ impl Drop for EndGuardInner {
let Self(id, how, ref mut handle) = *self;
if let Ok(h) = tokio::runtime::Handle::try_current() {
if let Some(handle) = handle.take() {
h.spawn(async move { handle.attempt_end_request(id, how).await });
h.spawn(async move { handle.attempt_end_request(id, how).await }.in_current_span());
}
}
}
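
In `start_request` and `end_request`, the double `?` first handles a failed oneshot receive (`RecvStartRequestSnafu` / `RecvEndRequestSnafu`) and then the database error carried inside the channel, so the value is fully unwrapped before the new `info!` lines fire with the request id. A reduced sketch of that shape, with placeholder error types instead of the crate's snafu contexts:

```rust
use tokio::sync::oneshot;

struct Id(i64);

#[derive(Debug)]
enum Error {
    Recv,       // the database task went away before answering
    Db(String), // the database reported an error
}

async fn start_request(rx: oneshot::Receiver<Result<Id, String>>) -> Result<Id, Error> {
    // First `?`: the oneshot itself failed (sender dropped).
    // Second `?`: the payload was an application-level database error.
    let id = rx.await.map_err(|_| Error::Recv)?.map_err(Error::Db)?;

    tracing::info!(request_id = id.0, "Started request");

    Ok(id)
}
```
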
75 changes: 44 additions & 31 deletions ui/src/server_axum/websocket.rs
@@ -15,7 +15,6 @@ use snafu::prelude::*;
use std::{
collections::BTreeMap,
convert::TryFrom,
mem,
pin::pin,
sync::{
atomic::{AtomicU64, Ordering},
@@ -470,14 +469,12 @@ async fn handle_core(
}

GarbageCollection => {
active_executions = mem::take(&mut active_executions)
.into_iter()
.filter(|(_id, (_, tx))| tx.as_ref().is_some_and(|tx| !tx.is_closed()))
.collect();
active_executions
.retain(|_id, (_, tx)| tx.as_ref().is_some_and(|tx| !tx.is_closed()));
}

IdleTimeout | IdleRequest => {
if let IdleRequest = event {
if matches!(event, IdleRequest) {
info!("Container requested to idle");
}
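
`if matches!(event, IdleRequest)` is a plain boolean test and reads a little more directly than the old `if let IdleRequest = event`, which bound nothing from the pattern; the two compile to the same check. For example:

```rust
enum Event {
    IdleTimeout,
    IdleRequest,
}

fn log_idle(event: &Event) {
    // `matches!` when no bindings are needed; `if let` when they are.
    if matches!(event, Event::IdleRequest) {
        println!("Container requested to idle");
    }
}
```
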

@@ -721,46 +718,62 @@ async fn handle_execute_inner(
let mut reported = false;

let status = loop {
tokio::select! {
status = &mut task => break status,

stdin = rx.recv(), if stdin_tx.is_some() => {
match stdin {
Some(stdin) => {
stdin_tx
.as_ref()
.unwrap(/* This is a precondition */)
.send(stdin)
.await
.drop_error_details()
.context(StdinSnafu)?;
}
None => {
let stdin_tx = stdin_tx.take();
drop(stdin_tx); // Signal closed
}
}
enum Event {
Stdin(Option<String>),
Stdout(String),
Stderr(String),
Status(coordinator::ExecuteStatus),
}
use Event::*;

let event = tokio::select! {
response = &mut task => break response,

stdin = rx.recv(), if stdin_tx.is_some() => Stdin(stdin),

Some(stdout) = stdout_rx.recv() => Stdout(stdout),

Some(stderr) = stderr_rx.recv() => Stderr(stderr),

Some(status) = status_rx.next() => Status(status)
};

match event {
Stdin(Some(stdin)) => {
stdin_tx
.as_ref()
.unwrap(/* This is a precondition */)
.send(stdin)
.await
.drop_error_details()
.context(StdinSnafu)?;
}
Stdin(None) => {
let stdin_tx = stdin_tx.take();
drop(stdin_tx); // Signal closed
}

Some(stdout) = stdout_rx.recv() => {
Stdout(stdout) => {
let sent = send_stdout(stdout).await;
abandon_if_closed!(sent);
},
}

Some(stderr) = stderr_rx.recv() => {
Stderr(stderr) => {
let sent = send_stderr(stderr).await;
abandon_if_closed!(sent);
},
}

Some(status) = status_rx.next() => {
Status(status) => {
if !reported && status.total_time_secs > 60.0 {
error!("Request consumed more than 60s of CPU time: {req:?}");
reported = true;
}

let payload = status.into();
let meta = meta.clone();
let sent = tx.send(Ok(MessageResponse::ExecuteStatus { payload, meta })).await;
let sent = tx
.send(Ok(MessageResponse::ExecuteStatus { payload, meta }))
.await;
abandon_if_closed!(sent);
}
}
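
Two `select!` details carry over from the old loop and are worth keeping in mind when reading the new one: the `, if stdin_tx.is_some()` precondition removes the stdin arm from the race once the sender has been dropped, and the `Some(..) = rx.recv()` patterns disable an arm whenever its channel yields `None`. A minimal illustration of both mechanisms (the `else` branch below is needed here because this toy loop has no always-ready arm like `&mut task`):

```rust
use tokio::{select, sync::mpsc};

async fn pump(mut stdin_rx: mpsc::Receiver<String>, mut out_rx: mpsc::Receiver<String>) {
    let mut stdin_open = true;

    loop {
        select! {
            // Precondition guard: once `stdin_open` is false, this arm no
            // longer participates in the race at all.
            stdin = stdin_rx.recv(), if stdin_open => match stdin {
                Some(line) => println!("stdin: {line}"),
                None => stdin_open = false, // sender dropped; stop polling
            },

            // Refutable pattern: when `out_rx` is closed, `recv()` yields
            // `None`, the pattern fails, and the arm is disabled.
            Some(out) = out_rx.recv() => println!("out: {out}"),

            // Runs when every arm is disabled, instead of panicking.
            else => break,
        }
    }
}
```
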