Skip to content

Commit cc1ba57

Browse files
committed
Add sendmmsg syscall
1 parent 36dec54 commit cc1ba57

File tree

4 files changed

+249
-0
lines changed

4 files changed

+249
-0
lines changed

src/lib.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,21 @@ macro_rules! man_links {
172172
};
173173
}
174174

175+
macro_rules! define_mmsg_if_supported {
176+
($($item:item)*) => {
177+
$(
178+
#[cfg(all(
179+
feature = "all",
180+
any(
181+
target_os = "linux",
182+
target_os = "android",
183+
)
184+
))]
185+
$item
186+
)*
187+
};
188+
}
189+
175190
mod sockaddr;
176191
mod socket;
177192
mod sockref;
@@ -697,3 +712,107 @@ impl<'name, 'bufs, 'control> fmt::Debug for MsgHdrMut<'name, 'bufs, 'control> {
697712
"MsgHdrMut".fmt(fmt)
698713
}
699714
}
715+
716+
define_mmsg_if_supported! {
717+
/// Wraps `mmsghdr` on Unix for a `sendmmsg(2)` system call.
718+
///
719+
/// Also see [`MsgHdr`] for the variant used by `sendmsg(2)`.
720+
#[repr(transparent)]
721+
pub struct MMsgHdr<'addr, 'bufs, 'control> {
722+
inner: sys::mmsghdr,
723+
#[allow(clippy::type_complexity)]
724+
_lifetimes: PhantomData<(&'addr SockAddr, &'bufs IoSlice<'bufs>, &'control [u8])>,
725+
}
726+
727+
impl<'addr, 'bufs, 'control> MMsgHdr<'addr, 'bufs, 'control> {
728+
/// Create a new `MMsgHdr` from `MsgHdr` and with the `msg_len` set to zero.
729+
pub fn new(msg: MsgHdr<'_, '_, '_>) -> Self {
730+
Self {
731+
inner: sys::mmsghdr {
732+
msg_hdr: msg.inner,
733+
msg_len: 0,
734+
},
735+
_lifetimes: PhantomData,
736+
}
737+
}
738+
739+
/// Number of bytes transmitted.
740+
pub fn transmitted_bytes(&self) -> u32 {
741+
self.inner.msg_len
742+
}
743+
}
744+
745+
impl<'addr, 'bufs, 'control> From<MsgHdr<'addr, 'bufs, 'control>>
746+
for MMsgHdr<'addr, 'bufs, 'control>
747+
{
748+
fn from(value: MsgHdr<'_, '_, '_>) -> Self {
749+
Self::new(value)
750+
}
751+
}
752+
753+
impl<'addr, 'bufs, 'control> fmt::Debug for MMsgHdr<'addr, 'bufs, 'control> {
754+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
755+
f.write_str("MMsgHdr({})", self.len())
756+
}
757+
}
758+
759+
/// Wraps `mmsghdr` on Unix for a `recvmmsg(2)` system call.
760+
///
761+
/// Also see [`MsgHdrMut`] for the variant used by `recvmsg(2)`.
762+
#[repr(transparent)]
763+
pub struct MMsgHdrMut<'addr, 'bufs, 'control> {
764+
inner: sys::mmsghdr,
765+
#[allow(clippy::type_complexity)]
766+
_lifetimes: PhantomData<(
767+
&'addr mut SockAddr,
768+
&'bufs mut MaybeUninitSlice<'bufs>,
769+
&'control mut [u8],
770+
)>,
771+
}
772+
773+
impl<'addr, 'bufs, 'control> MMsgHdrMut<'addr, 'bufs, 'control> {
774+
/// Create a new `MMsgHdrMut` from `MsgHdrMut` and with the `msg_len` set to zero.
775+
pub fn new(msg: MsgHdrMut<'_, '_, '_>) -> Self {
776+
Self {
777+
inner: sys::mmsghdr {
778+
msg_hdr: msg.inner,
779+
msg_len: 0,
780+
},
781+
_lifetimes: PhantomData,
782+
}
783+
}
784+
785+
/// Number of received bytes.
786+
pub fn transmitted_bytes(&self) -> u32 {
787+
self.inner.msg_len
788+
}
789+
790+
/// Returns the flags of the message.
791+
pub fn flags(&self) -> RecvFlags {
792+
sys::msghdr_flags(&self.inner.msg_hdr)
793+
}
794+
795+
/// Gets the length of the control buffer.
796+
///
797+
/// Can be used to determine how much, if any, of the control buffer was filled by `recvmsg`.
798+
///
799+
/// Corresponds to `msg_controllen` on Unix and `Control.len` on Windows.
800+
pub fn control_len(&self) -> usize {
801+
sys::msghdr_control_len(&self.inner.msg_hdr)
802+
}
803+
}
804+
805+
impl<'addr, 'bufs, 'control> From<MsgHdrMut<'addr, 'bufs, 'control>>
806+
for MMsgHdrMut<'addr, 'bufs, 'control>
807+
{
808+
fn from(value: MsgHdrMut<'_, '_, '_>) -> Self {
809+
Self::new(value)
810+
}
811+
}
812+
813+
impl<'addr, 'bufs, 'control> fmt::Debug for MMsgHdrMut<'addr, 'bufs, 'control> {
814+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815+
f.write_str("MMsgHdrMut({})", self.len())
816+
}
817+
}
818+
}

src/socket.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ use crate::{Domain, Protocol, SockAddr, TcpKeepalive, Type};
2727
#[cfg(not(target_os = "redox"))]
2828
use crate::{MaybeUninitSlice, MsgHdr, RecvFlags};
2929

30+
define_mmsg_if_supported! {
31+
use crate::{MMsgHdr, MMsgHdrMut};
32+
}
33+
3034
/// Owned wrapper around a system socket.
3135
///
3236
/// This type simply wraps an instance of a file descriptor (`c_int`) on Unix
@@ -2196,6 +2200,25 @@ impl Socket {
21962200
pub fn original_dst_ipv6(&self) -> io::Result<SockAddr> {
21972201
sys::original_dst_ipv6(self.as_raw())
21982202
}
2203+
2204+
define_mmsg_if_supported! {
2205+
/// Receive multiple messages on the socket using a single system call.
2206+
#[doc = man_links!(unix: recvmmsg(2))]
2207+
pub fn recvmmsg(
2208+
&self,
2209+
msgvec: &mut [MMsgHdrMut<'_, '_, '_>],
2210+
flags: c_int,
2211+
timeout: Option<Duration>,
2212+
) -> io::Result<usize> {
2213+
sys::recvmmsg(self.as_raw(), msgvec, flags, timeout)
2214+
}
2215+
2216+
/// Send multiple messages on the socket using a single system call.
2217+
#[doc = man_links!(unix: sendmmsg(2))]
2218+
pub fn sendmmsg(&self, msgvec: &mut [MMsgHdr<'_, '_, '_>], flags: c_int) -> io::Result<usize> {
2219+
sys::sendmmsg(self.as_raw(), msgvec, flags)
2220+
}
2221+
}
21992222
}
22002223

22012224
impl Read for Socket {

src/sys/unix.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ use crate::{Domain, Protocol, SockAddr, TcpKeepalive, Type};
8080
#[cfg(not(target_os = "redox"))]
8181
use crate::{MsgHdr, MsgHdrMut, RecvFlags};
8282

83+
define_mmsg_if_supported! {
84+
use crate::{MMsgHdr, MMsgHdrMut};
85+
86+
// Used in `MMsgHdr`.
87+
pub(crate) use libc::mmsghdr;
88+
}
89+
8390
pub(crate) use libc::c_int;
8491

8592
// Used in `Domain`.
@@ -1076,6 +1083,37 @@ pub(crate) fn recvmsg(
10761083
syscall!(recvmsg(fd, &mut msg.inner, flags)).map(|n| n as usize)
10771084
}
10781085

1086+
define_mmsg_if_supported! {
1087+
pub(crate) fn recvmmsg(
1088+
fd: Socket,
1089+
msgvec: &mut [MMsgHdrMut<'_, '_, '_>],
1090+
flags: c_int,
1091+
timeout: Option<Duration>,
1092+
) -> io::Result<usize> {
1093+
if cfg!(target_env = "musl") {
1094+
debug_assert!(flags >= 0, "socket flags must be non-negative");
1095+
}
1096+
1097+
let mut timeout = timeout.map(into_timespec);
1098+
let timeout_ptr = timeout
1099+
.as_mut()
1100+
.map(|t| t as *mut _)
1101+
.unwrap_or(ptr::null_mut());
1102+
1103+
syscall!(recvmmsg(
1104+
fd,
1105+
// SAFETY: `MMsgHdrMut` is `#[repr(transparent)]` and wraps a `libc::mmsghdr`
1106+
msgvec.as_mut_ptr() as *mut mmsghdr,
1107+
msgvec.len() as _,
1108+
// On glibc: `c_int` to `c_int` (no change).
1109+
// On musl: `c_int` to `c_uint`.
1110+
flags as _,
1111+
timeout_ptr
1112+
))
1113+
.map(|n| n as usize)
1114+
}
1115+
}
1116+
10791117
pub(crate) fn send(fd: Socket, buf: &[u8], flags: c_int) -> io::Result<usize> {
10801118
syscall!(send(
10811119
fd,
@@ -1120,6 +1158,29 @@ pub(crate) fn sendmsg(fd: Socket, msg: &MsgHdr<'_, '_, '_>, flags: c_int) -> io:
11201158
syscall!(sendmsg(fd, &msg.inner, flags)).map(|n| n as usize)
11211159
}
11221160

1161+
define_mmsg_if_supported! {
1162+
pub(crate) fn sendmmsg(
1163+
fd: Socket,
1164+
msgvec: &mut [MMsgHdr<'_, '_, '_>],
1165+
flags: c_int,
1166+
) -> io::Result<usize> {
1167+
if cfg!(target_env = "musl") {
1168+
debug_assert!(flags >= 0, "socket flags must be non-negative");
1169+
}
1170+
1171+
syscall!(sendmmsg(
1172+
fd,
1173+
// SAFETY: `MMsgHdr` is `#[repr(transparent)]` and wraps a `libc::mmsghdr`
1174+
msgvec.as_mut_ptr() as *mut mmsghdr,
1175+
msgvec.len() as _,
1176+
// On glibc: `c_int` to `c_int` (no change).
1177+
// On musl: `c_int` to `c_uint`.
1178+
flags as _
1179+
))
1180+
.map(|n| n as usize)
1181+
}
1182+
}
1183+
11231184
/// Wrapper around `getsockopt` to deal with platform specific timeouts.
11241185
pub(crate) fn timeout_opt(fd: Socket, opt: c_int, val: c_int) -> io::Result<Option<Duration>> {
11251186
unsafe { getsockopt(fd, opt, val).map(from_timeval) }
@@ -1161,6 +1222,26 @@ fn into_timeval(duration: Option<Duration>) -> libc::timeval {
11611222
}
11621223
}
11631224

1225+
#[allow(dead_code)] // Used by `recvmmsg()`.
1226+
fn into_timespec(duration: Duration) -> libc::timespec {
1227+
// https://github.com/rust-lang/libc/issues/1848
1228+
#[cfg_attr(target_env = "musl", allow(deprecated))]
1229+
libc::timespec {
1230+
tv_sec: min(duration.as_secs(), libc::time_t::MAX as u64) as libc::time_t,
1231+
#[cfg(any(
1232+
all(target_arch = "x86_64", target_pointer_width = "32"),
1233+
target_pointer_width = "64"
1234+
))]
1235+
tv_nsec: duration.subsec_nanos() as i64,
1236+
1237+
#[cfg(not(any(
1238+
all(target_arch = "x86_64", target_pointer_width = "32"),
1239+
target_pointer_width = "64"
1240+
)))]
1241+
tv_nsec: duration.subsec_nanos().clamp(0, i32::MAX as u32) as i32,
1242+
}
1243+
}
1244+
11641245
#[cfg(all(
11651246
feature = "all",
11661247
not(any(target_os = "haiku", target_os = "openbsd", target_os = "vita"))

tests/socket.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,32 @@ fn sendmsg() {
769769
assert_eq!(received, DATA.len());
770770
}
771771

772+
#[test]
773+
#[cfg(any(target_os = "linux", target_os = "android"))]
774+
fn sendmmsg() {
775+
let (socket_a, socket_b) = udp_pair_unconnected();
776+
777+
const DATA: &[u8] = b"Hello, World!";
778+
779+
let bufs = &[IoSlice::new(DATA)];
780+
let addr_b = socket_b.local_addr().unwrap();
781+
let batched_msgs = Vec::new();
782+
for _ in 0..5 {
783+
batched_msgs.push(socket2::MMsgHdr::new(
784+
socket2::MsgHdr::new().with_addr(&addr_b).with_buffers(bufs),
785+
));
786+
}
787+
788+
let sent = socket_a.sendmmsg(batched_msgs.as_mut_slice(), 0).unwrap();
789+
790+
let mut sent_data = 0;
791+
// Calculate transmitted length
792+
for i in 0..5 {
793+
sent_data += batched_msgs[i].transmitted_bytes()
794+
}
795+
assert!(sent_data == 5 * DATA.len());
796+
}
797+
772798
#[test]
773799
#[cfg(not(any(target_os = "redox", target_os = "vita")))]
774800
fn recv_vectored_truncated() {

0 commit comments

Comments
 (0)