Skip to content

Commit b930fc2

Browse files
committed
Allow closing the stdin stream
This allows executing programs that read until output is complete.
1 parent 291b6d4 commit b930fc2

File tree

8 files changed

+258
-42
lines changed

8 files changed

+258
-42
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -777,10 +777,22 @@ impl Container {
777777

778778
let task = tokio::spawn({
779779
async move {
780+
let mut stdin_open = true;
781+
780782
loop {
781783
select! {
782-
Some(stdin) = stdin_rx.recv() => {
783-
let msg = CoordinatorMessage::StdinPacket(stdin);
784+
stdin = stdin_rx.recv(), if stdin_open => {
785+
let msg = match stdin {
786+
Some(stdin) => {
787+
CoordinatorMessage::StdinPacket(stdin)
788+
}
789+
790+
None => {
791+
stdin_open = false;
792+
CoordinatorMessage::StdinClose
793+
}
794+
};
795+
784796
trace!("processing {msg:?}");
785797
to_worker_tx.send(msg).await.context(StdinSnafu)?;
786798
},
@@ -1802,6 +1814,62 @@ mod tests {
18021814
Ok(())
18031815
}
18041816

1817+
#[tokio::test]
1818+
#[snafu::report]
1819+
async fn execute_stdin_close() -> Result<()> {
1820+
let coordinator = new_coordinator().await;
1821+
1822+
let request = ExecuteRequest {
1823+
code: r#"
1824+
fn main() {
1825+
let mut input = String::new();
1826+
while let Ok(n) = std::io::stdin().read_line(&mut input) {
1827+
if n == 0 {
1828+
break;
1829+
}
1830+
println!("You entered >>>{input:?}<<<");
1831+
input.clear();
1832+
}
1833+
}
1834+
"#
1835+
.into(),
1836+
..ARBITRARY_EXECUTE_REQUEST
1837+
};
1838+
1839+
let ActiveExecution {
1840+
task,
1841+
stdin_tx,
1842+
stdout_rx,
1843+
stderr_rx,
1844+
} = coordinator.begin_execute(request).await.unwrap();
1845+
1846+
for i in 0..3 {
1847+
stdin_tx.send(format!("line {i}\n")).await.unwrap();
1848+
}
1849+
1850+
stdin_tx.send("no newline".into()).await.unwrap();
1851+
drop(stdin_tx); // Close the stdin handle
1852+
1853+
let WithOutput {
1854+
response,
1855+
stdout,
1856+
stderr,
1857+
} = WithOutput::try_absorb(task, stdout_rx, stderr_rx)
1858+
.with_timeout()
1859+
.await
1860+
.unwrap();
1861+
1862+
assert!(response.success, "{stderr}");
1863+
assert_contains!(stdout, r#">>>"line 0\n"<<<"#);
1864+
assert_contains!(stdout, r#">>>"line 1\n"<<<"#);
1865+
assert_contains!(stdout, r#">>>"line 2\n"<<<"#);
1866+
assert_contains!(stdout, r#">>>"no newline"<<<"#);
1867+
1868+
coordinator.shutdown().await?;
1869+
1870+
Ok(())
1871+
}
1872+
18051873
const HELLO_WORLD_CODE: &str = r#"fn main() { println!("Hello World!"); }"#;
18061874

18071875
const ARBITRARY_COMPILE_REQUEST: CompileRequest = CompileRequest {

compiler/base/orchestrator/src/message.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub enum CoordinatorMessage {
2626
ReadFile(ReadFileRequest),
2727
ExecuteCommand(ExecuteCommandRequest),
2828
StdinPacket(String),
29+
StdinClose,
2930
}
3031

3132
impl_narrow_to_broad!(

compiler/base/orchestrator/src/worker.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,14 @@ async fn handle_coordinator_message(
186186
.drop_error_details()
187187
.context(UnableToSendStdinPacketSnafu)?;
188188
}
189+
190+
CoordinatorMessage::StdinClose => {
191+
process_tx
192+
.send(Multiplexed(job_id, ProcessCommand::StdinClose))
193+
.await
194+
.drop_error_details()
195+
.context(UnableToSendStdinCloseSnafu)?;
196+
}
189197
}
190198
}
191199

@@ -216,6 +224,9 @@ pub enum HandleCoordinatorMessageError {
216224
#[snafu(display("Failed to send stdin packet to the command task"))]
217225
UnableToSendStdinPacket { source: mpsc::error::SendError<()> },
218226

227+
#[snafu(display("Failed to send stdin close request to the command task"))]
228+
UnableToSendStdinClose { source: mpsc::error::SendError<()> },
229+
219230
#[snafu(display("A coordinator command handler background task panicked"))]
220231
TaskPanicked { source: tokio::task::JoinError },
221232
}
@@ -371,6 +382,7 @@ fn parse_working_dir(cwd: Option<String>, project_path: impl Into<PathBuf>) -> P
371382
enum ProcessCommand {
372383
Start(ExecuteCommandRequest, MultiplexingSender),
373384
Stdin(String),
385+
StdinClose,
374386
}
375387

376388
struct ProcessState {
@@ -478,6 +490,8 @@ async fn manage_processes(
478490
ProcessCommand::Start(req, worker_msg_tx) => state.start(job_id, req, worker_msg_tx).await?,
479491

480492
ProcessCommand::Stdin(packet) => state.stdin(job_id, packet).await?,
493+
494+
ProcessCommand::StdinClose => state.stdin_close(job_id),
481495
}
482496
}
483497

tests/spec/features/streaming_spec.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,30 @@
6363
end
6464
end
6565

66+
scenario "input can be closed" do
67+
editor.set <<~EOF
68+
fn main() {
69+
let mut input = String::new();
70+
while std::io::stdin().read_line(&mut input).unwrap() != 0 {
71+
println!("You entered >>>{input:?}<<<");
72+
input.clear();
73+
}
74+
println!("All done");
75+
}
76+
EOF
77+
78+
click_on("Run")
79+
80+
within(:stdin) do
81+
click_on 'Execution control'
82+
click_on 'Close stdin'
83+
end
84+
85+
within(:output, :stdout) do
86+
expect(page).to have_content 'All done'
87+
end
88+
end
89+
6690
def editor
6791
Editor.new(page)
6892
end

ui/frontend/Stdin.module.css

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,22 @@
2525
visibility: hidden;
2626
white-space: pre-wrap;
2727
}
28+
29+
.buttons {
30+
font-size: 11px;
31+
}
32+
33+
.menu {
34+
list-style: none;
35+
margin: 0;
36+
padding: 0;
37+
}
38+
39+
.button {
40+
composes: -menuItemFullButton from './shared.module.css';
41+
padding: 1em;
42+
43+
&:hover {
44+
color: var(--header-tint);
45+
}
46+
}

ui/frontend/Stdin.tsx

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import React, { ChangeEvent, FormEvent, KeyboardEvent, useCallback, useRef, useState } from 'react';
22
import { useDispatch, useSelector } from 'react-redux';
33

4-
import { wsExecuteStdin } from './reducers/output/execute';
4+
import { Button, ButtonSet, IconButton } from './ButtonSet';
5+
import PopButton, { ButtonProps, MenuProps } from './PopButton';
6+
import { wsExecuteStdin, wsExecuteStdinClose } from './reducers/output/execute';
57
import { enableStdinSelector } from './selectors';
68

79
import styles from './Stdin.module.css';
@@ -20,6 +22,10 @@ const Stdin: React.FC = () => {
2022
e.preventDefault();
2123
form.current?.dispatchEvent(new Event('submit', { bubbles: true, cancelable: true }));
2224
}
25+
26+
if (e.key === 'd' && e.ctrlKey && content.length === 0) {
27+
dispatch(wsExecuteStdinClose());
28+
}
2329
},
2430
[dispatch, form, content],
2531
);
@@ -42,26 +48,73 @@ const Stdin: React.FC = () => {
4248
[dispatch, content, setContent],
4349
);
4450

51+
const menuContainer = useRef<HTMLDivElement | null>(null);
52+
53+
return (
54+
<div data-test-id="stdin">
55+
<form onSubmit={handleSubmit} className={styles.form} ref={form}>
56+
<div className={styles.multiLine}>
57+
<textarea
58+
rows={1}
59+
onKeyDown={handleKeyDown}
60+
onChange={handleChange}
61+
name="content"
62+
autoComplete="off"
63+
spellCheck="false"
64+
className={styles.text}
65+
value={content}
66+
disabled={disabled}
67+
></textarea>
68+
<p className={styles.sizer}>{content} </p>
69+
</div>
70+
71+
<ButtonSet className={styles.buttons}>
72+
<Button isPrimary isSmall type="submit" disabled={disabled} iconRight={() => '⏎'}>
73+
Send
74+
</Button>
75+
76+
<PopButton Button={MoreButton} Menu={MoreMenu} menuContainer={menuContainer} />
77+
</ButtonSet>
78+
</form>
79+
<div ref={menuContainer} />
80+
</div>
81+
);
82+
};
83+
84+
const MoreButton = React.forwardRef<HTMLButtonElement, ButtonProps>(({ toggle }, ref) => {
85+
const disabled = !useSelector(enableStdinSelector);
86+
87+
return (
88+
<IconButton
89+
isSmall
90+
type="button"
91+
ref={ref}
92+
title="Execution control"
93+
onClick={toggle}
94+
disabled={disabled}
95+
>
96+
97+
</IconButton>
98+
);
99+
});
100+
MoreButton.displayName = 'MoreButton';
101+
102+
const MoreMenu: React.FC<MenuProps> = ({ close }) => {
103+
const dispatch = useDispatch();
104+
105+
const stdinClose = useCallback(() => {
106+
dispatch(wsExecuteStdinClose());
107+
close();
108+
}, [dispatch, close]);
109+
45110
return (
46-
<form onSubmit={handleSubmit} className={styles.form} data-test-id="stdin" ref={form}>
47-
<div className={styles.multiLine}>
48-
<textarea
49-
rows={1}
50-
onKeyDown={handleKeyDown}
51-
onChange={handleChange}
52-
name="content"
53-
autoComplete="off"
54-
spellCheck="false"
55-
className={styles.text}
56-
value={content}
57-
disabled={disabled}
58-
></textarea>
59-
<p className={styles.sizer}>{content} </p>
60-
</div>
61-
<button type="submit" disabled={disabled}>
62-
Send
63-
</button>
64-
</form>
111+
<ul className={styles.menu}>
112+
<li>
113+
<button type="button" className={styles.button} onClick={stdinClose}>
114+
Close stdin
115+
</button>
116+
</li>
117+
</ul>
65118
);
66119
};
67120

ui/frontend/reducers/output/execute.ts

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Draft, createAsyncThunk, createSlice } from '@reduxjs/toolkit';
1+
import { AnyAction, Draft, createAsyncThunk, createSlice } from '@reduxjs/toolkit';
22
import * as z from 'zod';
33

44
import { SimpleThunkAction, adaptFetchError, jsonPost, routes } from '../../actions';
@@ -78,6 +78,14 @@ export const performExecute = createAsyncThunk(sliceName, async (payload: Execut
7878
adaptFetchError(() => jsonPost<ExecuteResponseBody>(routes.execute, payload)),
7979
);
8080

81+
const prepareWithCurrentSequenceNumber = <P>(payload: P, sequenceNumber: number) => ({
82+
payload,
83+
meta: {
84+
websocket: true,
85+
sequenceNumber,
86+
},
87+
});
88+
8189
const sequenceNumberMatches =
8290
<P>(whenMatch: (state: Draft<State>, payload: P) => void) =>
8391
(state: Draft<State>, action: WsPayloadAction<P>) => {
@@ -111,13 +119,12 @@ const slice = createSlice({
111119
wsExecuteStdin: {
112120
reducer: () => {},
113121

114-
prepare: (payload: string, sequenceNumber: number) => ({
115-
payload,
116-
meta: {
117-
websocket: true,
118-
sequenceNumber,
119-
},
120-
}),
122+
prepare: prepareWithCurrentSequenceNumber,
123+
},
124+
wsExecuteStdinClose: {
125+
reducer: () => {},
126+
127+
prepare: prepareWithCurrentSequenceNumber,
121128
},
122129
},
123130
extraReducers: (builder) => {
@@ -192,17 +199,27 @@ export const performCommonExecute =
192199
}
193200
};
194201

195-
export const wsExecuteStdin =
196-
(payload: string): SimpleThunkAction =>
202+
const dispatchWhenSequenceNumber =
203+
<A extends AnyAction>(cb: (sequenceNumber: number) => A): SimpleThunkAction =>
197204
(dispatch, getState) => {
198205
const state = getState();
199-
const { requestsInProgress, sequenceNumber } = state.output.execute;
200-
if (requestsInProgress === 0 || !sequenceNumber) {
201-
return;
206+
const { sequenceNumber } = state.output.execute;
207+
if (sequenceNumber) {
208+
const action = cb(sequenceNumber);
209+
dispatch(action);
202210
}
203-
dispatch(slice.actions.wsExecuteStdin(payload, sequenceNumber));
204211
};
205212

213+
export const wsExecuteStdin = (payload: string): SimpleThunkAction =>
214+
dispatchWhenSequenceNumber((sequenceNumber) =>
215+
slice.actions.wsExecuteStdin(payload, sequenceNumber),
216+
);
217+
218+
export const wsExecuteStdinClose = (): SimpleThunkAction =>
219+
dispatchWhenSequenceNumber((sequenceNumber) =>
220+
slice.actions.wsExecuteStdinClose(undefined, sequenceNumber),
221+
);
222+
206223
export { wsExecuteBeginSchema, wsExecuteStdoutSchema, wsExecuteStderrSchema, wsExecuteEndSchema };
207224

208225
export default slice.reducer;

0 commit comments

Comments
 (0)