Skip to content

Commit 6e43b1b

Browse files
committed
feat(fs): generic AsyncFd
1 parent 7f92bc9 commit 6e43b1b

File tree

3 files changed

+172
-0
lines changed

3 files changed

+172
-0
lines changed

compio-driver/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ macro_rules! impl_raw_fd {
138138
}
139139
}
140140
}
141+
#[cfg(windows)]
142+
impl std::os::windows::io::AsRawHandle for $t {
143+
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
144+
self.$inner.as_raw_handle()
145+
}
146+
}
141147
};
142148
($t:ty, $it:ty, $inner:ident,socket) => {
143149
$crate::impl_raw_fd!($t, $it, $inner);
@@ -149,6 +155,12 @@ macro_rules! impl_raw_fd {
149155
}
150156
}
151157
}
158+
#[cfg(windows)]
159+
impl std::os::windows::io::AsRawSocket for $t {
160+
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
161+
self.$inner.as_raw_socket()
162+
}
163+
}
152164
};
153165
}
154166

compio-fs/src/async_fd.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use std::io;
2+
#[cfg(unix)]
3+
use std::os::fd::{FromRawFd, RawFd};
4+
#[cfg(windows)]
5+
use std::os::windows::io::{
6+
AsRawHandle, AsRawSocket, FromRawHandle, FromRawSocket, RawHandle, RawSocket,
7+
};
8+
9+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut};
10+
use compio_driver::{
11+
op::{BufResultExt, Recv, Send},
12+
AsRawFd, SharedFd, ToSharedFd,
13+
};
14+
use compio_io::{AsyncRead, AsyncWrite};
15+
use compio_runtime::{Attacher, Runtime};
16+
17+
/// A wrapper for IO source, providing implementations for [`AsyncRead`] and
18+
/// [`AsyncWrite`].
19+
#[derive(Debug)]
20+
pub struct AsyncFd<T: AsRawFd> {
21+
inner: Attacher<T>,
22+
}
23+
24+
impl<T: AsRawFd> AsyncFd<T> {
25+
/// Create [`AsyncFd`] and attach the source to the current runtime.
26+
pub fn new(source: T) -> io::Result<Self> {
27+
Ok(Self {
28+
inner: Attacher::new(source)?,
29+
})
30+
}
31+
32+
/// Create [`AsyncFd`] without attaching the source.
33+
///
34+
/// # Safety
35+
///
36+
/// The user should handle the attachment correctly.
37+
pub unsafe fn new_unchecked(source: T) -> Self {
38+
Self {
39+
inner: Attacher::new_unchecked(source),
40+
}
41+
}
42+
}
43+
44+
impl<T: AsRawFd + 'static> AsyncRead for AsyncFd<T> {
45+
#[inline]
46+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
47+
(&*self).read(buf).await
48+
}
49+
}
50+
51+
impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
52+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
53+
let fd = self.inner.to_shared_fd();
54+
let op = Recv::new(fd, buf);
55+
Runtime::current()
56+
.submit(op)
57+
.await
58+
.into_inner()
59+
.map_advanced()
60+
}
61+
}
62+
63+
impl<T: AsRawFd + 'static> AsyncWrite for AsyncFd<T> {
64+
#[inline]
65+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
66+
(&*self).write(buf).await
67+
}
68+
69+
#[inline]
70+
async fn flush(&mut self) -> io::Result<()> {
71+
(&*self).flush().await
72+
}
73+
74+
#[inline]
75+
async fn shutdown(&mut self) -> io::Result<()> {
76+
(&*self).shutdown().await
77+
}
78+
}
79+
80+
impl<T: AsRawFd + 'static> AsyncWrite for &AsyncFd<T> {
81+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
82+
let fd = self.inner.to_shared_fd();
83+
let op = Send::new(fd, buf);
84+
Runtime::current().submit(op).await.into_inner()
85+
}
86+
87+
async fn flush(&mut self) -> io::Result<()> {
88+
Ok(())
89+
}
90+
91+
async fn shutdown(&mut self) -> io::Result<()> {
92+
Ok(())
93+
}
94+
}
95+
96+
impl<T: AsRawFd> IntoInner for AsyncFd<T> {
97+
type Inner = SharedFd<T>;
98+
99+
fn into_inner(self) -> Self::Inner {
100+
self.inner.into_inner()
101+
}
102+
}
103+
104+
impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
105+
fn as_raw_fd(&self) -> compio_driver::RawFd {
106+
self.inner.as_raw_fd()
107+
}
108+
}
109+
110+
#[cfg(windows)]
111+
impl<T: AsRawFd + AsRawHandle> AsRawHandle for AsyncFd<T> {
112+
fn as_raw_handle(&self) -> RawHandle {
113+
self.inner.as_raw_handle()
114+
}
115+
}
116+
117+
#[cfg(windows)]
118+
impl<T: AsRawFd + AsRawSocket> AsRawSocket for AsyncFd<T> {
119+
fn as_raw_socket(&self) -> RawSocket {
120+
self.inner.as_raw_socket()
121+
}
122+
}
123+
124+
impl<T: AsRawFd> ToSharedFd<T> for AsyncFd<T> {
125+
fn to_shared_fd(&self) -> SharedFd<T> {
126+
self.inner.to_shared_fd()
127+
}
128+
}
129+
130+
impl<T: AsRawFd> Clone for AsyncFd<T> {
131+
fn clone(&self) -> Self {
132+
Self {
133+
inner: self.inner.clone(),
134+
}
135+
}
136+
}
137+
138+
#[cfg(unix)]
139+
impl<T: AsRawFd + FromRawFd> FromRawFd for AsyncFd<T> {
140+
unsafe fn from_raw_fd(fd: RawFd) -> Self {
141+
Self::new_unchecked(FromRawFd::from_raw_fd(fd))
142+
}
143+
}
144+
145+
#[cfg(windows)]
146+
impl<T: AsRawFd + FromRawHandle> FromRawHandle for AsyncFd<T> {
147+
unsafe fn from_raw_handle(handle: RawHandle) -> Self {
148+
Self::new_unchecked(FromRawHandle::from_raw_handle(handle))
149+
}
150+
}
151+
152+
#[cfg(windows)]
153+
impl<T: AsRawFd + FromRawSocket> FromRawSocket for AsyncFd<T> {
154+
unsafe fn from_raw_socket(sock: RawSocket) -> Self {
155+
Self::new_unchecked(FromRawSocket::from_raw_socket(sock))
156+
}
157+
}

compio-fs/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub use stdio::*;
2323
mod utils;
2424
pub use utils::*;
2525

26+
mod async_fd;
27+
pub use async_fd::*;
28+
2629
#[cfg(windows)]
2730
pub mod named_pipe;
2831

0 commit comments

Comments
 (0)