diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs
index f8a60ce4..00bfd8bd 100644
--- a/compiler/base/orchestrator/src/coordinator.rs
+++ b/compiler/base/orchestrator/src/coordinator.rs
@@ -930,7 +930,7 @@ impl Coordinator
 where
     B: Backend,
 {
-    pub fn new(limits: Arc, backend: B) -> Self {
+    fn new(limits: Arc, backend: B) -> Self {
         let token = CancelOnDrop(CancellationToken::new());
 
         Self {
@@ -1221,9 +1221,8 @@ impl Container {
         } = spawn_io_queue(stdin, stdout, token);
 
         let (command_tx, command_rx) = mpsc::channel(8);
-        let demultiplex_task =
-            tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx).in_current_span())
-                .abort_on_drop();
+        let demultiplex = Commander::demultiplex(command_rx, from_worker_rx);
+        let demultiplex_task = tokio::spawn(demultiplex.in_current_span()).abort_on_drop();
 
         let task = tokio::spawn(
             async move {
@@ -1842,18 +1841,33 @@ impl Container {
         let mut stdin_open = true;
 
         loop {
-            select! {
-                () = &mut cancelled => {
+            enum Event {
+                Cancelled,
+                Stdin(Option),
+                FromWorker(WorkerMessage),
+            }
+            use Event::*;
+
+            let event = select! {
+                () = &mut cancelled => Cancelled,
+
+                stdin = stdin_rx.recv(), if stdin_open => Stdin(stdin),
+
+                Some(container_msg) = from_worker_rx.recv() => FromWorker(container_msg),
+
+                else => return UnexpectedEndOfMessagesSnafu.fail(),
+            };
+
+            match event {
+                Cancelled => {
                     let msg = CoordinatorMessage::Kill;
                     trace!(msg_name = msg.as_ref(), "processing");
                     to_worker_tx.send(msg).await.context(KillSnafu)?;
-                },
+                }
 
-                stdin = stdin_rx.recv(), if stdin_open => {
+                Stdin(stdin) => {
                     let msg = match stdin {
-                        Some(stdin) => {
-                            CoordinatorMessage::StdinPacket(stdin)
-                        }
+                        Some(stdin) => CoordinatorMessage::StdinPacket(stdin),
 
                         None => {
                             stdin_open = false;
@@ -1863,9 +1877,9 @@ impl Container {
 
                     trace!(msg_name = msg.as_ref(), "processing");
                     to_worker_tx.send(msg).await.context(StdinSnafu)?;
-                },
+                }
 
-                Some(container_msg) = from_worker_rx.recv() => {
+                FromWorker(container_msg) => {
                     trace!(msg_name = container_msg.as_ref(), "processing");
 
                     match container_msg {
@@ -1885,20 +1899,18 @@ impl Container {
                             status_tx.send(stats).await.ok(/* Receiver gone, that's OK */);
                         }
 
-                        WorkerMessage::Error(e) =>
-                            return Err(SerializedError2::adapt(e)).context(WorkerSnafu),
+                        WorkerMessage::Error(e) => {
+                            return Err(SerializedError2::adapt(e)).context(WorkerSnafu);
+                        }
 
-                        WorkerMessage::Error2(e) =>
-                            return Err(e).context(WorkerSnafu),
+                        WorkerMessage::Error2(e) => return Err(e).context(WorkerSnafu),
 
                         _ => {
                             let message = container_msg.as_ref();
-                            return UnexpectedMessageSnafu { message }.fail()
-                        },
+                            return UnexpectedMessageSnafu { message }.fail();
+                        }
                     }
-                },
-
-                else => return UnexpectedEndOfMessagesSnafu.fail(),
+                }
             }
         }
     }
@@ -2426,19 +2438,12 @@ impl Commander {
                         continue;
                     }
 
-                    warn!(job_id, "no listener to notify");
+                    warn!(job_id, msg_name = msg.as_ref(), "no listener to notify");
                 }
 
                 Gc => {
-                    waiting = mem::take(&mut waiting)
-                        .into_iter()
-                        .filter(|(_job_id, tx)| !tx.is_closed())
-                        .collect();
-
-                    waiting_once = mem::take(&mut waiting_once)
-                        .into_iter()
-                        .filter(|(_job_id, tx)| !tx.is_closed())
-                        .collect();
+                    waiting.retain(|_job_id, tx| !tx.is_closed());
+                    waiting_once.retain(|_job_id, tx| !tx.is_closed());
                 }
             }
         }
@@ -2756,10 +2761,6 @@ impl Backend for DockerBackend {
             .args(["--name", &name])
             .arg("-i")
             .args(["-a", "stdin", "-a", "stdout", "-a", "stderr"])
-            // PLAYGROUND_ORCHESTRATOR is vestigial; I'm leaving it
-            // for a bit to allow new containers to get built and
-            // distributed.
-            .args(["-e", "PLAYGROUND_ORCHESTRATOR=1"])
             .arg("--rm")
             .arg(channel.to_container_name())
             .arg("worker")
diff --git a/ui/src/request_database.rs b/ui/src/request_database.rs
index b5d65d08..0a5b40c8 100644
--- a/ui/src/request_database.rs
+++ b/ui/src/request_database.rs
@@ -6,7 +6,7 @@ use tokio::{
     sync::{mpsc, oneshot},
     task,
 };
-use tracing::warn;
+use tracing::{info, warn, Instrument as _};
 
 pub struct Database {
     db: Connection,
@@ -172,7 +172,11 @@ impl Handle {
             .drop_error_details()
             .context(SendStartRequestSnafu)?;
 
-        rx.await.context(RecvStartRequestSnafu)?.map_err(Into::into)
+        let id = rx.await.context(RecvStartRequestSnafu)??;
+
+        info!(request_id = id.0, "Started request");
+
+        Ok(id)
     }
 
     async fn attempt_start_request(
@@ -198,7 +202,11 @@ impl Handle {
             .drop_error_details()
             .context(SendEndRequestSnafu)?;
 
-        rx.await.context(RecvEndRequestSnafu)?.map_err(Into::into)
+        rx.await.context(RecvEndRequestSnafu)??;
+
+        info!(request_id = id.0, "Ended request");
+
+        Ok(())
     }
 
     async fn attempt_end_request(&self, id: Id, how: How) {
@@ -245,7 +253,7 @@ impl Drop for EndGuardInner {
         let Self(id, how, ref mut handle) = *self;
         if let Ok(h) = tokio::runtime::Handle::try_current() {
             if let Some(handle) = handle.take() {
-                h.spawn(async move { handle.attempt_end_request(id, how).await });
+                h.spawn(async move { handle.attempt_end_request(id, how).await }.in_current_span());
             }
         }
     }
diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs
index 27a0a0be..a375caa9 100644
--- a/ui/src/server_axum/websocket.rs
+++ b/ui/src/server_axum/websocket.rs
@@ -15,7 +15,6 @@ use snafu::prelude::*;
 use std::{
     collections::BTreeMap,
     convert::TryFrom,
-    mem,
     pin::pin,
     sync::{
         atomic::{AtomicU64, Ordering},
@@ -470,14 +469,12 @@ async fn handle_core(
             }
 
             GarbageCollection => {
-                active_executions = mem::take(&mut active_executions)
-                    .into_iter()
-                    .filter(|(_id, (_, tx))| tx.as_ref().is_some_and(|tx| !tx.is_closed()))
-                    .collect();
+                active_executions
+                    .retain(|_id, (_, tx)| tx.as_ref().is_some_and(|tx| !tx.is_closed()));
             }
 
             IdleTimeout | IdleRequest => {
-                if let IdleRequest = event {
+                if matches!(event, IdleRequest) {
                     info!("Container requested to idle");
                 }
 
@@ -721,38 +718,52 @@ async fn handle_execute_inner(
     let mut reported = false;
 
     let status = loop {
-        tokio::select! {
-            status = &mut task => break status,
-
-            stdin = rx.recv(), if stdin_tx.is_some() => {
-                match stdin {
-                    Some(stdin) => {
-                        stdin_tx
-                            .as_ref()
-                            .unwrap(/* This is a precondition */)
-                            .send(stdin)
-                            .await
-                            .drop_error_details()
-                            .context(StdinSnafu)?;
-                    }
-                    None => {
-                        let stdin_tx = stdin_tx.take();
-                        drop(stdin_tx); // Signal closed
-                    }
-                }
+        enum Event {
+            Stdin(Option),
+            Stdout(String),
+            Stderr(String),
+            Status(coordinator::ExecuteStatus),
+        }
+        use Event::*;
+
+        let event = tokio::select! {
+            response = &mut task => break response,
+
+            stdin = rx.recv(), if stdin_tx.is_some() => Stdin(stdin),
+
+            Some(stdout) = stdout_rx.recv() => Stdout(stdout),
+
+            Some(stderr) = stderr_rx.recv() => Stderr(stderr),
+
+            Some(status) = status_rx.next() => Status(status)
+        };
+
+        match event {
+            Stdin(Some(stdin)) => {
+                stdin_tx
+                    .as_ref()
+                    .unwrap(/* This is a precondition */)
+                    .send(stdin)
+                    .await
+                    .drop_error_details()
+                    .context(StdinSnafu)?;
+            }
+            Stdin(None) => {
+                let stdin_tx = stdin_tx.take();
+                drop(stdin_tx); // Signal closed
             }
 
-            Some(stdout) = stdout_rx.recv() => {
+            Stdout(stdout) => {
                 let sent = send_stdout(stdout).await;
                 abandon_if_closed!(sent);
-            },
+            }
 
-            Some(stderr) = stderr_rx.recv() => {
+            Stderr(stderr) => {
                 let sent = send_stderr(stderr).await;
                 abandon_if_closed!(sent);
-            },
+            }
 
-            Some(status) = status_rx.next() => {
+            Status(status) => {
                 if !reported && status.total_time_secs > 60.0 {
                     error!("Request consumed more than 60s of CPU time: {req:?}");
                     reported = true;
@@ -760,7 +771,9 @@ async fn handle_execute_inner(
                 let payload = status.into();
                 let meta = meta.clone();
 
-                let sent = tx.send(Ok(MessageResponse::ExecuteStatus { payload, meta })).await;
+                let sent = tx
+                    .send(Ok(MessageResponse::ExecuteStatus { payload, meta }))
+                    .await;
                 abandon_if_closed!(sent);
             }
         }