Skip to content

Commit 56024ab

Browse files
authored
Merge pull request #275 from AsakuraMizu/master
feat: add ancillary data support
2 parents 1aeeb52 + f9b286a commit 56024ab

File tree

13 files changed

+934
-11
lines changed

13 files changed

+934
-11
lines changed

compio-driver/src/iocp/op.rs

Lines changed: 148 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use std::{
1111
};
1212

1313
use aligned_array::{Aligned, A8};
14-
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
14+
use compio_buf::{
15+
BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
16+
};
1517
#[cfg(not(feature = "once_cell_try"))]
1618
use once_cell::sync::OnceCell as OnceLock;
1719
use socket2::SockAddr;
@@ -25,10 +27,11 @@ use windows_sys::{
2527
},
2628
Networking::WinSock::{
2729
closesocket, setsockopt, shutdown, socklen_t, WSAIoctl, WSARecv, WSARecvFrom, WSASend,
28-
WSASendTo, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, SD_BOTH,
29-
SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR, SOCKADDR_STORAGE,
30-
SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, WSAID_ACCEPTEX,
31-
WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
30+
WSASendMsg, WSASendTo, CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX,
31+
LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG, SD_BOTH, SD_RECEIVE, SD_SEND,
32+
SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR, SOCKADDR_STORAGE, SOL_SOCKET,
33+
SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, WSABUF, WSAID_ACCEPTEX,
34+
WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS, WSAID_WSARECVMSG, WSAMSG,
3235
},
3336
Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
3437
System::{
@@ -774,6 +777,146 @@ impl<T: IoVectoredBuf, S: AsRawFd> OpCode for SendToVectored<T, S> {
774777
}
775778
}
776779

780+
static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();
781+
782+
/// Receive data and source address with ancillary data into vectored buffer.
783+
pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
784+
addr: SOCKADDR_STORAGE,
785+
addr_len: socklen_t,
786+
fd: SharedFd<S>,
787+
buffer: T,
788+
control: C,
789+
_p: PhantomPinned,
790+
}
791+
792+
impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
793+
/// Create [`RecvMsg`].
794+
///
795+
/// # Panics
796+
///
797+
/// This function will panic if the control message buffer is misaligned.
798+
pub fn new(fd: SharedFd<S>, buffer: T, control: C) -> Self {
799+
assert!(
800+
control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
801+
"misaligned control message buffer"
802+
);
803+
Self {
804+
addr: unsafe { std::mem::zeroed() },
805+
addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
806+
fd,
807+
buffer,
808+
control,
809+
_p: PhantomPinned,
810+
}
811+
}
812+
}
813+
814+
impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
815+
type Inner = ((T, C), SOCKADDR_STORAGE, socklen_t);
816+
817+
fn into_inner(self) -> Self::Inner {
818+
((self.buffer, self.control), self.addr, self.addr_len)
819+
}
820+
}
821+
822+
impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
823+
unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
824+
let recvmsg_fn = WSA_RECVMSG
825+
.get_or_try_init(|| get_wsa_fn(self.fd.as_raw_fd(), WSAID_WSARECVMSG))?
826+
.ok_or_else(|| {
827+
io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve WSARecvMsg")
828+
})?;
829+
830+
let this = self.get_unchecked_mut();
831+
let mut slices = this.buffer.as_io_slices_mut();
832+
let mut msg = WSAMSG {
833+
name: &mut this.addr as *mut _ as _,
834+
namelen: this.addr_len,
835+
lpBuffers: slices.as_mut_ptr() as _,
836+
dwBufferCount: slices.len() as _,
837+
Control: std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut()),
838+
dwFlags: 0,
839+
};
840+
841+
let mut received = 0;
842+
let res = recvmsg_fn(
843+
this.fd.as_raw_fd() as _,
844+
&mut msg,
845+
&mut received,
846+
optr,
847+
None,
848+
);
849+
winsock_result(res, received)
850+
}
851+
852+
unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
853+
cancel(self.fd.as_raw_fd(), optr)
854+
}
855+
}
856+
857+
/// Send data to specified address accompanied by ancillary data from vectored
858+
/// buffer.
859+
pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
860+
fd: SharedFd<S>,
861+
buffer: T,
862+
control: C,
863+
addr: SockAddr,
864+
_p: PhantomPinned,
865+
}
866+
867+
impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
868+
/// Create [`SendMsg`].
869+
///
870+
/// # Panics
871+
///
872+
/// This function will panic if the control message buffer is misaligned.
873+
pub fn new(fd: SharedFd<S>, buffer: T, control: C, addr: SockAddr) -> Self {
874+
assert!(
875+
control.as_buf_ptr().cast::<CMSGHDR>().is_aligned(),
876+
"misaligned control message buffer"
877+
);
878+
Self {
879+
fd,
880+
buffer,
881+
control,
882+
addr,
883+
_p: PhantomPinned,
884+
}
885+
}
886+
}
887+
888+
impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
889+
type Inner = (T, C);
890+
891+
fn into_inner(self) -> Self::Inner {
892+
(self.buffer, self.control)
893+
}
894+
}
895+
896+
impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> OpCode for SendMsg<T, C, S> {
897+
unsafe fn operate(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
898+
let this = self.get_unchecked_mut();
899+
900+
let slices = this.buffer.as_io_slices();
901+
let msg = WSAMSG {
902+
name: this.addr.as_ptr() as _,
903+
namelen: this.addr.len(),
904+
lpBuffers: slices.as_ptr() as _,
905+
dwBufferCount: slices.len() as _,
906+
Control: std::mem::transmute::<IoSlice, WSABUF>(this.control.as_io_slice()),
907+
dwFlags: 0,
908+
};
909+
910+
let mut sent = 0;
911+
let res = WSASendMsg(this.fd.as_raw_fd() as _, &msg, 0, &mut sent, optr, None);
912+
winsock_result(res, sent)
913+
}
914+
915+
unsafe fn cancel(self: Pin<&mut Self>, optr: *mut OVERLAPPED) -> io::Result<()> {
916+
cancel(self.fd.as_raw_fd(), optr)
917+
}
918+
}
919+
777920
/// Connect a named pipe server.
778921
pub struct ConnectNamedPipe<S> {
779922
pub(crate) fd: SharedFd<S>,

compio-driver/src/iour/op.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,26 @@ impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
556556
}
557557
}
558558

559+
impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
560+
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
561+
let this = unsafe { self.get_unchecked_mut() };
562+
unsafe { this.set_msg() };
563+
opcode::RecvMsg::new(Fd(this.fd.as_raw_fd()), &mut this.msg)
564+
.build()
565+
.into()
566+
}
567+
}
568+
569+
impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> OpCode for SendMsg<T, C, S> {
570+
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
571+
let this = unsafe { self.get_unchecked_mut() };
572+
unsafe { this.set_msg() };
573+
opcode::SendMsg::new(Fd(this.fd.as_raw_fd()), &this.msg)
574+
.build()
575+
.into()
576+
}
577+
}
578+
559579
impl<S: AsRawFd> OpCode for PollOnce<S> {
560580
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
561581
let flags = match self.interest {

compio-driver/src/op.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use socket2::SockAddr;
1111
#[cfg(windows)]
1212
pub use crate::sys::op::ConnectNamedPipe;
1313
pub use crate::sys::op::{
14-
Accept, Recv, RecvFrom, RecvFromVectored, RecvVectored, Send, SendTo, SendToVectored,
15-
SendVectored,
14+
Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
15+
SendToVectored, SendVectored,
1616
};
1717
#[cfg(unix)]
1818
pub use crate::sys::op::{

compio-driver/src/poll/op.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,47 @@ impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
749749
}
750750
}
751751

752+
impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> RecvMsg<T, C, S> {
753+
unsafe fn call(&mut self) -> libc::ssize_t {
754+
libc::recvmsg(self.fd.as_raw_fd(), &mut self.msg, 0)
755+
}
756+
}
757+
758+
impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
759+
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
760+
let this = unsafe { self.get_unchecked_mut() };
761+
unsafe { this.set_msg() };
762+
syscall!(this.call(), wait_readable(this.fd.as_raw_fd()))
763+
}
764+
765+
fn on_event(self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
766+
debug_assert!(event.readable);
767+
768+
let this = unsafe { self.get_unchecked_mut() };
769+
syscall!(break this.call())
770+
}
771+
}
772+
773+
impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> SendMsg<T, C, S> {
774+
unsafe fn call(&self) -> libc::ssize_t {
775+
libc::sendmsg(self.fd.as_raw_fd(), &self.msg, 0)
776+
}
777+
}
778+
779+
impl<T: IoVectoredBuf, C: IoBuf, S: AsRawFd> OpCode for SendMsg<T, C, S> {
780+
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
781+
let this = unsafe { self.get_unchecked_mut() };
782+
unsafe { this.set_msg() };
783+
syscall!(this.call(), wait_writable(this.fd.as_raw_fd()))
784+
}
785+
786+
fn on_event(self: Pin<&mut Self>, event: &Event) -> Poll<io::Result<usize>> {
787+
debug_assert!(event.writable);
788+
789+
syscall!(break self.call())
790+
}
791+
}
792+
752793
impl<S: AsRawFd> OpCode for PollOnce<S> {
753794
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
754795
Ok(Decision::wait_for(self.fd.as_raw_fd(), self.interest))

compio-driver/src/unix/op.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,113 @@ impl<T: IoVectoredBuf, S> IntoInner for SendVectored<T, S> {
370370
}
371371
}
372372

373+
/// Receive data and source address with ancillary data into vectored buffer.
374+
pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
375+
pub(crate) msg: libc::msghdr,
376+
pub(crate) addr: sockaddr_storage,
377+
pub(crate) fd: SharedFd<S>,
378+
pub(crate) buffer: T,
379+
pub(crate) control: C,
380+
pub(crate) slices: Vec<IoSliceMut>,
381+
_p: PhantomPinned,
382+
}
383+
384+
impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
385+
/// Create [`RecvMsg`].
386+
///
387+
/// # Panics
388+
///
389+
/// This function will panic if the control message buffer is misaligned.
390+
pub fn new(fd: SharedFd<S>, buffer: T, control: C) -> Self {
391+
assert!(
392+
control.as_buf_ptr().cast::<libc::cmsghdr>().is_aligned(),
393+
"misaligned control message buffer"
394+
);
395+
Self {
396+
addr: unsafe { std::mem::zeroed() },
397+
msg: unsafe { std::mem::zeroed() },
398+
fd,
399+
buffer,
400+
control,
401+
slices: vec![],
402+
_p: PhantomPinned,
403+
}
404+
}
405+
406+
pub(crate) unsafe fn set_msg(&mut self) {
407+
self.slices = self.buffer.as_io_slices_mut();
408+
409+
self.msg.msg_name = std::ptr::addr_of_mut!(self.addr) as _;
410+
self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
411+
self.msg.msg_iov = self.slices.as_mut_ptr() as _;
412+
self.msg.msg_iovlen = self.slices.len() as _;
413+
self.msg.msg_control = self.control.as_buf_mut_ptr() as _;
414+
self.msg.msg_controllen = self.control.buf_len() as _;
415+
}
416+
}
417+
418+
impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
419+
type Inner = ((T, C), sockaddr_storage, socklen_t);
420+
421+
fn into_inner(self) -> Self::Inner {
422+
((self.buffer, self.control), self.addr, self.msg.msg_namelen)
423+
}
424+
}
425+
426+
/// Send data to specified address accompanied by ancillary data from vectored
427+
/// buffer.
428+
pub struct SendMsg<T: IoVectoredBuf, C: IoBuf, S> {
429+
pub(crate) msg: libc::msghdr,
430+
pub(crate) fd: SharedFd<S>,
431+
pub(crate) buffer: T,
432+
pub(crate) control: C,
433+
pub(crate) addr: SockAddr,
434+
pub(crate) slices: Vec<IoSlice>,
435+
_p: PhantomPinned,
436+
}
437+
438+
impl<T: IoVectoredBuf, C: IoBuf, S> SendMsg<T, C, S> {
439+
/// Create [`SendMsg`].
440+
///
441+
/// # Panics
442+
///
443+
/// This function will panic if the control message buffer is misaligned.
444+
pub fn new(fd: SharedFd<S>, buffer: T, control: C, addr: SockAddr) -> Self {
445+
assert!(
446+
control.as_buf_ptr().cast::<libc::cmsghdr>().is_aligned(),
447+
"misaligned control message buffer"
448+
);
449+
Self {
450+
msg: unsafe { std::mem::zeroed() },
451+
fd,
452+
buffer,
453+
control,
454+
addr,
455+
slices: vec![],
456+
_p: PhantomPinned,
457+
}
458+
}
459+
460+
pub(crate) unsafe fn set_msg(&mut self) {
461+
self.slices = self.buffer.as_io_slices();
462+
463+
self.msg.msg_name = self.addr.as_ptr() as _;
464+
self.msg.msg_namelen = self.addr.len();
465+
self.msg.msg_iov = self.slices.as_ptr() as _;
466+
self.msg.msg_iovlen = self.slices.len() as _;
467+
self.msg.msg_control = self.control.as_buf_ptr() as _;
468+
self.msg.msg_controllen = self.control.buf_len() as _;
469+
}
470+
}
471+
472+
impl<T: IoVectoredBuf, C: IoBuf, S> IntoInner for SendMsg<T, C, S> {
473+
type Inner = (T, C);
474+
475+
fn into_inner(self) -> Self::Inner {
476+
(self.buffer, self.control)
477+
}
478+
}
479+
373480
/// The interest to poll a file descriptor.
374481
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
375482
pub enum Interest {

0 commit comments

Comments
 (0)