Skip to content

Commit c5e3497

Browse files
committed
feat(net): add split and into_split for TCP & UNIX stream
1 parent 9c3c5b9 commit c5e3497

File tree

5 files changed

+147
-2
lines changed

5 files changed

+147
-2
lines changed

compio-io/src/split.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub fn split<T: AsyncRead + AsyncWrite>(stream: T) -> (ReadHalf<T>, WriteHalf<T>
1313
}
1414

1515
/// The readable half of a value returned from [`split`].
16+
#[derive(Debug)]
1617
pub struct ReadHalf<T>(Arc<Mutex<T>>);
1718

1819
impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
@@ -26,6 +27,7 @@ impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
2627
}
2728

2829
/// The writable half of a value returned from [`split`].
30+
#[derive(Debug)]
2931
pub struct WriteHalf<T>(Arc<Mutex<T>>);
3032

3133
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {

compio-net/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77

88
mod resolve;
99
mod socket;
10+
pub(crate) mod split;
1011
mod tcp;
1112
mod udp;
1213
mod unix;
1314

1415
pub use resolve::ToSocketAddrsAsync;
1516
pub(crate) use resolve::{each_addr, first_addr_buf};
1617
pub(crate) use socket::*;
18+
pub use split::*;
1719
pub use tcp::*;
1820
pub use udp::*;
1921
pub use unix::*;

compio-net/src/split.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use std::{io, ops::Deref, sync::Arc};
2+
3+
use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
4+
use compio_io::{AsyncRead, AsyncWrite};
5+
6+
pub(crate) fn split<T>(stream: &T) -> (ReadHalf<T>, WriteHalf<T>)
7+
where
8+
for<'a> &'a T: AsyncRead + AsyncWrite,
9+
{
10+
(ReadHalf(stream), WriteHalf(stream))
11+
}
12+
13+
/// Borrowed read half.
14+
#[derive(Debug)]
15+
pub struct ReadHalf<'a, T>(&'a T);
16+
17+
impl<T> AsyncRead for ReadHalf<'_, T>
18+
where
19+
for<'a> &'a T: AsyncRead,
20+
{
21+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
22+
self.0.read(buf).await
23+
}
24+
25+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
26+
self.0.read_vectored(buf).await
27+
}
28+
}
29+
30+
/// Borrowed write half.
31+
#[derive(Debug)]
32+
pub struct WriteHalf<'a, T>(&'a T);
33+
34+
impl<T> AsyncWrite for WriteHalf<'_, T>
35+
where
36+
for<'a> &'a T: AsyncWrite,
37+
{
38+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
39+
self.0.write(buf).await
40+
}
41+
42+
async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
43+
self.0.write_vectored(buf).await
44+
}
45+
46+
async fn flush(&mut self) -> io::Result<()> {
47+
self.0.flush().await
48+
}
49+
50+
async fn shutdown(&mut self) -> io::Result<()> {
51+
self.0.shutdown().await
52+
}
53+
}
54+
55+
pub(crate) fn into_split<T>(stream: T) -> (OwnedReadHalf<T>, OwnedWriteHalf<T>)
56+
where
57+
for<'a> &'a T: AsyncRead + AsyncWrite,
58+
{
59+
let stream = Arc::new(stream);
60+
(OwnedReadHalf(stream.clone()), OwnedWriteHalf(stream))
61+
}
62+
63+
/// Owned read half.
64+
#[derive(Debug)]
65+
pub struct OwnedReadHalf<T>(Arc<T>);
66+
67+
impl<T> AsyncRead for OwnedReadHalf<T>
68+
where
69+
for<'a> &'a T: AsyncRead,
70+
{
71+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
72+
self.0.deref().read(buf).await
73+
}
74+
75+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
76+
self.0.deref().read_vectored(buf).await
77+
}
78+
}
79+
80+
/// Owned write half.
81+
#[derive(Debug)]
82+
pub struct OwnedWriteHalf<T>(Arc<T>);
83+
84+
impl<T> AsyncWrite for OwnedWriteHalf<T>
85+
where
86+
for<'a> &'a T: AsyncWrite,
87+
{
88+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
89+
self.0.deref().write(buf).await
90+
}
91+
92+
async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
93+
self.0.deref().write_vectored(buf).await
94+
}
95+
96+
async fn flush(&mut self) -> io::Result<()> {
97+
self.0.deref().flush().await
98+
}
99+
100+
async fn shutdown(&mut self) -> io::Result<()> {
101+
self.0.deref().shutdown().await
102+
}
103+
}

compio-net/src/tcp.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use compio_io::{AsyncRead, AsyncWrite};
55
use compio_runtime::{impl_attachable, impl_try_as_raw_fd};
66
use socket2::{Protocol, SockAddr, Type};
77

8-
use crate::{Socket, ToSocketAddrsAsync};
8+
use crate::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, Socket, ToSocketAddrsAsync, WriteHalf};
99

1010
/// A TCP socket server, listening for connections.
1111
///
@@ -203,6 +203,25 @@ impl TcpStream {
203203
.local_addr()
204204
.map(|addr| addr.as_socket().expect("should be SocketAddr"))
205205
}
206+
207+
/// Splits a [`TcpStream`] into a read half and a write half, which can be
208+
/// used to read and write the stream concurrently.
209+
///
210+
/// This method is more efficient than
211+
/// [`into_split`](TcpStream::into_split), but the halves cannot
212+
/// be moved into independently spawned tasks.
213+
pub fn split(&self) -> (ReadHalf<Self>, WriteHalf<Self>) {
214+
crate::split(self)
215+
}
216+
217+
/// Splits a [`TcpStream`] into a read half and a write half, which can be
218+
/// used to read and write the stream concurrently.
219+
///
220+
/// Unlike [`split`](TcpStream::split), the owned halves can be moved to
221+
/// separate tasks, however this comes at the cost of a heap allocation.
222+
pub fn into_split(self) -> (OwnedReadHalf<Self>, OwnedWriteHalf<Self>) {
223+
crate::into_split(self)
224+
}
206225
}
207226

208227
impl AsyncRead for TcpStream {

compio-net/src/unix.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use compio_io::{AsyncRead, AsyncWrite};
55
use compio_runtime::{impl_attachable, impl_try_as_raw_fd};
66
use socket2::{Domain, SockAddr, Type};
77

8-
use crate::Socket;
8+
use crate::{OwnedReadHalf, OwnedWriteHalf, ReadHalf, Socket, WriteHalf};
99

1010
/// A Unix socket server, listening for connections.
1111
///
@@ -159,6 +159,25 @@ impl UnixStream {
159159
pub fn local_addr(&self) -> io::Result<SockAddr> {
160160
self.inner.local_addr()
161161
}
162+
163+
/// Splits a [`UnixStream`] into a read half and a write half, which can be
164+
/// used to read and write the stream concurrently.
165+
///
166+
/// This method is more efficient than
167+
/// [`into_split`](UnixStream::into_split), but the halves cannot
168+
/// be moved into independently spawned tasks.
169+
pub fn split(&self) -> (ReadHalf<Self>, WriteHalf<Self>) {
170+
crate::split(self)
171+
}
172+
173+
/// Splits a [`UnixStream`] into a read half and a write half, which can be
174+
/// used to read and write the stream concurrently.
175+
///
176+
/// Unlike [`split`](UnixStream::split), the owned halves can be moved to
177+
/// separate tasks, however this comes at the cost of a heap allocation.
178+
pub fn into_split(self) -> (OwnedReadHalf<Self>, OwnedWriteHalf<Self>) {
179+
crate::into_split(self)
180+
}
162181
}
163182

164183
impl AsyncRead for UnixStream {

0 commit comments

Comments
 (0)