Skip to content

Commit 40c7a3b

Browse files
authored
Merge pull request #248 from Berrysoft:dev/async-fd
feat(fs): AsyncFd
2 parents c2d8de0 + 4cd7785 commit 40c7a3b

File tree

5 files changed

+235
-26
lines changed

5 files changed

+235
-26
lines changed

compio-driver/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ macro_rules! impl_raw_fd {
138138
}
139139
}
140140
}
141+
#[cfg(windows)]
142+
impl std::os::windows::io::AsRawHandle for $t {
143+
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
144+
self.$inner.as_raw_handle()
145+
}
146+
}
141147
};
142148
($t:ty, $it:ty, $inner:ident,socket) => {
143149
$crate::impl_raw_fd!($t, $it, $inner);
@@ -149,6 +155,12 @@ macro_rules! impl_raw_fd {
149155
}
150156
}
151157
}
158+
#[cfg(windows)]
159+
impl std::os::windows::io::AsRawSocket for $t {
160+
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
161+
self.$inner.as_raw_socket()
162+
}
163+
}
152164
};
153165
}
154166

compio-fs/src/async_fd.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use std::io;
2+
#[cfg(unix)]
3+
use std::os::fd::{FromRawFd, RawFd};
4+
#[cfg(windows)]
5+
use std::os::windows::io::{
6+
AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, RawHandle, RawSocket,
7+
};
8+
9+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10+
use compio_driver::{
11+
op::{BufResultExt, Recv, Send},
12+
AsRawFd, SharedFd, ToSharedFd,
13+
};
14+
use compio_io::{AsyncRead, AsyncWrite};
15+
use compio_runtime::{Attacher, Runtime};
16+
#[cfg(unix)]
17+
use {
18+
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
19+
compio_driver::op::{RecvVectored, SendVectored},
20+
};
21+
22+
/// A wrapper for IO source, providing implementations for [`AsyncRead`] and
23+
/// [`AsyncWrite`].
24+
#[derive(Debug)]
25+
pub struct AsyncFd<T: AsRawFd> {
26+
inner: Attacher<T>,
27+
}
28+
29+
impl<T: AsRawFd> AsyncFd<T> {
30+
/// Create [`AsyncFd`] and attach the source to the current runtime.
31+
pub fn new(source: T) -> io::Result<Self> {
32+
Ok(Self {
33+
inner: Attacher::new(source)?,
34+
})
35+
}
36+
37+
/// Create [`AsyncFd`] without attaching the source.
38+
///
39+
/// # Safety
40+
///
41+
/// The user should handle the attachment correctly.
42+
pub unsafe fn new_unchecked(source: T) -> Self {
43+
Self {
44+
inner: Attacher::new_unchecked(source),
45+
}
46+
}
47+
}
48+
49+
impl<T: AsRawFd + 'static> AsyncRead for AsyncFd<T> {
50+
#[inline]
51+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
52+
(&*self).read(buf).await
53+
}
54+
55+
#[cfg(unix)]
56+
#[inline]
57+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
58+
(&*self).read_vectored(buf).await
59+
}
60+
}
61+
62+
impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
63+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
64+
let fd = self.inner.to_shared_fd();
65+
let op = Recv::new(fd, buf);
66+
Runtime::current()
67+
.submit(op)
68+
.await
69+
.into_inner()
70+
.map_advanced()
71+
}
72+
73+
#[cfg(unix)]
74+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
75+
let fd = self.inner.to_shared_fd();
76+
let op = RecvVectored::new(fd, buf);
77+
Runtime::current()
78+
.submit(op)
79+
.await
80+
.into_inner()
81+
.map_advanced()
82+
}
83+
}
84+
85+
impl<T: AsRawFd + 'static> AsyncWrite for AsyncFd<T> {
86+
#[inline]
87+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
88+
(&*self).write(buf).await
89+
}
90+
91+
#[cfg(unix)]
92+
#[inline]
93+
async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
94+
(&*self).write_vectored(buf).await
95+
}
96+
97+
#[inline]
98+
async fn flush(&mut self) -> io::Result<()> {
99+
(&*self).flush().await
100+
}
101+
102+
#[inline]
103+
async fn shutdown(&mut self) -> io::Result<()> {
104+
(&*self).shutdown().await
105+
}
106+
}
107+
108+
impl<T: AsRawFd + 'static> AsyncWrite for &AsyncFd<T> {
109+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
110+
let fd = self.inner.to_shared_fd();
111+
let op = Send::new(fd, buf);
112+
Runtime::current().submit(op).await.into_inner()
113+
}
114+
115+
#[cfg(unix)]
116+
async fn write_vectored<V: IoVectoredBuf>(&mut self, buf: V) -> BufResult<usize, V> {
117+
let fd = self.inner.to_shared_fd();
118+
let op = SendVectored::new(fd, buf);
119+
Runtime::current().submit(op).await.into_inner()
120+
}
121+
122+
async fn flush(&mut self) -> io::Result<()> {
123+
Ok(())
124+
}
125+
126+
async fn shutdown(&mut self) -> io::Result<()> {
127+
Ok(())
128+
}
129+
}
130+
131+
impl<T: AsRawFd> IntoInner for AsyncFd<T> {
132+
type Inner = SharedFd<T>;
133+
134+
fn into_inner(self) -> Self::Inner {
135+
self.inner.into_inner()
136+
}
137+
}
138+
139+
impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
140+
fn as_raw_fd(&self) -> compio_driver::RawFd {
141+
self.inner.as_raw_fd()
142+
}
143+
}
144+
145+
#[cfg(windows)]
146+
impl<T: AsRawFd + AsRawHandle> AsRawHandle for AsyncFd<T> {
147+
fn as_raw_handle(&self) -> RawHandle {
148+
self.inner.as_raw_handle()
149+
}
150+
}
151+
152+
#[cfg(windows)]
153+
impl<T: AsRawFd + AsRawSocket> AsRawSocket for AsyncFd<T> {
154+
fn as_raw_socket(&self) -> RawSocket {
155+
self.inner.as_raw_socket()
156+
}
157+
}
158+
159+
impl<T: AsRawFd> ToSharedFd<T> for AsyncFd<T> {
160+
fn to_shared_fd(&self) -> SharedFd<T> {
161+
self.inner.to_shared_fd()
162+
}
163+
}
164+
165+
impl<T: AsRawFd> Clone for AsyncFd<T> {
166+
fn clone(&self) -> Self {
167+
Self {
168+
inner: self.inner.clone(),
169+
}
170+
}
171+
}
172+
173+
#[cfg(unix)]
174+
impl<T: AsRawFd + FromRawFd> FromRawFd for AsyncFd<T> {
175+
unsafe fn from_raw_fd(fd: RawFd) -> Self {
176+
Self::new_unchecked(FromRawFd::from_raw_fd(fd))
177+
}
178+
}
179+
180+
#[cfg(windows)]
181+
impl<T: AsRawFd + FromRawHandle> FromRawHandle for AsyncFd<T> {
182+
unsafe fn from_raw_handle(handle: RawHandle) -> Self {
183+
Self::new_unchecked(FromRawHandle::from_raw_handle(handle))
184+
}
185+
}
186+
187+
#[cfg(windows)]
188+
impl<T: AsRawFd + FromRawSocket> FromRawSocket for AsyncFd<T> {
189+
unsafe fn from_raw_socket(sock: RawSocket) -> Self {
190+
Self::new_unchecked(FromRawSocket::from_raw_socket(sock))
191+
}
192+
}

compio-fs/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub use stdio::*;
2323
mod utils;
2424
pub use utils::*;
2525

26+
mod async_fd;
27+
pub use async_fd::*;
28+
2629
#[cfg(windows)]
2730
pub mod named_pipe;
2831

compio-fs/src/named_pipe.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use windows_sys::Win32::{
2626
},
2727
};
2828

29-
use crate::{File, OpenOptions};
29+
use crate::{AsyncFd, File, OpenOptions};
3030

3131
/// A [Windows named pipe] server.
3232
///
@@ -88,7 +88,7 @@ use crate::{File, OpenOptions};
8888
/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
8989
#[derive(Debug, Clone)]
9090
pub struct NamedPipeServer {
91-
handle: File,
91+
handle: AsyncFd<std::fs::File>,
9292
}
9393

9494
impl NamedPipeServer {
@@ -189,8 +189,7 @@ impl AsyncRead for NamedPipeServer {
189189
impl AsyncRead for &NamedPipeServer {
190190
#[inline]
191191
async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
192-
// The position is ignored.
193-
self.handle.read_at(buffer, 0).await
192+
(&self.handle).read(buffer).await
194193
}
195194
}
196195

@@ -214,8 +213,7 @@ impl AsyncWrite for NamedPipeServer {
214213
impl AsyncWrite for &NamedPipeServer {
215214
#[inline]
216215
async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
217-
// The position is ignored.
218-
(&self.handle).write_at(buffer, 0).await
216+
(&self.handle).write(buffer).await
219217
}
220218

221219
#[inline]
@@ -974,7 +972,7 @@ impl ServerOptions {
974972
)?;
975973

976974
Ok(NamedPipeServer {
977-
handle: File::from_std(unsafe { std::fs::File::from_raw_handle(h as _) })?,
975+
handle: AsyncFd::new(unsafe { std::fs::File::from_raw_handle(h as _) })?,
978976
})
979977
}
980978
}

compio-fs/src/stdio/unix.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,43 @@
1-
use std::{io, mem::ManuallyDrop, os::fd::FromRawFd};
1+
use std::io;
22

33
use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
44
use compio_driver::{AsRawFd, RawFd};
55
use compio_io::{AsyncRead, AsyncWrite};
66

77
#[cfg(doc)]
88
use super::{stderr, stdin, stdout};
9-
use crate::pipe::{Receiver, Sender};
9+
use crate::AsyncFd;
1010

1111
/// A handle to the standard input stream of a process.
1212
///
1313
/// See [`stdin`].
1414
#[derive(Debug, Clone)]
15-
pub struct Stdin(ManuallyDrop<Receiver>);
15+
pub struct Stdin(AsyncFd<RawFd>);
1616

1717
impl Stdin {
1818
pub(crate) fn new() -> Self {
19-
// SAFETY: we don't drop it
20-
Self(ManuallyDrop::new(unsafe {
21-
Receiver::from_raw_fd(libc::STDIN_FILENO)
22-
}))
19+
// SAFETY: no need to attach on unix
20+
Self(unsafe { AsyncFd::new_unchecked(libc::STDIN_FILENO) })
2321
}
2422
}
2523

2624
impl AsyncRead for Stdin {
2725
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
28-
self.0.read(buf).await
26+
(&*self).read(buf).await
2927
}
3028

3129
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
32-
self.0.read_vectored(buf).await
30+
(&*self).read_vectored(buf).await
31+
}
32+
}
33+
34+
impl AsyncRead for &Stdin {
35+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
36+
(&self.0).read(buf).await
37+
}
38+
39+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
40+
(&self.0).read_vectored(buf).await
3341
}
3442
}
3543

@@ -43,14 +51,12 @@ impl AsRawFd for Stdin {
4351
///
4452
/// See [`stdout`].
4553
#[derive(Debug, Clone)]
46-
pub struct Stdout(ManuallyDrop<Sender>);
54+
pub struct Stdout(AsyncFd<RawFd>);
4755

4856
impl Stdout {
4957
pub(crate) fn new() -> Self {
50-
// SAFETY: we don't drop it
51-
Self(ManuallyDrop::new(unsafe {
52-
Sender::from_raw_fd(libc::STDOUT_FILENO)
53-
}))
58+
// SAFETY: no need to attach on unix
59+
Self(unsafe { AsyncFd::new_unchecked(libc::STDOUT_FILENO) })
5460
}
5561
}
5662

@@ -82,14 +88,12 @@ impl AsRawFd for Stdout {
8288
///
8389
/// See [`stderr`].
8490
#[derive(Debug, Clone)]
85-
pub struct Stderr(ManuallyDrop<Sender>);
91+
pub struct Stderr(AsyncFd<RawFd>);
8692

8793
impl Stderr {
8894
pub(crate) fn new() -> Self {
89-
// SAFETY: we don't drop it
90-
Self(ManuallyDrop::new(unsafe {
91-
Sender::from_raw_fd(libc::STDERR_FILENO)
92-
}))
95+
// SAFETY: no need to attach on unix
96+
Self(unsafe { AsyncFd::new_unchecked(libc::STDERR_FILENO) })
9397
}
9498
}
9599

0 commit comments

Comments
 (0)