Skip to content

Commit 84ed6e6

Browse files
authored
Merge pull request #236 from Berrysoft/dev/async-socket
feat(driver): add CreateSocket
2 parents 7479c87 + fd75e80 commit 84ed6e6

File tree

14 files changed

+139
-33
lines changed

14 files changed

+139
-33
lines changed

.github/workflows/ci_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
setup:
2525
- os: "ubuntu-20.04"
2626
- os: "ubuntu-22.04"
27-
features: "io-uring-sqe128,io-uring-cqe32"
27+
features: "io-uring-sqe128,io-uring-cqe32,io-uring-socket"
2828
- os: "ubuntu-20.04"
2929
features: "polling"
3030
no_default_features: true

compio-driver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ polling = ["dep:polling", "dep:os_pipe"]
8282

8383
io-uring-sqe128 = []
8484
io-uring-cqe32 = []
85+
io-uring-socket = []
8586

8687
iocp-global = []
8788

compio-driver/src/fusion/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ mod driver_type {
8686
Close::CODE,
8787
Shutdown::CODE,
8888
// Linux kernel 5.19
89-
#[cfg(any(feature = "io-uring-seq128", feature = "io-uring-cqe32"))]
89+
#[cfg(any(
90+
feature = "io-uring-seq128",
91+
feature = "io-uring-cqe32",
92+
feature = "io-uring-socket"
93+
))]
9094
Socket::CODE,
9195
];
9296

compio-driver/src/iour/op.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{ffi::CString, marker::PhantomPinned, os::fd::RawFd, pin::Pin};
1+
use std::{ffi::CString, io, marker::PhantomPinned, os::fd::RawFd, pin::Pin};
22

33
use compio_buf::{
44
BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
@@ -12,7 +12,7 @@ use socket2::SockAddr;
1212

1313
use super::OpCode;
1414
pub use crate::unix::op::*;
15-
use crate::{op::*, OpEntry};
15+
use crate::{op::*, syscall, OpEntry};
1616

1717
impl<
1818
D: std::marker::Send + 'static,
@@ -254,6 +254,22 @@ impl OpCode for HardLink {
254254
}
255255
}
256256

257+
impl OpCode for CreateSocket {
258+
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
259+
if cfg!(feature = "io-uring-socket") {
260+
opcode::Socket::new(self.domain, self.socket_type, self.protocol)
261+
.build()
262+
.into()
263+
} else {
264+
OpEntry::Blocking
265+
}
266+
}
267+
268+
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
269+
Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _)
270+
}
271+
}
272+
257273
impl OpCode for ShutdownSocket {
258274
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
259275
opcode::Shutdown::new(Fd(self.fd), self.how())

compio-driver/src/op.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ pub use crate::sys::op::{
1616
};
1717
#[cfg(unix)]
1818
pub use crate::sys::op::{
19-
CreateDir, FileStat, HardLink, OpenFile, PathStat, ReadVectoredAt, Rename, Symlink, Unlink,
20-
WriteVectoredAt,
19+
CreateDir, CreateSocket, FileStat, HardLink, OpenFile, PathStat, ReadVectoredAt, Rename,
20+
Symlink, Unlink, WriteVectoredAt,
2121
};
2222
use crate::sys::{sockaddr_storage, socklen_t, RawFd};
2323

compio-driver/src/poll/op.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,18 @@ impl OpCode for HardLink {
347347
}
348348
}
349349

350+
impl OpCode for CreateSocket {
351+
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
352+
Ok(Decision::blocking_dummy())
353+
}
354+
355+
fn on_event(self: Pin<&mut Self>, _: &Event) -> Poll<io::Result<usize>> {
356+
Poll::Ready(Ok(
357+
syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _,
358+
))
359+
}
360+
}
361+
350362
impl OpCode for ShutdownSocket {
351363
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
352364
Ok(Decision::blocking_dummy())

compio-driver/src/unix/op.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,24 @@ impl HardLink {
209209
}
210210
}
211211

212+
/// Create a socket.
213+
pub struct CreateSocket {
214+
pub(crate) domain: i32,
215+
pub(crate) socket_type: i32,
216+
pub(crate) protocol: i32,
217+
}
218+
219+
impl CreateSocket {
220+
/// Create [`CreateSocket`].
221+
pub fn new(domain: i32, socket_type: i32, protocol: i32) -> Self {
222+
Self {
223+
domain,
224+
socket_type,
225+
protocol,
226+
}
227+
}
228+
}
229+
212230
impl ShutdownSocket {
213231
pub(crate) fn how(&self) -> i32 {
214232
match self.how {

compio-net/src/socket.rs

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::{future::Future, io, mem::ManuallyDrop};
22

33
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4+
#[cfg(unix)]
5+
use compio_driver::op::CreateSocket;
46
use compio_driver::{
57
impl_raw_fd,
68
op::{
@@ -32,8 +34,62 @@ impl Socket {
3234
self.socket.local_addr()
3335
}
3436

35-
pub fn new(domain: Domain, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
36-
let socket = Socket2::new(domain, ty, protocol)?;
37+
#[cfg(windows)]
38+
pub async fn new(domain: Domain, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
39+
let socket =
40+
compio_runtime::spawn_blocking(move || Socket2::new(domain, ty, protocol)).await?;
41+
Self::from_socket2(socket)
42+
}
43+
44+
#[cfg(unix)]
45+
pub async fn new(domain: Domain, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
46+
use compio_driver::FromRawFd;
47+
48+
#[allow(unused_mut)]
49+
let mut ty: i32 = ty.into();
50+
#[cfg(any(
51+
target_os = "android",
52+
target_os = "dragonfly",
53+
target_os = "freebsd",
54+
target_os = "fuchsia",
55+
target_os = "hurd",
56+
target_os = "illumos",
57+
target_os = "linux",
58+
target_os = "netbsd",
59+
target_os = "openbsd",
60+
))]
61+
{
62+
ty |= libc::SOCK_CLOEXEC;
63+
}
64+
65+
let op = CreateSocket::new(
66+
domain.into(),
67+
ty,
68+
protocol.map(|p| p.into()).unwrap_or_default(),
69+
);
70+
let BufResult(res, _) = Runtime::current().submit(op).await;
71+
let socket = unsafe { Socket2::from_raw_fd(res? as _) };
72+
#[cfg(not(any(
73+
target_os = "android",
74+
target_os = "dragonfly",
75+
target_os = "freebsd",
76+
target_os = "fuchsia",
77+
target_os = "hurd",
78+
target_os = "illumos",
79+
target_os = "linux",
80+
target_os = "netbsd",
81+
target_os = "openbsd",
82+
target_os = "espidf",
83+
target_os = "vita",
84+
)))]
85+
socket.set_cloexec(true)?;
86+
#[cfg(any(
87+
target_os = "ios",
88+
target_os = "macos",
89+
target_os = "tvos",
90+
target_os = "watchos",
91+
))]
92+
socket.set_nosigpipe(true)?;
3793
// On Linux we use blocking socket
3894
// Newer kernels have the patch that allows to arm io_uring poll mechanism for
3995
// non blocking socket when there is no connections in listen queue
@@ -48,8 +104,8 @@ impl Socket {
48104
Self::from_socket2(socket)
49105
}
50106

51-
pub fn bind(addr: &SockAddr, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
52-
let socket = Self::new(addr.domain(), ty, protocol)?;
107+
pub async fn bind(addr: &SockAddr, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
108+
let socket = Self::new(addr.domain(), ty, protocol).await?;
53109
socket.socket.bind(addr)?;
54110
Ok(socket)
55111
}
@@ -97,19 +153,16 @@ impl Socket {
97153

98154
#[cfg(windows)]
99155
pub async fn accept(&self) -> io::Result<(Self, SockAddr)> {
100-
let local_addr = self.local_addr()?;
101-
// We should allow users sending this accepted socket to a new thread.
102-
let accept_sock = Socket2::new(
103-
local_addr.domain(),
104-
self.socket.r#type()?,
105-
self.socket.protocol()?,
106-
)?;
156+
let domain = self.local_addr()?.domain();
157+
let ty = self.socket.r#type()?;
158+
let protocol = self.socket.protocol()?;
159+
let accept_sock = Self::new(domain, ty, protocol).await?;
107160
let op = Accept::new(self.as_raw_fd(), accept_sock.as_raw_fd() as _);
108161
let BufResult(res, op) = Runtime::current().submit(op).await;
109162
res?;
110163
op.update_context()?;
111164
let addr = op.into_addr()?;
112-
Ok((Self::from_socket2(accept_sock)?, addr))
165+
Ok((accept_sock, addr))
113166
}
114167

115168
pub fn close(self) -> impl Future<Output = io::Result<()>> {

compio-net/src/tcp.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ impl TcpListener {
5555
/// to this listener.
5656
pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
5757
super::each_addr(addr, |addr| async move {
58-
let socket = Socket::bind(&SockAddr::from(addr), Type::STREAM, Some(Protocol::TCP))?;
58+
let socket =
59+
Socket::bind(&SockAddr::from(addr), Type::STREAM, Some(Protocol::TCP)).await?;
5960
socket.listen(128)?;
6061
Ok(Self { inner: socket })
6162
})
@@ -157,9 +158,9 @@ impl TcpStream {
157158
"Unsupported address domain.",
158159
));
159160
};
160-
Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP))?
161+
Socket::bind(&bind_addr, Type::STREAM, Some(Protocol::TCP)).await?
161162
} else {
162-
Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP))?
163+
Socket::new(addr2.domain(), Type::STREAM, Some(Protocol::TCP)).await?
163164
};
164165
socket.connect_async(&addr2).await?;
165166
Ok(Self { inner: socket })

compio-net/src/udp.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ impl UdpSocket {
9696
pub async fn bind(addr: impl ToSocketAddrsAsync) -> io::Result<Self> {
9797
super::each_addr(addr, |addr| async move {
9898
Ok(Self {
99-
inner: Socket::bind(&SockAddr::from(addr), Type::DGRAM, Some(Protocol::UDP))?,
99+
inner: Socket::bind(&SockAddr::from(addr), Type::DGRAM, Some(Protocol::UDP))
100+
.await?,
100101
})
101102
})
102103
.await

0 commit comments

Comments
 (0)