Skip to content

Commit d690167

Browse files
committed
basic wrappers for sendmmsg/recvmmsg (#823)
1 parent 238b8f4 commit d690167

File tree

5 files changed

+645
-76
lines changed

5 files changed

+645
-76
lines changed

crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ use std::{
44
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
55
os::unix::io::{AsRawFd, RawFd},
66
pin::Pin,
7+
ptr,
8+
sync::atomic::{AtomicBool, Ordering},
79
task::{self, Poll},
810
};
911

10-
use log::{error, warn};
12+
use log::{debug, error, warn};
1113
use pin_project::pin_project;
12-
use socket2::{Domain, Protocol, Socket, Type};
14+
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
1315
use tokio::{
1416
io::{AsyncRead, AsyncWrite, ReadBuf},
1517
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
@@ -18,6 +20,7 @@ use tokio_tfo::TfoStream;
1820

1921
use crate::net::{
2022
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
23+
udp::{BatchRecvMessage, BatchSendMessage},
2124
AddrFamily,
2225
ConnectOpts,
2326
};
@@ -241,3 +244,162 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->
241244

242245
Ok(socket)
243246
}
247+
248+
static SUPPORT_BATCH_SEND_RECV_MSG: AtomicBool = AtomicBool::new(true);
249+
250+
fn recvmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchRecvMessage<'_>) -> io::Result<()> {
251+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
252+
253+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
254+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
255+
let sock_addr = unsafe { SockAddr::new(addr_storage, addr_len) };
256+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
257+
hdr.msg_namelen = sock_addr.len() as _;
258+
259+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
260+
hdr.msg_iovlen = msg.data.len() as _;
261+
262+
let ret = unsafe { libc::recvmsg(sock.as_raw_fd(), &mut hdr as *mut _, 0) };
263+
if ret < 0 {
264+
return Err(io::Error::last_os_error());
265+
}
266+
267+
msg.addr = sock_addr.as_socket().expect("SockAddr.as_socket");
268+
msg.data_len = ret as usize;
269+
270+
Ok(())
271+
}
272+
273+
pub fn batch_recvmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchRecvMessage<'_>]) -> io::Result<usize> {
274+
if msgs.is_empty() {
275+
return Ok(0);
276+
}
277+
278+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
279+
recvmsg_fallback(sock, &mut msgs[0])?;
280+
return Ok(1);
281+
}
282+
283+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
284+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
285+
286+
for msg in msgs.iter_mut() {
287+
let mut hdr: libc::mmsghdr = unsafe { mem::zeroed() };
288+
289+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
290+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
291+
292+
vec_msg_name.push(unsafe { SockAddr::new(addr_storage, addr_len) });
293+
let sock_addr = vec_msg_name.last_mut().unwrap();
294+
hdr.msg_hdr.msg_name = sock_addr.as_ptr() as *mut _;
295+
hdr.msg_hdr.msg_namelen = sock_addr.len() as _;
296+
297+
hdr.msg_hdr.msg_iov = msg.data.as_ptr() as *mut _;
298+
hdr.msg_hdr.msg_iovlen = msg.data.len() as _;
299+
300+
vec_msg_hdr.push(hdr);
301+
}
302+
303+
let ret = unsafe {
304+
libc::recvmmsg(
305+
sock.as_raw_fd(),
306+
vec_msg_hdr.as_mut_ptr(),
307+
vec_msg_hdr.len() as _,
308+
0,
309+
ptr::null(),
310+
)
311+
};
312+
if ret < 0 {
313+
let err = io::Error::last_os_error();
314+
if let Some(libc::ENOSYS) = err.raw_os_error() {
315+
debug!("recvmmsg is not supported, fallback to recvmsg, error: {:?}", err);
316+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
317+
318+
recvmsg_fallback(sock, &mut msgs[0])?;
319+
return Ok(1);
320+
}
321+
return Err(err);
322+
}
323+
324+
for idx in 0..ret as usize {
325+
let msg = &mut msgs[idx];
326+
let hdr = &vec_msg_hdr[idx];
327+
let name = &vec_msg_name[idx];
328+
msg.addr = name.as_socket().expect("SockAddr.as_socket");
329+
msg.data_len = hdr.msg_len as usize;
330+
}
331+
332+
Ok(ret as usize)
333+
}
334+
335+
fn sendmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchSendMessage<'_>) -> io::Result<()> {
336+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
337+
338+
let sock_addr = msg.addr.map(SockAddr::from);
339+
if let Some(ref sa) = sock_addr {
340+
hdr.msg_name = sa.as_ptr() as *mut _;
341+
hdr.msg_namelen = sa.len() as _;
342+
}
343+
344+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
345+
hdr.msg_iovlen = msg.data.len() as _;
346+
347+
let ret = unsafe { libc::sendmsg(sock.as_raw_fd(), &hdr as *const _, 0) };
348+
if ret < 0 {
349+
return Err(io::Error::last_os_error());
350+
}
351+
msg.data_len = ret as usize;
352+
353+
Ok(())
354+
}
355+
356+
pub fn batch_sendmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchSendMessage<'_>]) -> io::Result<usize> {
357+
if msgs.is_empty() {
358+
return Ok(0);
359+
}
360+
361+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
362+
sendmsg_fallback(sock, &mut msgs[0])?;
363+
return Ok(1);
364+
}
365+
366+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
367+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
368+
369+
for msg in msgs.iter_mut() {
370+
let mut hdr: libc::mmsghdr = unsafe { mem::zeroed() };
371+
372+
if let Some(addr) = msg.addr {
373+
vec_msg_name.push(SockAddr::from(addr));
374+
let sock_addr = vec_msg_name.last_mut().unwrap();
375+
hdr.msg_hdr.msg_name = sock_addr.as_ptr() as *mut _;
376+
hdr.msg_hdr.msg_namelen = sock_addr.len() as _;
377+
}
378+
379+
hdr.msg_hdr.msg_iov = msg.data.as_ptr() as *mut _;
380+
hdr.msg_hdr.msg_iovlen = msg.data.len() as _;
381+
382+
vec_msg_hdr.push(hdr);
383+
}
384+
385+
let ret = unsafe { libc::sendmmsg(sock.as_raw_fd(), vec_msg_hdr.as_mut_ptr(), vec_msg_hdr.len() as _, 0) };
386+
if ret < 0 {
387+
let err = io::Error::last_os_error();
388+
if let Some(libc::ENOSYS) = err.raw_os_error() {
389+
debug!("sendmmsg is not supported, fallback to sendmsg, error: {:?}", err);
390+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
391+
392+
sendmsg_fallback(sock, &mut msgs[0])?;
393+
return Ok(1);
394+
}
395+
return Err(err);
396+
}
397+
398+
for idx in 0..ret as usize {
399+
let msg = &mut msgs[idx];
400+
let hdr = &vec_msg_hdr[idx];
401+
msg.data_len = hdr.msg_len as usize;
402+
}
403+
404+
Ok(ret as usize)
405+
}

crates/shadowsocks/src/net/sys/unix/bsd/macos.rs

Lines changed: 173 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ use std::{
55
os::unix::io::{AsRawFd, RawFd},
66
pin::Pin,
77
ptr,
8+
sync::atomic::{AtomicBool, Ordering},
89
task::{self, Poll},
910
};
1011

11-
use log::{error, warn};
12+
use log::{debug, error, warn};
1213
use pin_project::pin_project;
13-
use socket2::{Domain, Protocol, Socket, Type};
14+
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
1415
use tokio::{
1516
io::{AsyncRead, AsyncWrite, ReadBuf},
1617
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
@@ -19,6 +20,7 @@ use tokio_tfo::TfoStream;
1920

2021
use crate::net::{
2122
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
23+
udp::{BatchRecvMessage, BatchSendMessage},
2224
AddrFamily,
2325
ConnectOpts,
2426
};
@@ -273,3 +275,172 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->
273275

274276
Ok(socket)
275277
}
278+
279+
/// https://github.com/apple/darwin-xnu/blob/main/bsd/sys/socket.h
280+
#[repr(C)]
281+
struct msghdr_x {
282+
msg_name: *mut libc::c_void, //< optional address
283+
msg_namelen: libc::socklen_t, //< size of address
284+
msg_iov: *mut libc::iovec, //< scatter/gather array
285+
msg_iovlen: libc::c_int, //< # elements in msg_iov
286+
msg_control: *mut libc::c_void, //< ancillary data, see below
287+
msg_controllen: libc::socklen_t, //< ancillary data buffer len
288+
msg_flags: libc::c_int, //< flags on received message
289+
msg_datalen: libc::size_t, //< byte length of buffer in msg_iov
290+
}
291+
292+
extern "C" {
293+
fn recvmsg_x(s: libc::c_int, msgp: *const msghdr_x, cnt: libc::c_uint, flags: libc::c_int) -> libc::ssize_t;
294+
fn sendmsg_x(s: libc::c_int, msgp: *const msghdr_x, cnt: libc::c_uint, flags: libc::c_int) -> libc::ssize_t;
295+
}
296+
297+
static SUPPORT_BATCH_SEND_RECV_MSG: AtomicBool = AtomicBool::new(true);
298+
299+
fn recvmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchRecvMessage<'_>) -> io::Result<()> {
300+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
301+
302+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
303+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
304+
let sock_addr = unsafe { SockAddr::new(addr_storage, addr_len) };
305+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
306+
hdr.msg_namelen = sock_addr.len() as _;
307+
308+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
309+
hdr.msg_iovlen = msg.data.len() as _;
310+
311+
let ret = unsafe { libc::recvmsg(sock.as_raw_fd(), &mut hdr as *mut _, 0) };
312+
if ret < 0 {
313+
return Err(io::Error::last_os_error());
314+
}
315+
316+
msg.addr = sock_addr.as_socket().expect("SockAddr.as_socket");
317+
msg.data_len = ret as usize;
318+
319+
Ok(())
320+
}
321+
322+
pub fn batch_recvmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchRecvMessage<'_>]) -> io::Result<usize> {
323+
if msgs.is_empty() {
324+
return Ok(0);
325+
}
326+
327+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
328+
recvmsg_fallback(sock, &mut msgs[0])?;
329+
return Ok(1);
330+
}
331+
332+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
333+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
334+
335+
for msg in msgs.iter_mut() {
336+
let mut hdr: msghdr_x = unsafe { mem::zeroed() };
337+
338+
let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
339+
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
340+
341+
vec_msg_name.push(unsafe { SockAddr::new(addr_storage, addr_len) });
342+
let sock_addr = vec_msg_name.last_mut().unwrap();
343+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
344+
hdr.msg_namelen = sock_addr.len() as _;
345+
346+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
347+
hdr.msg_iovlen = msg.data.len() as _;
348+
349+
vec_msg_hdr.push(hdr);
350+
}
351+
352+
let ret = unsafe { recvmsg_x(sock.as_raw_fd(), vec_msg_hdr.as_ptr(), vec_msg_hdr.len() as _, 0) };
353+
if ret < 0 {
354+
let err = io::Error::last_os_error();
355+
if let Some(libc::ENOSYS) = err.raw_os_error() {
356+
debug!("recvmsg_x is not supported, fallback to recvmsg, error: {:?}", err);
357+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
358+
359+
recvmsg_fallback(sock, &mut msgs[0])?;
360+
return Ok(1);
361+
}
362+
return Err(err);
363+
}
364+
365+
for idx in 0..ret as usize {
366+
let msg = &mut msgs[idx];
367+
let hdr = &vec_msg_hdr[idx];
368+
let name = &vec_msg_name[idx];
369+
msg.addr = name.as_socket().expect("SockAddr.as_socket");
370+
msg.data_len = hdr.msg_datalen as usize;
371+
}
372+
373+
Ok(ret as usize)
374+
}
375+
376+
fn sendmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchSendMessage<'_>) -> io::Result<()> {
377+
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };
378+
379+
let sock_addr = msg.addr.map(SockAddr::from);
380+
if let Some(ref sa) = sock_addr {
381+
hdr.msg_name = sa.as_ptr() as *mut _;
382+
hdr.msg_namelen = sa.len() as _;
383+
}
384+
385+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
386+
hdr.msg_iovlen = msg.data.len() as _;
387+
388+
let ret = unsafe { libc::sendmsg(sock.as_raw_fd(), &hdr as *const _, 0) };
389+
if ret < 0 {
390+
return Err(io::Error::last_os_error());
391+
}
392+
msg.data_len = ret as usize;
393+
394+
Ok(())
395+
}
396+
397+
pub fn batch_sendmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchSendMessage<'_>]) -> io::Result<usize> {
398+
if msgs.is_empty() {
399+
return Ok(0);
400+
}
401+
402+
if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
403+
sendmsg_fallback(sock, &mut msgs[0])?;
404+
return Ok(1);
405+
}
406+
407+
let mut vec_msg_name = Vec::with_capacity(msgs.len());
408+
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());
409+
410+
for msg in msgs.iter_mut() {
411+
let mut hdr: msghdr_x = unsafe { mem::zeroed() };
412+
413+
if let Some(addr) = msg.addr {
414+
vec_msg_name.push(SockAddr::from(addr));
415+
let sock_addr = vec_msg_name.last_mut().unwrap();
416+
hdr.msg_name = sock_addr.as_ptr() as *mut _;
417+
hdr.msg_namelen = sock_addr.len() as _;
418+
}
419+
420+
hdr.msg_iov = msg.data.as_ptr() as *mut _;
421+
hdr.msg_iovlen = msg.data.len() as _;
422+
423+
vec_msg_hdr.push(hdr);
424+
}
425+
426+
let ret = unsafe { sendmsg_x(sock.as_raw_fd(), vec_msg_hdr.as_ptr(), vec_msg_hdr.len() as _, 0) };
427+
if ret < 0 {
428+
let err = io::Error::last_os_error();
429+
if let Some(libc::ENOSYS) = err.raw_os_error() {
430+
debug!("sendmsg_x is not supported, fallback to sendmsg, error: {:?}", err);
431+
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);
432+
433+
sendmsg_fallback(sock, &mut msgs[0])?;
434+
return Ok(1);
435+
}
436+
return Err(err);
437+
}
438+
439+
for idx in 0..ret as usize {
440+
let msg = &mut msgs[idx];
441+
let hdr = &vec_msg_hdr[idx];
442+
msg.data_len = hdr.msg_datalen as usize;
443+
}
444+
445+
Ok(ret as usize)
446+
}

0 commit comments

Comments
 (0)