Skip to content

Commit c1aee2b

Browse files
committed
Change uses of join! to try_join!
1 parent 8b3e32d commit c1aee2b

File tree

1 file changed

+94
-50
lines changed

1 file changed

+94
-50
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 94 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use futures::{
2-
future::{BoxFuture, OptionFuture},
3-
stream::BoxStream,
4-
Future, FutureExt, Stream, StreamExt,
5-
};
1+
use futures::{future::BoxFuture, stream::BoxStream, Future, FutureExt, Stream, StreamExt};
62
use serde::Deserialize;
73
use snafu::prelude::*;
84
use std::{
@@ -16,12 +12,12 @@ use std::{
1612
time::Duration,
1713
};
1814
use tokio::{
19-
join,
2015
process::{Child, ChildStdin, ChildStdout, Command},
2116
select,
2217
sync::{mpsc, oneshot, OnceCell},
2318
task::{JoinHandle, JoinSet},
2419
time::{self, MissedTickBehavior},
20+
try_join,
2521
};
2622
use tokio_stream::wrappers::ReceiverStream;
2723
use tokio_util::{io::SyncIoBridge, sync::CancellationToken};
@@ -799,11 +795,10 @@ impl<T> WithOutput<T> {
799795
where
800796
F: Future<Output = Result<T, E>>,
801797
{
802-
let stdout = stdout_rx.collect();
803-
let stderr = stderr_rx.collect();
798+
let stdout = stdout_rx.collect().map(Ok);
799+
let stderr = stderr_rx.collect().map(Ok);
804800

805-
let (result, stdout, stderr) = join!(task, stdout, stderr);
806-
let response = result?;
801+
let (response, stdout, stderr) = try_join!(task, stdout, stderr)?;
807802

808803
Ok(WithOutput {
809804
response,
@@ -936,11 +931,11 @@ where
936931
c.versions().await.map_err(VersionsChannelError::from)
937932
});
938933

939-
let (stable, beta, nightly) = join!(stable, beta, nightly);
934+
let stable = async { stable.await.context(StableSnafu) };
935+
let beta = async { beta.await.context(BetaSnafu) };
936+
let nightly = async { nightly.await.context(NightlySnafu) };
940937

941-
let stable = stable.context(StableSnafu)?;
942-
let beta = beta.context(BetaSnafu)?;
943-
let nightly = nightly.context(NightlySnafu)?;
938+
let (stable, beta, nightly) = try_join!(stable, beta, nightly)?;
944939

945940
Ok(Versions {
946941
stable,
@@ -1128,16 +1123,17 @@ where
11281123
let token = mem::take(token);
11291124
token.cancel();
11301125

1131-
let channels =
1132-
[stable, beta, nightly].map(|c| OptionFuture::from(c.take().map(|c| c.shutdown())));
1126+
let channels = [stable, beta, nightly].map(|c| async {
1127+
match c.take() {
1128+
Some(c) => c.shutdown().await,
1129+
_ => Ok(()),
1130+
}
1131+
});
11331132

11341133
let [stable, beta, nightly] = channels;
11351134

1136-
let (stable, beta, nightly) = join!(stable, beta, nightly);
1137-
1138-
stable.transpose()?;
1139-
beta.transpose()?;
1140-
nightly.transpose()?;
1135+
let (stable, beta, nightly) = try_join!(stable, beta, nightly)?;
1136+
let _: [(); 3] = [stable, beta, nightly];
11411137

11421138
Ok(())
11431139
}
@@ -1196,14 +1192,28 @@ impl Container {
11961192

11971193
let task = tokio::spawn(
11981194
async move {
1199-
let (c, d, t) = join!(child.wait(), demultiplex_task, tasks.join_next());
1195+
let child = async {
1196+
let _: std::process::ExitStatus =
1197+
child.wait().await.context(JoinWorkerSnafu)?;
1198+
Ok(())
1199+
};
12001200

1201-
c.context(JoinWorkerSnafu)?;
1202-
d.context(DemultiplexerTaskPanickedSnafu)?
1203-
.context(DemultiplexerTaskFailedSnafu)?;
1204-
if let Some(t) = t {
1205-
t.context(IoQueuePanickedSnafu)??;
1206-
}
1201+
let demultiplex_task = async {
1202+
demultiplex_task
1203+
.await
1204+
.context(DemultiplexerTaskPanickedSnafu)?
1205+
.context(DemultiplexerTaskFailedSnafu)
1206+
};
1207+
1208+
let task = async {
1209+
if let Some(t) = tasks.join_next().await {
1210+
t.context(IoQueuePanickedSnafu)??;
1211+
}
1212+
Ok(())
1213+
};
1214+
1215+
let (c, d, t) = try_join!(child, demultiplex_task, task)?;
1216+
let _: [(); 3] = [c, d, t];
12071217

12081218
Ok(())
12091219
}
@@ -1234,19 +1244,41 @@ impl Container {
12341244

12351245
let token = CancellationToken::new();
12361246

1237-
let rustc = self.rustc_version(token.clone());
1238-
let rustfmt = self.tool_version(token.clone(), "fmt");
1239-
let clippy = self.tool_version(token.clone(), "clippy");
1240-
let miri = self.tool_version(token, "miri");
1247+
let rustc = {
1248+
let token = token.clone();
1249+
async {
1250+
self.rustc_version(token)
1251+
.await
1252+
.context(RustcSnafu)?
1253+
.context(RustcMissingSnafu)
1254+
}
1255+
};
1256+
let rustfmt = {
1257+
let token = token.clone();
1258+
async {
1259+
self.tool_version(token, "fmt")
1260+
.await
1261+
.context(RustfmtSnafu)?
1262+
.context(RustfmtMissingSnafu)
1263+
}
1264+
};
1265+
let clippy = {
1266+
let token = token.clone();
1267+
async {
1268+
self.tool_version(token, "clippy")
1269+
.await
1270+
.context(ClippySnafu)?
1271+
.context(ClippyMissingSnafu)
1272+
}
1273+
};
1274+
let miri = {
1275+
let token = token.clone();
1276+
async { self.tool_version(token, "miri").await.context(MiriSnafu) }
1277+
};
12411278

1242-
let (rustc, rustfmt, clippy, miri) = join!(rustc, rustfmt, clippy, miri);
1279+
let _token = token.drop_guard();
12431280

1244-
let rustc = rustc.context(RustcSnafu)?.context(RustcMissingSnafu)?;
1245-
let rustfmt = rustfmt
1246-
.context(RustfmtSnafu)?
1247-
.context(RustfmtMissingSnafu)?;
1248-
let clippy = clippy.context(ClippySnafu)?.context(ClippyMissingSnafu)?;
1249-
let miri = miri.context(MiriSnafu)?;
1281+
let (rustc, rustfmt, clippy, miri) = try_join!(rustc, rustfmt, clippy, miri)?;
12501282

12511283
Ok(ChannelVersions {
12521284
rustc,
@@ -1711,21 +1743,33 @@ impl Container {
17111743
) -> Result<SpawnCargo, DoRequestError> {
17121744
use do_request_error::*;
17131745

1714-
let delete_previous_main = request.delete_previous_main_request();
1715-
let write_main = request.write_main_request();
1716-
let execute_cargo = request.execute_cargo_request();
1746+
let delete_previous_main = async {
1747+
self.commander
1748+
.one(request.delete_previous_main_request())
1749+
.await
1750+
.context(CouldNotDeletePreviousCodeSnafu)
1751+
.map(drop::<crate::message::DeleteFileResponse>)
1752+
};
17171753

1718-
let delete_previous_main = self.commander.one(delete_previous_main);
1719-
let write_main = self.commander.one(write_main);
1720-
let modify_cargo_toml = self.modify_cargo_toml.modify_for(&request);
1754+
let write_main = async {
1755+
self.commander
1756+
.one(request.write_main_request())
1757+
.await
1758+
.context(CouldNotWriteCodeSnafu)
1759+
.map(drop::<crate::message::WriteFileResponse>)
1760+
};
17211761

1722-
let (delete_previous_main, write_main, modify_cargo_toml) =
1723-
join!(delete_previous_main, write_main, modify_cargo_toml);
1762+
let modify_cargo_toml = async {
1763+
self.modify_cargo_toml
1764+
.modify_for(&request)
1765+
.await
1766+
.context(CouldNotModifyCargoTomlSnafu)
1767+
};
17241768

1725-
delete_previous_main.context(CouldNotDeletePreviousCodeSnafu)?;
1726-
write_main.context(CouldNotWriteCodeSnafu)?;
1727-
modify_cargo_toml.context(CouldNotModifyCargoTomlSnafu)?;
1769+
let (d, w, m) = try_join!(delete_previous_main, write_main, modify_cargo_toml)?;
1770+
let _: [(); 3] = [d, w, m];
17281771

1772+
let execute_cargo = request.execute_cargo_request();
17291773
self.spawn_cargo_task(token, execute_cargo)
17301774
.await
17311775
.context(CouldNotStartCargoSnafu)

0 commit comments

Comments
 (0)