Skip to content

Commit e1efac5

Browse files
authored
Merge pull request #104 from openssh-rust/fix/cut-off
Fixed stdout/stderr cut off bug when using `RemoteChild::wait_with_output`
2 parents 5282f39 + 4c8e61a commit e1efac5

File tree

6 files changed

+200
-37
lines changed

6 files changed

+200
-37
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ dirs = "4.0.0"
4949

5050
openssh-mux-client = { version = "0.15.1", optional = true }
5151

52+
libc = "0.2.137"
53+
5254
[dev-dependencies]
5355
regex = "1"
5456
tokio = { version = "1", features = [ "full" ] }

src/changelog.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@ use crate::*;
55
///
66
/// ## Added
77
/// - `impl From<std::os::unix::io::OwnedFd> for Stdio`
8+
/// - Add new fn `Stdio::from_raw_fd_owned`
89
/// ## Changed
10+
/// - Mark `FromRawFd` impl for `Stdio` as deprecated
11+
/// - Mark `From<tokio_pipe::PipeRead>` for `Stdio` as deprecated
12+
/// - Mark `From<tokio_pipe::PipeWrite>` for `Stdio` as deprecated
13+
/// ## Fixed
14+
/// - [`wait_with_output` + `native-mux` cuts off stdout output](https://github.com/openssh-rust/openssh/issues/103)
915
/// ## Removed
1016
#[doc(hidden)]
1117
pub mod unreleased {}

src/child.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ macro_rules! delegate {
6262
/// available,`Stdio::piped()` should be passed to the corresponding method on
6363
/// [`Command`](crate::Command).
6464
///
65+
/// NOTE that once `RemoteChild` is dropped, any data written to `stdin` will not be sent to the
66+
/// remote process and `stdout` and `stderr` will yield EOF immediately.
67+
///
6568
/// ```rust,no_run
6669
/// # async fn foo() {
6770
/// # let child: openssh::RemoteChild<'static> = unimplemented!();
@@ -169,9 +172,15 @@ impl<'s> RemoteChild<'s> {
169172

170173
// Execute them concurrently to avoid the pipe buffer being filled up
171174
// and cause the remote process to block forever.
172-
let (status, stdout, stderr) = try_join!(self.wait(), stdout_read, stderr_read)?;
175+
let (stdout, stderr) = try_join!(stdout_read, stderr_read)?;
173176
Ok(Output {
174-
status,
177+
// The self.wait() future terminates the stdout and stderr futures
178+
// when it resolves, even if there may still be more data arriving
179+
// from the server.
180+
//
181+
// Therefore, we wait for them first, and only once they're complete
182+
// do we wait for the process to have terminated.
183+
status: self.wait().await?,
175184
stdout,
176185
stderr,
177186
})

src/native_mux_impl/stdio.rs

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
use crate::stdio::StdioImpl;
2-
use crate::Error;
3-
use crate::Stdio;
1+
use crate::{stdio::StdioImpl, Error, Stdio};
42

5-
use once_cell::sync::OnceCell;
3+
use std::{
4+
fs::{File, OpenOptions},
5+
io,
6+
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},
7+
};
68

7-
use std::fs::{File, OpenOptions};
8-
use std::io;
9-
use std::os::unix::io::{AsRawFd, RawFd};
9+
use libc::{c_int, fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
10+
use once_cell::sync::OnceCell;
1011
use tokio_pipe::{pipe, PipeRead, PipeWrite};
1112

1213
fn create_pipe() -> Result<(PipeRead, PipeWrite), Error> {
@@ -28,25 +29,73 @@ fn get_null_fd() -> Result<RawFd, Error> {
2829
}
2930

3031
pub(crate) enum Fd {
31-
PipeReadEnd(PipeRead),
32-
PipeWriteEnd(PipeWrite),
33-
32+
Owned(OwnedFd),
3433
Borrowed(RawFd),
3534
Null,
3635
}
3736

37+
fn cvt(ret: c_int) -> io::Result<c_int> {
38+
if ret == -1 {
39+
Err(io::Error::last_os_error())
40+
} else {
41+
Ok(ret)
42+
}
43+
}
44+
45+
fn set_blocking_inner(fd: RawFd) -> io::Result<()> {
46+
let flags = cvt(unsafe { fcntl(fd, F_GETFL) })?;
47+
cvt(unsafe { fcntl(fd, F_SETFL, flags & (!O_NONBLOCK)) })?;
48+
49+
Ok(())
50+
}
51+
52+
fn set_blocking(fd: RawFd) -> Result<(), Error> {
53+
set_blocking_inner(fd).map_err(Error::ChildIo)
54+
}
55+
3856
impl Fd {
3957
pub(crate) fn as_raw_fd_or_null_fd(&self) -> Result<RawFd, Error> {
4058
use Fd::*;
4159

4260
match self {
43-
PipeReadEnd(fd) => Ok(AsRawFd::as_raw_fd(fd)),
44-
PipeWriteEnd(fd) => Ok(AsRawFd::as_raw_fd(fd)),
45-
61+
Owned(owned_fd) => Ok(owned_fd.as_raw_fd()),
4662
Borrowed(rawfd) => Ok(*rawfd),
4763
Null => get_null_fd(),
4864
}
4965
}
66+
67+
/// # Safety
68+
///
69+
/// `T::into_raw_fd` must return a valid fd and transfers
70+
/// the ownershipt of it.
71+
unsafe fn new_owned<T: IntoRawFd>(fd: T) -> Result<Self, Error> {
72+
let raw_fd = fd.into_raw_fd();
73+
Ok(Fd::Owned(OwnedFd::from_raw_fd(raw_fd)))
74+
}
75+
}
76+
77+
impl TryFrom<PipeRead> for Fd {
78+
type Error = Error;
79+
80+
fn try_from(pipe_read: PipeRead) -> Result<Self, Error> {
81+
// Safety:
82+
//
83+
// PipeRead::into_raw_fd returns a valid fd and transfers the
84+
// ownership of it.
85+
unsafe { Self::new_owned(pipe_read) }
86+
}
87+
}
88+
89+
impl TryFrom<PipeWrite> for Fd {
90+
type Error = Error;
91+
92+
fn try_from(pipe_write: PipeWrite) -> Result<Self, Error> {
93+
// Safety:
94+
//
95+
// PipeWrite::into_raw_fd returns a valid fd and transfers the
96+
// ownership of it.
97+
unsafe { Self::new_owned(pipe_write) }
98+
}
5099
}
51100

52101
impl Stdio {
@@ -56,9 +105,19 @@ impl Stdio {
56105
StdioImpl::Null => Ok((Fd::Null, None)),
57106
StdioImpl::Pipe => {
58107
let (read, write) = create_pipe()?;
59-
Ok((Fd::PipeReadEnd(read), Some(write)))
108+
109+
// read end will be sent to ssh multiplex server
110+
// and it expects blocking fd.
111+
set_blocking(read.as_raw_fd())?;
112+
Ok((read.try_into()?, Some(write)))
113+
}
114+
StdioImpl::Fd(fd, owned) => {
115+
let raw_fd = fd.as_raw_fd();
116+
if *owned {
117+
set_blocking(raw_fd)?;
118+
}
119+
Ok((Fd::Borrowed(raw_fd), None))
60120
}
61-
StdioImpl::Fd(fd) => Ok((Fd::Borrowed(fd.as_raw_fd()), None)),
62121
}
63122
}
64123

@@ -68,9 +127,19 @@ impl Stdio {
68127
StdioImpl::Null => Ok((Fd::Null, None)),
69128
StdioImpl::Pipe => {
70129
let (read, write) = create_pipe()?;
71-
Ok((Fd::PipeWriteEnd(write), Some(read)))
130+
131+
// write end will be sent to ssh multiplex server
132+
// and it expects blocking fd.
133+
set_blocking(write.as_raw_fd())?;
134+
Ok((write.try_into()?, Some(read)))
135+
}
136+
StdioImpl::Fd(fd, owned) => {
137+
let raw_fd = fd.as_raw_fd();
138+
if *owned {
139+
set_blocking(raw_fd)?;
140+
}
141+
Ok((Fd::Borrowed(raw_fd), None))
72142
}
73-
StdioImpl::Fd(fd) => Ok((Fd::Borrowed(fd.as_raw_fd()), None)),
74143
}
75144
}
76145

src/stdio.rs

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub(crate) enum StdioImpl {
1818
/// Read/Write to a newly created pipe
1919
Pipe,
2020
/// Read/Write to custom fd
21-
Fd(OwnedFd),
21+
Fd(OwnedFd, bool),
2222
/// Inherit stdin/stdout/stderr
2323
Inherit,
2424
}
@@ -40,13 +40,40 @@ impl Stdio {
4040
}
4141

4242
/// The child inherits from the corresponding parent descriptor.
43+
///
44+
/// NOTE that the stdio fd must be in blocking mode, otherwise
45+
/// ssh might not flush all output since it considers
46+
/// (`EAGAIN`/`EWOULDBLOCK`) as an error
4347
pub const fn inherit() -> Self {
4448
Self(StdioImpl::Inherit)
4549
}
50+
51+
/// `Stdio::from_raw_fd_owned` takes ownership of the fd passed in
52+
/// and closes the fd on drop.
53+
///
54+
/// NOTE that the fd will be put into blocking mode, then it will be
55+
/// closed when `Stdio` is dropped.
56+
///
57+
/// # Safety
58+
///
59+
/// * `fd` - must be a valid fd and must give its ownership to `Stdio`.
60+
pub unsafe fn from_raw_fd_owned(fd: RawFd) -> Self {
61+
Self(StdioImpl::Fd(OwnedFd::from_raw_fd(fd), true))
62+
}
4663
}
64+
/// **Deprecated, use [`Stdio::from_raw_fd_owned`] instead.**
65+
///
66+
/// FromRawFd takes ownership of the fd passed in
67+
/// and closes the fd on drop.
68+
///
69+
/// NOTE that the fd must be in blocking mode, otherwise
70+
/// ssh might not flush all output since it considers
71+
/// (`EAGAIN`/`EWOULDBLOCK`) as an error
72+
#[allow(useless_deprecated)]
73+
#[deprecated(since = "0.9.8", note = "Use Stdio::from_raw_fd_owned instead")]
4774
impl FromRawFd for Stdio {
4875
unsafe fn from_raw_fd(fd: RawFd) -> Self {
49-
Self(StdioImpl::Fd(OwnedFd::from_raw_fd(fd)))
76+
Self(StdioImpl::Fd(OwnedFd::from_raw_fd(fd), false))
5077
}
5178
}
5279
impl From<Stdio> for process::Stdio {
@@ -59,14 +86,16 @@ impl From<Stdio> for process::Stdio {
5986
// safety: StdioImpl(fd) is only constructed from known-valid and
6087
// owned file descriptors by virtue of the safety requirement
6188
// for invoking from_raw_fd.
62-
StdioImpl::Fd(fd) => unsafe { process::Stdio::from_raw_fd(IntoRawFd::into_raw_fd(fd)) },
89+
StdioImpl::Fd(fd, _) => unsafe {
90+
process::Stdio::from_raw_fd(IntoRawFd::into_raw_fd(fd))
91+
},
6392
}
6493
}
6594
}
6695

6796
impl From<OwnedFd> for Stdio {
6897
fn from(fd: OwnedFd) -> Self {
69-
Self(StdioImpl::Fd(fd))
98+
Self(StdioImpl::Fd(fd, true))
7099
}
71100
}
72101

@@ -77,14 +106,31 @@ macro_rules! impl_from_for_stdio {
77106
let fd = arg.into_raw_fd();
78107
// safety: $type must have a valid into_raw_fd implementation
79108
// and must not be RawFd.
80-
unsafe { Self::from_raw_fd(fd) }
109+
Self(StdioImpl::Fd(unsafe { OwnedFd::from_raw_fd(fd) }, true))
110+
}
111+
}
112+
};
113+
(deprecated $type:ty) => {
114+
#[allow(useless_deprecated)]
115+
#[deprecated(
116+
since = "0.9.8",
117+
note = "Use From<OwnedFd> for Stdio or Stdio::from_raw_fd_owned instead"
118+
)]
119+
/// **Deprecated, use `From<OwnedFd> for Stdio` or
120+
/// [`Stdio::from_raw_fd_owned`] instead.**
121+
impl From<$type> for Stdio {
122+
fn from(arg: $type) -> Self {
123+
let fd = arg.into_raw_fd();
124+
// safety: $type must have a valid into_raw_fd implementation
125+
// and must not be RawFd.
126+
Self(StdioImpl::Fd(unsafe { OwnedFd::from_raw_fd(fd) }, true))
81127
}
82128
}
83129
};
84130
}
85131

86-
impl_from_for_stdio!(tokio_pipe::PipeWrite);
87-
impl_from_for_stdio!(tokio_pipe::PipeRead);
132+
impl_from_for_stdio!(deprecated tokio_pipe::PipeWrite);
133+
impl_from_for_stdio!(deprecated tokio_pipe::PipeRead);
88134

89135
impl_from_for_stdio!(process::ChildStdin);
90136
impl_from_for_stdio!(process::ChildStdout);

tests/openssh.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use once_cell::sync::Lazy;
22
use regex::Regex;
3-
use std::env;
4-
use std::io;
5-
use std::io::Write;
6-
use std::net::IpAddr;
7-
use std::path::PathBuf;
8-
use std::time::Duration;
3+
use std::{
4+
env,
5+
io::{self, Write},
6+
net::IpAddr,
7+
path::PathBuf,
8+
process,
9+
time::Duration,
10+
};
911
use tempfile::tempdir;
10-
11-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
1212
use tokio::{
13+
io::{AsyncReadExt, AsyncWriteExt},
1314
net::{UnixListener, UnixStream},
1415
time::sleep,
1516
};
@@ -52,7 +53,7 @@ async fn session_builder_connect(mut builder: SessionBuilder, addr: &str) -> Vec
5253
sessions
5354
}
5455

55-
async fn connects() -> Vec<Session> {
56+
async fn connects_with_name() -> Vec<(Session, &'static str)> {
5657
let mut sessions = Vec::with_capacity(2);
5758

5859
let mut builder = SessionBuilder::default();
@@ -63,17 +64,25 @@ async fn connects() -> Vec<Session> {
6364

6465
#[cfg(feature = "process-mux")]
6566
{
66-
sessions.push(builder.connect(&addr()).await.unwrap());
67+
sessions.push((builder.connect(&addr()).await.unwrap(), "process-mux"));
6768
}
6869

6970
#[cfg(feature = "native-mux")]
7071
{
71-
sessions.push(builder.connect_mux(&addr()).await.unwrap());
72+
sessions.push((builder.connect_mux(&addr()).await.unwrap(), "native-mux"));
7273
}
7374

7475
sessions
7576
}
7677

78+
async fn connects() -> Vec<Session> {
79+
connects_with_name()
80+
.await
81+
.into_iter()
82+
.map(|(session, _name)| session)
83+
.collect()
84+
}
85+
7786
async fn connects_err(host: &str) -> Vec<Error> {
7887
session_builder_connects_err(host, SessionBuilder::default()).await
7988
}
@@ -942,3 +951,25 @@ async fn test_sftp_subsystem() {
942951
sftp.close().await.unwrap();
943952
}
944953
}
954+
955+
#[tokio::test]
956+
#[cfg_attr(not(ci), ignore)]
957+
async fn test_read_large_file_bug() {
958+
for (session, name) in connects_with_name().await {
959+
eprintln!("Testing {name} implementation");
960+
961+
let bs = 1024;
962+
let count = 20480;
963+
964+
let process::Output { status, stdout, .. } = session
965+
.shell(format!("dd if=/dev/zero bs={bs} count={count}"))
966+
.output()
967+
.await
968+
.unwrap();
969+
970+
assert!(status.success());
971+
972+
stdout.iter().copied().for_each(|byte| assert_eq!(byte, 0));
973+
assert_eq!(stdout.len(), bs * count);
974+
}
975+
}

0 commit comments

Comments
 (0)