Skip to content

Commit 2986469

Browse files
authored
Merge pull request #1166 from rust-lang/poking
Minor cleanups and additional logging from re-reading code
2 parents 4fe5162 + 096986d commit 2986469

File tree

3 files changed

+93
-71
lines changed

3 files changed

+93
-71
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,7 @@ impl<B> Coordinator<B>
930930
where
931931
B: Backend,
932932
{
933-
pub fn new(limits: Arc<dyn ResourceLimits>, backend: B) -> Self {
933+
fn new(limits: Arc<dyn ResourceLimits>, backend: B) -> Self {
934934
let token = CancelOnDrop(CancellationToken::new());
935935

936936
Self {
@@ -1221,9 +1221,8 @@ impl Container {
12211221
} = spawn_io_queue(stdin, stdout, token);
12221222

12231223
let (command_tx, command_rx) = mpsc::channel(8);
1224-
let demultiplex_task =
1225-
tokio::spawn(Commander::demultiplex(command_rx, from_worker_rx).in_current_span())
1226-
.abort_on_drop();
1224+
let demultiplex = Commander::demultiplex(command_rx, from_worker_rx);
1225+
let demultiplex_task = tokio::spawn(demultiplex.in_current_span()).abort_on_drop();
12271226

12281227
let task = tokio::spawn(
12291228
async move {
@@ -1842,18 +1841,33 @@ impl Container {
18421841
let mut stdin_open = true;
18431842

18441843
loop {
1845-
select! {
1846-
() = &mut cancelled => {
1844+
enum Event {
1845+
Cancelled,
1846+
Stdin(Option<String>),
1847+
FromWorker(WorkerMessage),
1848+
}
1849+
use Event::*;
1850+
1851+
let event = select! {
1852+
() = &mut cancelled => Cancelled,
1853+
1854+
stdin = stdin_rx.recv(), if stdin_open => Stdin(stdin),
1855+
1856+
Some(container_msg) = from_worker_rx.recv() => FromWorker(container_msg),
1857+
1858+
else => return UnexpectedEndOfMessagesSnafu.fail(),
1859+
};
1860+
1861+
match event {
1862+
Cancelled => {
18471863
let msg = CoordinatorMessage::Kill;
18481864
trace!(msg_name = msg.as_ref(), "processing");
18491865
to_worker_tx.send(msg).await.context(KillSnafu)?;
1850-
},
1866+
}
18511867

1852-
stdin = stdin_rx.recv(), if stdin_open => {
1868+
Stdin(stdin) => {
18531869
let msg = match stdin {
1854-
Some(stdin) => {
1855-
CoordinatorMessage::StdinPacket(stdin)
1856-
}
1870+
Some(stdin) => CoordinatorMessage::StdinPacket(stdin),
18571871

18581872
None => {
18591873
stdin_open = false;
@@ -1863,9 +1877,9 @@ impl Container {
18631877

18641878
trace!(msg_name = msg.as_ref(), "processing");
18651879
to_worker_tx.send(msg).await.context(StdinSnafu)?;
1866-
},
1880+
}
18671881

1868-
Some(container_msg) = from_worker_rx.recv() => {
1882+
FromWorker(container_msg) => {
18691883
trace!(msg_name = container_msg.as_ref(), "processing");
18701884

18711885
match container_msg {
@@ -1885,20 +1899,18 @@ impl Container {
18851899
status_tx.send(stats).await.ok(/* Receiver gone, that's OK */);
18861900
}
18871901

1888-
WorkerMessage::Error(e) =>
1889-
return Err(SerializedError2::adapt(e)).context(WorkerSnafu),
1902+
WorkerMessage::Error(e) => {
1903+
return Err(SerializedError2::adapt(e)).context(WorkerSnafu);
1904+
}
18901905

1891-
WorkerMessage::Error2(e) =>
1892-
return Err(e).context(WorkerSnafu),
1906+
WorkerMessage::Error2(e) => return Err(e).context(WorkerSnafu),
18931907

18941908
_ => {
18951909
let message = container_msg.as_ref();
1896-
return UnexpectedMessageSnafu { message }.fail()
1897-
},
1910+
return UnexpectedMessageSnafu { message }.fail();
1911+
}
18981912
}
1899-
},
1900-
1901-
else => return UnexpectedEndOfMessagesSnafu.fail(),
1913+
}
19021914
}
19031915
}
19041916
}
@@ -2426,19 +2438,12 @@ impl Commander {
24262438
continue;
24272439
}
24282440

2429-
warn!(job_id, "no listener to notify");
2441+
warn!(job_id, msg_name = msg.as_ref(), "no listener to notify");
24302442
}
24312443

24322444
Gc => {
2433-
waiting = mem::take(&mut waiting)
2434-
.into_iter()
2435-
.filter(|(_job_id, tx)| !tx.is_closed())
2436-
.collect();
2437-
2438-
waiting_once = mem::take(&mut waiting_once)
2439-
.into_iter()
2440-
.filter(|(_job_id, tx)| !tx.is_closed())
2441-
.collect();
2445+
waiting.retain(|_job_id, tx| !tx.is_closed());
2446+
waiting_once.retain(|_job_id, tx| !tx.is_closed());
24422447
}
24432448
}
24442449
}
@@ -2756,10 +2761,6 @@ impl Backend for DockerBackend {
27562761
.args(["--name", &name])
27572762
.arg("-i")
27582763
.args(["-a", "stdin", "-a", "stdout", "-a", "stderr"])
2759-
// PLAYGROUND_ORCHESTRATOR is vestigial; I'm leaving it
2760-
// for a bit to allow new containers to get built and
2761-
// distributed.
2762-
.args(["-e", "PLAYGROUND_ORCHESTRATOR=1"])
27632764
.arg("--rm")
27642765
.arg(channel.to_container_name())
27652766
.arg("worker")

ui/src/request_database.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::{
66
sync::{mpsc, oneshot},
77
task,
88
};
9-
use tracing::warn;
9+
use tracing::{info, warn, Instrument as _};
1010

1111
pub struct Database {
1212
db: Connection,
@@ -172,7 +172,11 @@ impl Handle {
172172
.drop_error_details()
173173
.context(SendStartRequestSnafu)?;
174174

175-
rx.await.context(RecvStartRequestSnafu)?.map_err(Into::into)
175+
let id = rx.await.context(RecvStartRequestSnafu)??;
176+
177+
info!(request_id = id.0, "Started request");
178+
179+
Ok(id)
176180
}
177181

178182
async fn attempt_start_request(
@@ -198,7 +202,11 @@ impl Handle {
198202
.drop_error_details()
199203
.context(SendEndRequestSnafu)?;
200204

201-
rx.await.context(RecvEndRequestSnafu)?.map_err(Into::into)
205+
rx.await.context(RecvEndRequestSnafu)??;
206+
207+
info!(request_id = id.0, "Ended request");
208+
209+
Ok(())
202210
}
203211

204212
async fn attempt_end_request(&self, id: Id, how: How) {
@@ -245,7 +253,7 @@ impl Drop for EndGuardInner {
245253
let Self(id, how, ref mut handle) = *self;
246254
if let Ok(h) = tokio::runtime::Handle::try_current() {
247255
if let Some(handle) = handle.take() {
248-
h.spawn(async move { handle.attempt_end_request(id, how).await });
256+
h.spawn(async move { handle.attempt_end_request(id, how).await }.in_current_span());
249257
}
250258
}
251259
}

ui/src/server_axum/websocket.rs

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use snafu::prelude::*;
1515
use std::{
1616
collections::BTreeMap,
1717
convert::TryFrom,
18-
mem,
1918
pin::pin,
2019
sync::{
2120
atomic::{AtomicU64, Ordering},
@@ -470,14 +469,12 @@ async fn handle_core(
470469
}
471470

472471
GarbageCollection => {
473-
active_executions = mem::take(&mut active_executions)
474-
.into_iter()
475-
.filter(|(_id, (_, tx))| tx.as_ref().is_some_and(|tx| !tx.is_closed()))
476-
.collect();
472+
active_executions
473+
.retain(|_id, (_, tx)| tx.as_ref().is_some_and(|tx| !tx.is_closed()));
477474
}
478475

479476
IdleTimeout | IdleRequest => {
480-
if let IdleRequest = event {
477+
if matches!(event, IdleRequest) {
481478
info!("Container requested to idle");
482479
}
483480

@@ -721,46 +718,62 @@ async fn handle_execute_inner(
721718
let mut reported = false;
722719

723720
let status = loop {
724-
tokio::select! {
725-
status = &mut task => break status,
726-
727-
stdin = rx.recv(), if stdin_tx.is_some() => {
728-
match stdin {
729-
Some(stdin) => {
730-
stdin_tx
731-
.as_ref()
732-
.unwrap(/* This is a precondition */)
733-
.send(stdin)
734-
.await
735-
.drop_error_details()
736-
.context(StdinSnafu)?;
737-
}
738-
None => {
739-
let stdin_tx = stdin_tx.take();
740-
drop(stdin_tx); // Signal closed
741-
}
742-
}
721+
enum Event {
722+
Stdin(Option<String>),
723+
Stdout(String),
724+
Stderr(String),
725+
Status(coordinator::ExecuteStatus),
726+
}
727+
use Event::*;
728+
729+
let event = tokio::select! {
730+
response = &mut task => break response,
731+
732+
stdin = rx.recv(), if stdin_tx.is_some() => Stdin(stdin),
733+
734+
Some(stdout) = stdout_rx.recv() => Stdout(stdout),
735+
736+
Some(stderr) = stderr_rx.recv() => Stderr(stderr),
737+
738+
Some(status) = status_rx.next() => Status(status)
739+
};
740+
741+
match event {
742+
Stdin(Some(stdin)) => {
743+
stdin_tx
744+
.as_ref()
745+
.unwrap(/* This is a precondition */)
746+
.send(stdin)
747+
.await
748+
.drop_error_details()
749+
.context(StdinSnafu)?;
750+
}
751+
Stdin(None) => {
752+
let stdin_tx = stdin_tx.take();
753+
drop(stdin_tx); // Signal closed
743754
}
744755

745-
Some(stdout) = stdout_rx.recv() => {
756+
Stdout(stdout) => {
746757
let sent = send_stdout(stdout).await;
747758
abandon_if_closed!(sent);
748-
},
759+
}
749760

750-
Some(stderr) = stderr_rx.recv() => {
761+
Stderr(stderr) => {
751762
let sent = send_stderr(stderr).await;
752763
abandon_if_closed!(sent);
753-
},
764+
}
754765

755-
Some(status) = status_rx.next() => {
766+
Status(status) => {
756767
if !reported && status.total_time_secs > 60.0 {
757768
error!("Request consumed more than 60s of CPU time: {req:?}");
758769
reported = true;
759770
}
760771

761772
let payload = status.into();
762773
let meta = meta.clone();
763-
let sent = tx.send(Ok(MessageResponse::ExecuteStatus { payload, meta })).await;
774+
let sent = tx
775+
.send(Ok(MessageResponse::ExecuteStatus { payload, meta }))
776+
.await;
764777
abandon_if_closed!(sent);
765778
}
766779
}

0 commit comments

Comments
 (0)