Skip to content

Commit e816a30

Browse files
Nero5023facebook-github-bot
authored andcommitted
Add support for stdout output for superconsole emitting.
Summary: RFC: https://fb.workplace.com/groups/buck2dev/permalink/3976662752621767/ Previously all output of superconsole is stderr by default. This diff add support to emit stdout above the canvas like we emit stderr. ## Why we need output stdout support in superconsole Superconsole writes to stderr by default. If stdout isn’t managed by it, it would get cleared or users need to stop superconsole. In Buck2 BXL, streaming output is sent to stdout during execution, and users often need to listen on it. At the same time, users still want to see Superconsole’s progress updates. ## This diff 1. adds a `aux_stream` to `NonBlockingSuperConsoleOutput` and `BlockingSuperConsoleOutput` 2. For trait `SuperConsoleOutput` adds `output_to` methods to output to different stream 3. In `SuperConsole`, it adds a new buffer `aux_to_emit` for aux output (stdout) and an api `emit_aux` 4. In render of superconsle, output the stdout when `aux_to_emit` is not emtpy. Note: right now when stdout is not compatible with tty, it hasn't handle correctly. Handle this issue at D73564704 Reviewed By: cjhopman Differential Revision: D73534481 fbshipit-source-id: 38d99da2a8934b0aafb292d7f3697523ba4b12d1
1 parent 6f337b1 commit e816a30

File tree

4 files changed

+175
-69
lines changed

4 files changed

+175
-69
lines changed

src/builder.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use crate::output::SuperConsoleOutput;
1919
/// A builder to create SuperConsole, with more options.
2020
pub struct Builder {
2121
non_blocking: bool,
22+
// The stream that superconsole writes to by default (emit output + canvas). By default is stderr.
2223
stream: Box<dyn Write + Send + 'static + Sync>,
24+
// The stream that superconsole writes to for auxiliary output. By default is stdout.
25+
aux_stream: Box<dyn Write + Send + 'static + Sync>,
2326
}
2427

2528
impl Default for Builder {
@@ -33,6 +36,7 @@ impl Builder {
3336
Self {
3437
non_blocking: false,
3538
stream: Box::new(io::stderr()),
39+
aux_stream: Box::new(io::stdout()),
3640
}
3741
}
3842

@@ -62,14 +66,21 @@ impl Builder {
6266
}
6367

6468
fn build_inner(self, fallback_size: Option<Dimensions>) -> anyhow::Result<SuperConsole> {
65-
Ok(SuperConsole::new_with_output(fallback_size, self.output()?))
69+
let output = self.output()?;
70+
Ok(SuperConsole::new_with_output(fallback_size, output))
6671
}
6772

6873
fn output(self) -> anyhow::Result<Box<dyn SuperConsoleOutput>> {
6974
if self.non_blocking {
70-
Ok(Box::new(NonBlockingSuperConsoleOutput::new(self.stream)?))
75+
Ok(Box::new(NonBlockingSuperConsoleOutput::new(
76+
self.stream,
77+
self.aux_stream,
78+
)?))
7179
} else {
72-
Ok(Box::new(BlockingSuperConsoleOutput::new(self.stream)))
80+
Ok(Box::new(BlockingSuperConsoleOutput::new(
81+
self.stream,
82+
self.aux_stream,
83+
)))
7384
}
7485
}
7586
}

src/content/lines.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -285,20 +285,27 @@ impl Lines {
285285
/// If a limit is specified, no more than that amount will be drained.
286286
/// The limit is on the number of *lines*, **NOT** the number of *bytes*.
287287
/// Care should be taken with calling a limit of 0 - this will cause no lines to render and the buffer to never be drained.
288+
///
289+
/// Returns the remain limit after rendering. If the limit is None, means no limit
288290
pub(crate) fn render_with_limit(
289291
&mut self,
290292
writer: &mut Vec<u8>,
291293
limit: Option<usize>,
292-
) -> anyhow::Result<()> {
293-
let limit = limit.unwrap_or(self.len());
294-
let amt = cmp::min(limit, self.len());
294+
) -> anyhow::Result<Option<usize>> {
295+
let output_limit = limit.unwrap_or(self.len());
296+
let amt = cmp::min(output_limit, self.len());
295297
for line in self.0.drain(..amt) {
296298
line.render_with_clear_and_nl(writer)?;
297299
}
298-
Ok(())
300+
if limit.is_some() {
301+
Ok(Some(output_limit - amt))
302+
} else {
303+
// if the original limit was None, it means no limit, so just return None meaning no limit
304+
Ok(None)
305+
}
299306
}
300307

301-
/// Formats and renders all lines to `stdout`.
308+
/// Formats and renders all lines to `buffer`.
302309
/// Notably, this *queues* the lines for rendering. You must flush the buffer.
303310
pub(crate) fn render_from_line(
304311
&self,

src/output.rs

Lines changed: 90 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ use crossbeam_channel::unbounded;
2020

2121
use crate::Dimensions;
2222

23+
/// Represents the target stream for output
24+
#[derive(Copy, Clone, Debug)]
25+
pub enum OutputTarget {
26+
/// Main output stream (default: stderr)
27+
Main,
28+
/// Auxiliary output stream (default: stdout)
29+
Aux,
30+
}
31+
2332
pub trait SuperConsoleOutput: Send + Sync + 'static {
2433
/// Called before rendering will occur. This has a chance to prevent rendering by returning
2534
/// false.
@@ -29,6 +38,14 @@ pub trait SuperConsoleOutput: Send + Sync + 'static {
2938
/// clearing. This should flush if possible.
3039
fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()>;
3140

41+
/// Called to produce to a specific output target.
42+
///This may be called without should_render if we are finalizing or clearing. This should flush if possible.
43+
/// Default implementation sends all to main output for backwards compatibility
44+
fn output_to(&mut self, buffer: Vec<u8>, target: OutputTarget) -> anyhow::Result<()> {
45+
let _ = target;
46+
self.output(buffer)
47+
}
48+
3249
/// How big is the terminal to write to.
3350
fn terminal_size(&self) -> anyhow::Result<Dimensions> {
3451
Ok(crossterm::terminal::size()?.into())
@@ -48,11 +65,16 @@ pub trait SuperConsoleOutput: Send + Sync + 'static {
4865
pub struct BlockingSuperConsoleOutput {
4966
/// Stream to write to.
5067
stream: Box<dyn Write + Send + 'static + Sync>,
68+
/// Auxiliary stream to write to.
69+
aux_stream: Box<dyn Write + Send + 'static + Sync>,
5170
}
5271

5372
impl BlockingSuperConsoleOutput {
54-
pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> Self {
55-
Self { stream }
73+
pub fn new(
74+
stream: Box<dyn Write + Send + 'static + Sync>,
75+
aux_stream: Box<dyn Write + Send + 'static + Sync>,
76+
) -> Self {
77+
Self { stream, aux_stream }
5678
}
5779
}
5880

@@ -62,9 +84,20 @@ impl SuperConsoleOutput for BlockingSuperConsoleOutput {
6284
}
6385

6486
fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
65-
self.stream.write_all(&buffer)?;
66-
self.stream.flush()?;
87+
self.output_to(buffer, OutputTarget::Main)
88+
}
6789

90+
fn output_to(&mut self, buffer: Vec<u8>, target: OutputTarget) -> anyhow::Result<()> {
91+
match target {
92+
OutputTarget::Main => {
93+
self.stream.write_all(&buffer)?;
94+
self.stream.flush()?;
95+
}
96+
OutputTarget::Aux => {
97+
self.aux_stream.write_all(&buffer)?;
98+
self.aux_stream.flush()?;
99+
}
100+
}
68101
Ok(())
69102
}
70103

@@ -88,7 +121,7 @@ impl SuperConsoleOutput for BlockingSuperConsoleOutput {
88121
/// - When an error occurs, the next fallible call will return it.
89122
pub(crate) struct NonBlockingSuperConsoleOutput {
90123
/// A channel to send frames for writing.
91-
sender: Sender<Vec<u8>>,
124+
sender: Sender<(Vec<u8>, OutputTarget)>,
92125
/// A channel back for errors encountered by the thread doing the writing.
93126
errors: Receiver<io::Error>,
94127
/// The thread doing the writing. It owns the other end of the aforementioned channels and will
@@ -97,19 +130,29 @@ pub(crate) struct NonBlockingSuperConsoleOutput {
97130
}
98131

99132
impl NonBlockingSuperConsoleOutput {
100-
pub fn new(stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
101-
Self::new_for_writer(stream)
133+
pub fn new(
134+
stream: Box<dyn Write + Send + 'static + Sync>,
135+
aux_stream: Box<dyn Write + Send + 'static + Sync>,
136+
) -> anyhow::Result<Self> {
137+
Self::new_for_writer(stream, aux_stream)
102138
}
103139

104-
fn new_for_writer(mut stream: Box<dyn Write + Send + 'static + Sync>) -> anyhow::Result<Self> {
105-
let (sender, receiver) = bounded::<Vec<u8>>(1);
140+
fn new_for_writer(
141+
mut stream: Box<dyn Write + Send + 'static + Sync>,
142+
mut aux_stream: Box<dyn Write + Send + 'static + Sync>,
143+
) -> anyhow::Result<Self> {
144+
let (sender, receiver) = bounded::<(Vec<u8>, OutputTarget)>(1);
106145
let (error_sender, errors) = unbounded::<io::Error>();
107146

108147
let handle = std::thread::Builder::new()
109148
.name("superconsole-io".to_owned())
110149
.spawn(move || {
111-
for frame in receiver.into_iter() {
112-
match stream.write_all(&frame).and_then(|()| stream.flush()) {
150+
for (data, output_target) in receiver.into_iter() {
151+
let out_stream = match output_target {
152+
OutputTarget::Main => &mut stream,
153+
OutputTarget::Aux => &mut aux_stream,
154+
};
155+
match out_stream.write_all(&data).and_then(|()| stream.flush()) {
113156
Ok(()) => {}
114157
Err(e) => {
115158
// This can only fail if the sender disconnected, in which case they'll
@@ -140,12 +183,16 @@ impl SuperConsoleOutput for NonBlockingSuperConsoleOutput {
140183
/// Attempt to send out a frame. If we called should_render, this won't block. If we didn't,
141184
/// then it may block.
142185
fn output(&mut self, buffer: Vec<u8>) -> anyhow::Result<()> {
186+
self.output_to(buffer, OutputTarget::Main)
187+
}
188+
189+
fn output_to(&mut self, buffer: Vec<u8>, target: OutputTarget) -> anyhow::Result<()> {
143190
if let Ok(err) = self.errors.try_recv() {
144191
return Err(anyhow::Error::from(err).context("Superconsole I/O thread errored"));
145192
}
146193

147194
self.sender
148-
.send(buffer)
195+
.send((buffer, target))
149196
.context("Superconsole I/O thread has crashed")?;
150197

151198
Ok(())
@@ -221,29 +268,44 @@ mod tests {
221268

222269
#[test]
223270
fn test_non_blocking_output_errors_on_next_output() -> anyhow::Result<()> {
224-
let (writer, drain) = TestWriter::new();
271+
fn test_send_target(target0: OutputTarget, target1: OutputTarget) -> anyhow::Result<()> {
272+
let (writer, drain) = TestWriter::new();
273+
let aux_writer = writer.clone();
274+
275+
let mut output = NonBlockingSuperConsoleOutput::new_for_writer(
276+
Box::new(writer),
277+
Box::new(aux_writer),
278+
)?;
225279

226-
let mut output = NonBlockingSuperConsoleOutput::new_for_writer(Box::new(writer))?;
280+
// Send a first message, this will go into write()
281+
assert!(output.should_render());
282+
output.output_to(msg(), target0)?;
227283

228-
// Send a first message, this will go into write()
229-
assert!(output.should_render());
230-
output.output(msg())?;
284+
// Send a second message, this will stay in the channel.
285+
output.output_to(msg(), target1)?;
231286

232-
// Send a second message, this will stay in the channel.
233-
output.output(msg())?;
287+
// Now, kill the output
288+
assert!(!output.should_render());
289+
drop(drain);
234290

235-
// Now, kill the output
236-
assert!(!output.should_render());
237-
drop(drain);
291+
// We expect that should_render() will eventually return true.
292+
while !output.should_render() {
293+
std::thread::yield_now();
294+
}
238295

239-
// We expect that should_render() will eventually return true.
240-
while !output.should_render() {
241-
std::thread::yield_now();
296+
// Likewise, we expect that sending output and finalizing wold fail.
297+
assert!(output.output(Vec::new()).is_err());
298+
assert!(Box::new(output).finalize().is_err());
299+
300+
Ok(())
242301
}
243302

244-
// Likewise, we expect that sending output and finalizing wold fail.
245-
assert!(output.output(Vec::new()).is_err());
246-
assert!(Box::new(output).finalize().is_err());
303+
// Test all combinations of targets
304+
for target0 in [OutputTarget::Main, OutputTarget::Aux] {
305+
for target1 in [OutputTarget::Main, OutputTarget::Aux] {
306+
test_send_target(target0, target1)?
307+
}
308+
}
247309

248310
Ok(())
249311
}

0 commit comments

Comments
 (0)