Skip to content

Commit 47fbddf

Browse files
authored
net: add UdpStream sendmsg. (#263)
Fixes #261
1 parent 1a089de commit 47fbddf

File tree

4 files changed

+135
-0
lines changed

4 files changed

+135
-0
lines changed

src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ mod send_to;
3131

3232
mod send_zc;
3333

34+
mod sendmsg;
35+
3436
mod sendmsg_zc;
3537

3638
mod shared_fd;

src/io/sendmsg.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use crate::buf::BoundedBuf;
2+
use crate::io::SharedFd;
3+
use crate::runtime::driver::op::{Completable, CqeResult, Op};
4+
use crate::runtime::CONTEXT;
5+
use socket2::SockAddr;
6+
use std::io;
7+
use std::io::IoSlice;
8+
use std::net::SocketAddr;
9+
10+
pub(crate) struct SendMsg<T, U> {
11+
_fd: SharedFd,
12+
_io_bufs: Vec<T>,
13+
_io_slices: Vec<IoSlice<'static>>,
14+
_socket_addr: Option<Box<SockAddr>>,
15+
msg_control: Option<U>,
16+
msghdr: libc::msghdr,
17+
}
18+
19+
impl<T: BoundedBuf, U: BoundedBuf> Op<SendMsg<T, U>> {
20+
pub(crate) fn sendmsg(
21+
fd: &SharedFd,
22+
io_bufs: Vec<T>,
23+
socket_addr: Option<SocketAddr>,
24+
msg_control: Option<U>,
25+
) -> io::Result<Self> {
26+
use io_uring::{opcode, types};
27+
28+
let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };
29+
30+
let mut io_slices: Vec<IoSlice<'static>> = Vec::with_capacity(io_bufs.len());
31+
32+
for io_buf in &io_bufs {
33+
io_slices.push(IoSlice::new(unsafe {
34+
std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init())
35+
}))
36+
}
37+
38+
msghdr.msg_iov = io_slices.as_ptr() as *mut _;
39+
msghdr.msg_iovlen = io_slices.len() as _;
40+
41+
let socket_addr = match socket_addr {
42+
Some(_socket_addr) => {
43+
let socket_addr = Box::new(SockAddr::from(_socket_addr));
44+
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
45+
msghdr.msg_namelen = socket_addr.len();
46+
Some(socket_addr)
47+
}
48+
None => {
49+
msghdr.msg_name = std::ptr::null_mut();
50+
msghdr.msg_namelen = 0;
51+
None
52+
}
53+
};
54+
55+
match msg_control {
56+
Some(ref _msg_control) => {
57+
msghdr.msg_control = _msg_control.stable_ptr() as *mut _;
58+
msghdr.msg_controllen = _msg_control.bytes_init();
59+
}
60+
None => {
61+
msghdr.msg_control = std::ptr::null_mut();
62+
msghdr.msg_controllen = 0_usize;
63+
}
64+
}
65+
66+
CONTEXT.with(|x| {
67+
x.handle().expect("Not in a runtime context").submit_op(
68+
SendMsg {
69+
_fd: fd.clone(),
70+
_io_bufs: io_bufs,
71+
_socket_addr: socket_addr,
72+
_io_slices: io_slices,
73+
msg_control,
74+
msghdr,
75+
},
76+
|sendmsg| {
77+
opcode::SendMsg::new(
78+
types::Fd(sendmsg._fd.raw_fd()),
79+
&sendmsg.msghdr as *const _,
80+
)
81+
.build()
82+
},
83+
)
84+
})
85+
}
86+
}
87+
88+
impl<T, U> Completable for SendMsg<T, U> {
89+
type Output = (io::Result<usize>, Vec<T>, Option<U>);
90+
91+
fn complete(self, cqe: CqeResult) -> (io::Result<usize>, Vec<T>, Option<U>) {
92+
// Convert the operation result to `usize`
93+
let res = cqe.result.map(|n| n as usize);
94+
95+
// Recover the data buffers.
96+
let io_bufs = self._io_bufs;
97+
98+
// Recover the ancillary data buffer.
99+
let msg_control = self.msg_control;
100+
101+
(res, io_bufs, msg_control)
102+
}
103+
}

src/io/socket.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,16 @@ impl Socket {
148148
op.await
149149
}
150150

151+
pub(crate) async fn sendmsg<T: BoundedBuf, U: BoundedBuf>(
152+
&self,
153+
io_slices: Vec<T>,
154+
socket_addr: Option<SocketAddr>,
155+
msg_control: Option<U>,
156+
) -> (io::Result<usize>, Vec<T>, Option<U>) {
157+
let op = Op::sendmsg(&self.fd, io_slices, socket_addr, msg_control).unwrap();
158+
op.await
159+
}
160+
151161
pub(crate) async fn sendmsg_zc<T: BoundedBuf, U: BoundedBuf>(
152162
&self,
153163
io_slices: Vec<T>,

src/net/udp.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,26 @@ impl UdpSocket {
244244
self.inner.send_zc(buf).await
245245
}
246246

247+
/// Sends a message on the socket using a msghdr.
248+
///
249+
/// Returns a tuple of:
250+
///
251+
/// * Result containing bytes written on success
252+
/// * The original `io_slices` `Vec<T>`
253+
/// * The original `msg_contol` `Option<U>`
254+
///
255+
/// Consider using [`Self::sendmsg_zc`] for a zero-copy alternative.
256+
pub async fn sendmsg<T: BoundedBuf, U: BoundedBuf>(
257+
&self,
258+
io_slices: Vec<T>,
259+
socket_addr: Option<SocketAddr>,
260+
msg_control: Option<U>,
261+
) -> (io::Result<usize>, Vec<T>, Option<U>) {
262+
self.inner
263+
.sendmsg(io_slices, socket_addr, msg_control)
264+
.await
265+
}
266+
247267
/// Sends a message on the socket using a msghdr.
248268
///
249269
/// Returns a tuple of:

0 commit comments

Comments
 (0)