Skip to content

Commit f562364

Browse files
authored
Add recvmsg implmentation (#252)
Adds the missing `recvmsg` function to the udp socket. the level of duplication required vs `recv_from` is uncomfortable, but required due to the lack of specialization.
1 parent d8ae38b commit f562364

File tree

4 files changed

+117
-0
lines changed

4 files changed

+117
-0
lines changed

src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ mod readv;
2323

2424
mod recv_from;
2525

26+
mod recvmsg;
27+
2628
mod rename_at;
2729

2830
mod send_to;

src/io/recvmsg.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use crate::runtime::driver::op::{Completable, CqeResult, Op};
2+
use crate::runtime::CONTEXT;
3+
use crate::{buf::BoundedBufMut, io::SharedFd, BufResult};
4+
use socket2::SockAddr;
5+
use std::{
6+
io::IoSliceMut,
7+
{boxed::Box, io, net::SocketAddr},
8+
};
9+
10+
pub(crate) struct RecvMsg<T> {
11+
#[allow(dead_code)]
12+
fd: SharedFd,
13+
pub(crate) buf: Vec<T>,
14+
#[allow(dead_code)]
15+
io_slices: Vec<IoSliceMut<'static>>,
16+
pub(crate) socket_addr: Box<SockAddr>,
17+
pub(crate) msghdr: Box<libc::msghdr>,
18+
}
19+
20+
impl<T: BoundedBufMut> Op<RecvMsg<T>> {
21+
pub(crate) fn recvmsg(fd: &SharedFd, mut bufs: Vec<T>) -> io::Result<Op<RecvMsg<T>>> {
22+
use io_uring::{opcode, types};
23+
24+
let mut io_slices = Vec::with_capacity(bufs.len());
25+
for buf in &mut bufs {
26+
io_slices.push(IoSliceMut::new(unsafe {
27+
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
28+
}));
29+
}
30+
31+
let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 });
32+
33+
let mut msghdr: Box<libc::msghdr> = Box::new(unsafe { std::mem::zeroed() });
34+
msghdr.msg_iov = io_slices.as_mut_ptr().cast();
35+
msghdr.msg_iovlen = io_slices.len() as _;
36+
msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void;
37+
msghdr.msg_namelen = socket_addr.len();
38+
39+
CONTEXT.with(|x| {
40+
x.handle().expect("Not in a runtime context").submit_op(
41+
RecvMsg {
42+
fd: fd.clone(),
43+
buf: bufs,
44+
io_slices,
45+
socket_addr,
46+
msghdr,
47+
},
48+
|recv_from| {
49+
opcode::RecvMsg::new(
50+
types::Fd(recv_from.fd.raw_fd()),
51+
recv_from.msghdr.as_mut() as *mut _,
52+
)
53+
.build()
54+
},
55+
)
56+
})
57+
}
58+
}
59+
60+
impl<T> Completable for RecvMsg<T>
61+
where
62+
T: BoundedBufMut,
63+
{
64+
type Output = BufResult<(usize, SocketAddr), Vec<T>>;
65+
66+
fn complete(self, cqe: CqeResult) -> Self::Output {
67+
// Convert the operation result to `usize`
68+
let res = cqe.result.map(|v| v as usize);
69+
// Recover the buffers
70+
let mut bufs = self.buf;
71+
72+
let socket_addr = (*self.socket_addr).as_socket();
73+
74+
let res = res.map(|n| {
75+
let socket_addr: SocketAddr = socket_addr.unwrap();
76+
77+
let mut bytes = n;
78+
for buf in &mut bufs {
79+
// Safety: the kernel wrote `n` bytes to the buffer.
80+
unsafe {
81+
buf.set_init(bytes);
82+
}
83+
let total = buf.bytes_total();
84+
if bytes > total {
85+
bytes -= total;
86+
} else {
87+
// In the current API bytes_init is a watermark,
88+
// so remaining don't need zeroing.
89+
break;
90+
}
91+
}
92+
(n, socket_addr)
93+
});
94+
95+
(res, bufs)
96+
}
97+
}

src/io/socket.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,14 @@ impl Socket {
179179
op.await
180180
}
181181

182+
pub(crate) async fn recvmsg<T: BoundedBufMut>(
183+
&self,
184+
buf: Vec<T>,
185+
) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
186+
let op = Op::recvmsg(&self.fd, buf).unwrap();
187+
op.await
188+
}
189+
182190
pub(crate) async fn accept(&self) -> io::Result<(Socket, Option<SocketAddr>)> {
183191
let op = Op::accept(&self.fd)?;
184192
op.await

src/net/udp.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,16 @@ impl UdpSocket {
283283
self.inner.recv_from(buf).await
284284
}
285285

286+
/// Receives a single datagram message on the socket, into multiple buffers
287+
///
288+
/// On success, returns the number of bytes read and the origin.
289+
pub async fn recvmsg<T: BoundedBufMut>(
290+
&self,
291+
buf: Vec<T>,
292+
) -> crate::BufResult<(usize, SocketAddr), Vec<T>> {
293+
self.inner.recvmsg(buf).await
294+
}
295+
286296
/// Reads a packet of data from the socket into the buffer.
287297
///
288298
/// Returns the original buffer and quantity of data read.

0 commit comments

Comments
 (0)