Skip to content

Commit a26dbe5

Browse files
authored
Merge pull request #205 from Berrysoft:fix/return-unit
feat(io): return `()` in `read_exact` and `write_all`
2 parents 583da16 + 8f7ea0c commit a26dbe5

File tree

10 files changed

+104
-74
lines changed

10 files changed

+104
-74
lines changed

compio-fs/src/named_pipe.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -540,9 +540,8 @@ impl ServerOptions {
540540
///
541541
/// let (BufResult(write, _), BufResult(read, buf)) = futures_util::join!(write, read);
542542
/// write.unwrap();
543-
/// let read = read.unwrap();
543+
/// read.unwrap();
544544
///
545-
/// assert_eq!(read, 4);
546545
/// assert_eq!(&buf[..], b"ping");
547546
/// # })
548547
/// ```
@@ -646,11 +645,10 @@ impl ServerOptions {
646645
///
647646
/// let (BufResult(write, _), BufResult(read, buf)) = futures_util::join!(write, read);
648647
/// write.unwrap();
649-
/// let read = read.unwrap();
648+
/// read.unwrap();
650649
///
651650
/// println!("done reading and writing");
652651
///
653-
/// assert_eq!(read, 4);
654652
/// assert_eq!(&buf[..], b"ping");
655653
/// # })
656654
/// ```

compio-io/src/compat.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,14 @@ impl<S: crate::AsyncWrite> SyncStream<S> {
153153
/// Flush all data in the write buffer.
154154
pub async fn flush_write_buf(&mut self) -> io::Result<usize> {
155155
let stream = &mut self.stream;
156-
let len = self.write_buffer.with(|b| stream.write_all(b)).await?;
156+
let len = self
157+
.write_buffer
158+
.with(|w| async {
159+
let len = w.buf_len();
160+
let BufResult(res, w) = stream.write_all(w).await;
161+
BufResult(res.map(|()| len), w)
162+
})
163+
.await?;
157164
self.write_buffer.reset();
158165
stream.flush().await?;
159166
Ok(len)

compio-io/src/read/ext.rs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ macro_rules! read_scalar {
1414
use ::compio_buf::{arrayvec::ArrayVec, BufResult};
1515

1616
const LEN: usize = ::std::mem::size_of::<$t>();
17-
let BufResult(len, buf) = self.read_exact(ArrayVec::<u8, LEN>::new()).await;
18-
assert_eq!(len?, LEN, "read_exact returned unexpected length");
17+
let BufResult(res, buf) = self.read_exact(ArrayVec::<u8, LEN>::new()).await;
18+
res?;
1919
// Safety: We just checked that the buffer is the correct size
2020
Ok($t::$be(unsafe { buf.into_inner_unchecked() }))
2121
}
@@ -25,8 +25,8 @@ macro_rules! read_scalar {
2525
use ::compio_buf::{arrayvec::ArrayVec, BufResult};
2626

2727
const LEN: usize = ::std::mem::size_of::<$t>();
28-
let BufResult(len, buf) = self.read_exact(ArrayVec::<u8, LEN>::new()).await;
29-
assert_eq!(len?, LEN, "read_exact returned unexpected length");
28+
let BufResult(res, buf) = self.read_exact(ArrayVec::<u8, LEN>::new()).await;
29+
res?;
3030
// Safety: We just checked that the buffer is the correct size
3131
Ok($t::$le(unsafe { buf.into_inner_unchecked() }))
3232
}
@@ -58,23 +58,41 @@ macro_rules! loop_read_exact {
5858
BufResult(Err(ref e), buf) if e.kind() == ::std::io::ErrorKind::Interrupted => {
5959
$buf = buf;
6060
}
61-
res => return res,
61+
BufResult(Err(e), buf) => return BufResult(Err(e), buf),
6262
}
6363
}
64-
return BufResult(Ok($tracker), $buf)
64+
return BufResult(Ok(()), $buf)
6565
};
6666
}
6767

6868
macro_rules! loop_read_vectored {
69-
(
70-
$buf:ident,
71-
$tracker:ident :
72-
$tracker_ty:ty,
73-
$iter:ident,loop
74-
$read_expr:expr
75-
) => {
76-
loop_read_vectored!($buf, len, $tracker: $tracker_ty, res, $iter, loop $read_expr, break None)
77-
};
69+
($buf:ident, $tracker:ident : $tracker_ty:ty, $iter:ident,loop $read_expr:expr) => {{
70+
let mut $iter = match $buf.owned_iter_mut() {
71+
Ok(buf) => buf,
72+
Err(buf) => return BufResult(Ok(()), buf),
73+
};
74+
let mut $tracker: $tracker_ty = 0;
75+
76+
loop {
77+
let len = $iter.buf_capacity();
78+
if len == 0 {
79+
continue;
80+
}
81+
82+
match $read_expr.await {
83+
BufResult(Ok(()), ret) => {
84+
$iter = ret;
85+
$tracker += len as $tracker_ty;
86+
}
87+
BufResult(Err(e), $iter) => return BufResult(Err(e), $iter.into_inner()),
88+
};
89+
90+
match $iter.next() {
91+
Ok(next) => $iter = next,
92+
Err(buf) => return BufResult(Ok(()), buf),
93+
}
94+
}
95+
}};
7896
(
7997
$buf:ident,
8098
$len:ident,
@@ -158,7 +176,7 @@ pub trait AsyncReadExt: AsyncRead {
158176
}
159177

160178
/// Read the exact number of bytes required to fill the buf.
161-
async fn read_exact<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<usize, T> {
179+
async fn read_exact<T: IoBufMut>(&mut self, mut buf: T) -> BufResult<(), T> {
162180
loop_read_exact!(buf, buf.buf_capacity(), read, loop self.read(buf.slice(read..)));
163181
}
164182

@@ -171,8 +189,8 @@ pub trait AsyncReadExt: AsyncRead {
171189
}
172190

173191
/// Read the exact number of bytes required to fill the vectored buf.
174-
async fn read_vectored_exact<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
175-
loop_read_vectored!(buf, total: usize, iter, loop self.read_exact(iter))
192+
async fn read_vectored_exact<T: IoVectoredBufMut>(&mut self, buf: T) -> BufResult<(), T> {
193+
loop_read_vectored!(buf, _total: usize, iter, loop self.read_exact(iter))
176194
}
177195

178196
/// Creates an adaptor which reads at most `limit` bytes from it.
@@ -230,7 +248,7 @@ pub trait AsyncReadAtExt: AsyncReadAt {
230248
/// completely fill the buffer.
231249
///
232250
/// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
233-
async fn read_exact_at<T: IoBufMut>(&self, mut buf: T, pos: u64) -> BufResult<usize, T> {
251+
async fn read_exact_at<T: IoBufMut>(&self, mut buf: T, pos: u64) -> BufResult<(), T> {
234252
loop_read_exact!(
235253
buf,
236254
buf.buf_capacity(),
@@ -262,7 +280,7 @@ pub trait AsyncReadAtExt: AsyncReadAt {
262280
&self,
263281
buf: T,
264282
pos: u64,
265-
) -> BufResult<usize, T> {
283+
) -> BufResult<(), T> {
266284
loop_read_vectored!(buf, total: u64, iter, loop self.read_exact_at(iter, pos + total))
267285
}
268286
}

compio-io/src/util/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! IO related utilities functions for ease of use.
22
33
mod take;
4+
use compio_buf::{BufResult, IoBuf};
45
pub use take::Take;
56

67
mod null;
@@ -39,13 +40,13 @@ pub async fn copy<'a, R: AsyncRead, W: AsyncWrite>(
3940

4041
// When EOF is reached, we are terminating, so flush before that
4142
if read == 0 || buf.need_flush() {
42-
let written = buf.with(|w| writer.write_all(w)).await?;
43-
if written == 0 {
44-
return Err(std::io::Error::new(
45-
std::io::ErrorKind::WriteZero,
46-
"0 byte was written into the writer",
47-
));
48-
}
43+
let written = buf
44+
.with(|w| async {
45+
let len = w.buf_len();
46+
let BufResult(res, w) = writer.write_all(w).await;
47+
BufResult(res.map(|()| len), w)
48+
})
49+
.await?;
4950
total += written;
5051

5152
if buf.advance(written) {

compio-io/src/util/repeat.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,12 @@ impl AsyncBufRead for Repeat {
6060
/// # compio_runtime::Runtime::new().unwrap().block_on(async {
6161
/// use compio_io::{self, AsyncRead, AsyncReadExt};
6262
///
63-
/// let (len, buffer) = compio_io::repeat(42)
63+
/// let ((), buffer) = compio_io::repeat(42)
6464
/// .read_exact(Vec::with_capacity(3))
6565
/// .await
6666
/// .unwrap();
6767
///
6868
/// assert_eq!(buffer.as_slice(), [42, 42, 42]);
69-
/// assert_eq!(len, 3);
7069
/// # })
7170
/// ```
7271
pub fn repeat(byte: u8) -> Repeat {

compio-io/src/write/ext.rs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,21 @@ macro_rules! write_scalar {
1111
use ::compio_buf::{arrayvec::ArrayVec, BufResult};
1212

1313
const LEN: usize = ::std::mem::size_of::<$t>();
14-
let BufResult(len, _) = self
14+
let BufResult(res, _) = self
1515
.write_all(ArrayVec::<u8, LEN>::from(num.$be()))
1616
.await;
17-
assert_eq!(len?, LEN, "`write_all` returned unexpected length");
18-
Ok(())
17+
res
1918
}
2019

2120
#[doc = concat!("Write a little endian `", stringify!($t), "` into the underlying writer.")]
2221
async fn [< write_ $t _le >](&mut self, num: $t) -> IoResult<()> {
2322
use ::compio_buf::{arrayvec::ArrayVec, BufResult};
2423

2524
const LEN: usize = ::std::mem::size_of::<$t>();
26-
let BufResult(len, _) = self
25+
let BufResult(res, _) = self
2726
.write_all(ArrayVec::<u8, LEN>::from(num.$le()))
2827
.await;
29-
assert_eq!(len?, LEN, "`write_all` returned unexpected length");
30-
Ok(())
28+
res
3129
}
3230
}
3331
};
@@ -57,24 +55,42 @@ macro_rules! loop_write_all {
5755
BufResult(Err(ref e), buf) if e.kind() == ::std::io::ErrorKind::Interrupted => {
5856
$buf = buf;
5957
}
60-
res => return res,
58+
BufResult(Err(e), buf) => return BufResult(Err(e), buf),
6159
}
6260
}
6361

64-
return BufResult(Ok($needle), $buf);
62+
return BufResult(Ok(()), $buf);
6563
};
6664
}
6765

6866
macro_rules! loop_write_vectored {
69-
(
70-
$buf:ident,
71-
$tracker:ident :
72-
$tracker_ty:ty,
73-
$iter:ident,loop
74-
$read_expr:expr
75-
) => {
76-
loop_write_vectored!($buf, $tracker: $tracker_ty, res, $iter, loop $read_expr, break None)
77-
};
67+
($buf:ident, $tracker:ident : $tracker_ty:ty, $iter:ident,loop $read_expr:expr) => {{
68+
let mut $iter = match $buf.owned_iter() {
69+
Ok(buf) => buf,
70+
Err(buf) => return BufResult(Ok(()), buf),
71+
};
72+
let mut $tracker: $tracker_ty = 0;
73+
74+
loop {
75+
let len = $iter.buf_len();
76+
if len == 0 {
77+
continue;
78+
}
79+
80+
match $read_expr.await {
81+
BufResult(Ok(()), ret) => {
82+
$iter = ret;
83+
$tracker += len as $tracker_ty;
84+
}
85+
BufResult(Err(e), $iter) => return BufResult(Err(e), $iter.into_inner()),
86+
};
87+
88+
match $iter.next() {
89+
Ok(next) => $iter = next,
90+
Err(buf) => return BufResult(Ok(()), buf),
91+
}
92+
}
93+
}};
7894
(
7995
$buf:ident,
8096
$tracker:ident :
@@ -130,7 +146,7 @@ pub trait AsyncWriteExt: AsyncWrite {
130146
}
131147

132148
/// Write the entire contents of a buffer into this writer.
133-
async fn write_all<T: IoBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
149+
async fn write_all<T: IoBuf>(&mut self, mut buf: T) -> BufResult<(), T> {
134150
loop_write_all!(
135151
buf,
136152
buf.buf_len(),
@@ -142,8 +158,8 @@ pub trait AsyncWriteExt: AsyncWrite {
142158
/// Write the entire contents of a buffer into this writer. Like
143159
/// [`AsyncWrite::write_vectored`], except that it tries to write the entire
144160
/// contents of the buffer into this writer.
145-
async fn write_vectored_all<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
146-
loop_write_vectored!(buf, total: usize, iter, loop self.write_all(iter))
161+
async fn write_vectored_all<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<(), T> {
162+
loop_write_vectored!(buf, _total: usize, iter, loop self.write_all(iter))
147163
}
148164

149165
write_scalar!(u8, to_be_bytes, to_le_bytes);
@@ -168,7 +184,7 @@ impl<A: AsyncWrite + ?Sized> AsyncWriteExt for A {}
168184
pub trait AsyncWriteAtExt: AsyncWriteAt {
169185
/// Like [`AsyncWriteAt::write_at`], except that it tries to write the
170186
/// entire contents of the buffer into this writer.
171-
async fn write_all_at<T: IoBuf>(&mut self, mut buf: T, pos: u64) -> BufResult<usize, T> {
187+
async fn write_all_at<T: IoBuf>(&mut self, mut buf: T, pos: u64) -> BufResult<(), T> {
172188
loop_write_all!(
173189
buf,
174190
buf.buf_len(),
@@ -183,7 +199,7 @@ pub trait AsyncWriteAtExt: AsyncWriteAt {
183199
&mut self,
184200
buf: T,
185201
pos: u64,
186-
) -> BufResult<usize, T> {
202+
) -> BufResult<(), T> {
187203
loop_write_vectored!(buf, total: u64, iter, loop self.write_all_at(iter, pos + total))
188204
}
189205
}

compio-io/tests/io.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,10 @@ impl AsyncReadAt for RepeatOne {
220220
async fn read_exact() {
221221
let mut src = RepeatOne(114);
222222

223-
let (len, buf) = src.read_exact(vec![0; 5]).await.unwrap();
224-
assert_eq!(len, 5);
223+
let ((), buf) = src.read_exact(vec![0; 5]).await.unwrap();
225224
assert_eq!(buf, [114; 5]);
226225

227-
let (len, buf) = src.read_exact_at(Vec::with_capacity(5), 0).await.unwrap();
228-
assert_eq!(len, 5);
226+
let ((), buf) = src.read_exact_at(Vec::with_capacity(5), 0).await.unwrap();
229227
assert_eq!(buf, [0, 114, 114, 114, 114]);
230228
}
231229

@@ -282,12 +280,10 @@ impl AsyncWriteAt for WriteOne {
282280
async fn write_all() {
283281
let mut dst = WriteOne(vec![]);
284282

285-
let (len, _) = dst.write_all([1, 1, 4, 5, 1, 4]).await.unwrap();
286-
assert_eq!(len, 6);
283+
let ((), _) = dst.write_all([1, 1, 4, 5, 1, 4]).await.unwrap();
287284
assert_eq!(dst.0, [1, 1, 4, 5, 1, 4]);
288285

289-
let (len, _) = dst.write_all_at([114, 114, 114], 2).await.unwrap();
290-
assert_eq!(len, 3);
286+
let ((), _) = dst.write_all_at([114, 114, 114], 2).await.unwrap();
291287
assert_eq!(dst.0, [1, 1, 114, 114, 114, 4]);
292288
}
293289

compio-net/tests/unix_stream.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@ async fn accept_read_write() -> std::io::Result<()> {
1414
let mut client = UnixStream::connect(&sock_path)?;
1515
let (mut server, _) = listener.accept().await?;
1616

17-
let write_len = client.write_all("hello").await.0?;
18-
assert_eq!(write_len, 5);
17+
client.write_all("hello").await.0?;
1918
drop(client);
2019

2120
let buf = Vec::with_capacity(5);
22-
let (res, buf) = server.read_exact(buf).await.unwrap();
23-
assert_eq!(res, 5);
21+
let ((), buf) = server.read_exact(buf).await.unwrap();
2422
assert_eq!(&buf[..], b"hello");
2523
let len = server.read(buf).await.0?;
2624
assert_eq!(len, 0);

compio/examples/net.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ async fn main() {
1616
tx.write_all("Hello world!").await.0.unwrap();
1717

1818
let buffer = Vec::with_capacity(12);
19-
let (n, buffer) = rx.read_exact(buffer).await.unwrap();
20-
assert_eq!(n, buffer.len());
19+
let ((), buffer) = rx.read_exact(buffer).await.unwrap();
2120
println!("{}", String::from_utf8(buffer).unwrap());
2221
}

compio/tests/runtime.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ async fn multi_threading() {
2828
let mut rx = rx.into_inner();
2929
compio::runtime::Runtime::new().unwrap().block_on(async {
3030
let buffer = Vec::with_capacity(DATA.len());
31-
let (n, buffer) = rx.read_exact(buffer).await.unwrap();
32-
assert_eq!(n, buffer.len());
31+
let ((), buffer) = rx.read_exact(buffer).await.unwrap();
3332
assert_eq!(DATA, String::from_utf8(buffer).unwrap());
3433
});
3534
})
@@ -57,8 +56,7 @@ async fn try_clone() {
5756
let mut rx = rx.into_inner();
5857
compio::runtime::Runtime::new().unwrap().block_on(async {
5958
let buffer = Vec::with_capacity(DATA.len());
60-
let (n, buffer) = rx.read_exact(buffer).await.unwrap();
61-
assert_eq!(n, buffer.len());
59+
let ((), buffer) = rx.read_exact(buffer).await.unwrap();
6260
assert_eq!(DATA, String::from_utf8(buffer).unwrap());
6361
});
6462
})

0 commit comments

Comments
 (0)