Skip to content

Commit 4cd7785

Browse files
committed
feat(fs, unix): use AsyncFd in stdio
1 parent d8cfdde commit 4cd7785

File tree

2 files changed

+58
-19
lines changed

2 files changed

+58
-19
lines changed

compio-fs/src/async_fd.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ use compio_driver::{
1313
};
1414
use compio_io::{AsyncRead, AsyncWrite};
1515
use compio_runtime::{Attacher, Runtime};
16+
#[cfg(unix)]
17+
use {
18+
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
19+
compio_driver::op::{RecvVectored, SendVectored},
20+
};
1621

1722
/// A wrapper for IO source, providing implementations for [`AsyncRead`] and
1823
/// [`AsyncWrite`].
@@ -46,6 +51,12 @@ impl<T: AsRawFd + 'static> AsyncRead for AsyncFd<T> {
4651
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
4752
(&*self).read(buf).await
4853
}
54+
55+
#[cfg(unix)]
56+
#[inline]
57+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
58+
(&*self).read_vectored(buf).await
59+
}
4960
}
5061

5162
impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
@@ -58,6 +69,17 @@ impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
5869
.into_inner()
5970
.map_advanced()
6071
}
72+
73+
#[cfg(unix)]
74+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
75+
let fd = self.inner.to_shared_fd();
76+
let op = RecvVectored::new(fd, buf);
77+
Runtime::current()
78+
.submit(op)
79+
.await
80+
.into_inner()
81+
.map_advanced()
82+
}
6183
}
6284

6385
impl<T: AsRawFd + 'static> AsyncWrite for AsyncFd<T> {
@@ -66,6 +88,12 @@ impl<T: AsRawFd + 'static> AsyncWrite for AsyncFd<T> {
6688
(&*self).write(buf).await
6789
}
6890

91+
#[cfg(unix)]
92+
#[inline]
93+
async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
94+
(&*self).write_vectored(buf).await
95+
}
96+
6997
#[inline]
7098
async fn flush(&mut self) -> io::Result<()> {
7199
(&*self).flush().await
@@ -84,6 +112,13 @@ impl<T: AsRawFd + 'static> AsyncWrite for &AsyncFd<T> {
84112
Runtime::current().submit(op).await.into_inner()
85113
}
86114

115+
#[cfg(unix)]
116+
async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
117+
let fd = self.inner.to_shared_fd();
118+
let op = SendVectored::new(fd, buf);
119+
Runtime::current().submit(op).await.into_inner()
120+
}
121+
87122
async fn flush(&mut self) -> io::Result<()> {
88123
Ok(())
89124
}

compio-fs/src/stdio/unix.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,43 @@
1-
use std::{io, mem::ManuallyDrop, os::fd::FromRawFd};
1+
use std::io;
22

33
use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
44
use compio_driver::{AsRawFd, RawFd};
55
use compio_io::{AsyncRead, AsyncWrite};
66

77
#[cfg(doc)]
88
use super::{stderr, stdin, stdout};
9-
use crate::pipe::{Receiver, Sender};
9+
use crate::AsyncFd;
1010

1111
/// A handle to the standard input stream of a process.
1212
///
1313
/// See [`stdin`].
1414
#[derive(Debug, Clone)]
15-
pub struct Stdin(ManuallyDrop<Receiver>);
15+
pub struct Stdin(AsyncFd<RawFd>);
1616

1717
impl Stdin {
1818
pub(crate) fn new() -> Self {
19-
// SAFETY: we don't drop it
20-
Self(ManuallyDrop::new(unsafe {
21-
Receiver::from_raw_fd(libc::STDIN_FILENO)
22-
}))
19+
// SAFETY: no need to attach on unix
20+
Self(unsafe { AsyncFd::new_unchecked(libc::STDIN_FILENO) })
2321
}
2422
}
2523

2624
impl AsyncRead for Stdin {
2725
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
28-
self.0.read(buf).await
26+
(&*self).read(buf).await
2927
}
3028

3129
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
32-
self.0.read_vectored(buf).await
30+
(&*self).read_vectored(buf).await
31+
}
32+
}
33+
34+
impl AsyncRead for &Stdin {
35+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
36+
(&self.0).read(buf).await
37+
}
38+
39+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
40+
(&self.0).read_vectored(buf).await
3341
}
3442
}
3543

@@ -43,14 +51,12 @@ impl AsRawFd for Stdin {
4351
///
4452
/// See [`stdout`].
4553
#[derive(Debug, Clone)]
46-
pub struct Stdout(ManuallyDrop<Sender>);
54+
pub struct Stdout(AsyncFd<RawFd>);
4755

4856
impl Stdout {
4957
pub(crate) fn new() -> Self {
50-
// SAFETY: we don't drop it
51-
Self(ManuallyDrop::new(unsafe {
52-
Sender::from_raw_fd(libc::STDOUT_FILENO)
53-
}))
58+
// SAFETY: no need to attach on unix
59+
Self(unsafe { AsyncFd::new_unchecked(libc::STDOUT_FILENO) })
5460
}
5561
}
5662

@@ -82,14 +88,12 @@ impl AsRawFd for Stdout {
8288
///
8389
/// See [`stderr`].
8490
#[derive(Debug, Clone)]
85-
pub struct Stderr(ManuallyDrop<Sender>);
91+
pub struct Stderr(AsyncFd<RawFd>);
8692

8793
impl Stderr {
8894
pub(crate) fn new() -> Self {
89-
// SAFETY: we don't drop it
90-
Self(ManuallyDrop::new(unsafe {
91-
Sender::from_raw_fd(libc::STDERR_FILENO)
92-
}))
95+
// SAFETY: no need to attach on unix
96+
Self(unsafe { AsyncFd::new_unchecked(libc::STDERR_FILENO) })
9397
}
9498
}
9599

0 commit comments

Comments
 (0)