Skip to content

Commit 5074b6c

Browse files
authored
Merge pull request #210 from Berrysoft/fix/flush-error-safe
fix(io): make buffer flush error safe
2 parents 4cabfec + b06c312 commit 5074b6c

File tree

4 files changed

+55
-41
lines changed

4 files changed

+55
-41
lines changed

compio-io/src/buffer.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use std::{
77
future::Future,
88
};
99

10-
use compio_buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
10+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit, Slice};
1111

12-
use crate::util::MISSING_BUF;
12+
use crate::{util::MISSING_BUF, AsyncWrite, IoResult};
1313

1414
pub struct Inner {
1515
buf: Vec<u8>,
@@ -33,6 +33,12 @@ impl Inner {
3333
fn slice(&self) -> &[u8] {
3434
&self.buf[self.pos..]
3535
}
36+
37+
#[inline]
38+
pub(crate) fn into_slice(self) -> Slice<Self> {
39+
let pos = self.pos;
40+
IoBuf::slice(self, pos..)
41+
}
3642
}
3743

3844
impl IoBuf for Inner {
@@ -96,6 +102,7 @@ impl Buffer {
96102

97103
/// If the inner buffer is empty.
98104
#[inline]
105+
#[allow(unused)]
99106
pub fn is_empty(&self) -> bool {
100107
self.inner().as_slice().is_empty()
101108
}
@@ -115,6 +122,7 @@ impl Buffer {
115122
}
116123

117124
/// The buffer needs to be flushed
125+
#[inline]
118126
pub fn need_flush(&self) -> bool {
119127
// TODO: Better way to determine if we need to flush the buffer
120128
let buf = self.buf();
@@ -129,7 +137,7 @@ impl Buffer {
129137

130138
/// Execute a funcition with ownership of the buffer, and restore the buffer
131139
/// afterwards
132-
pub async fn with<R, Fut, F>(&mut self, func: F) -> std::io::Result<R>
140+
pub async fn with<R, Fut, F>(&mut self, func: F) -> IoResult<R>
133141
where
134142
Fut: Future<Output = BufResult<R, Inner>>,
135143
F: FnOnce(Inner) -> Fut,
@@ -150,6 +158,24 @@ impl Buffer {
150158
res
151159
}
152160

161+
/// Wrapper to flush the buffer to a writer with error safety.
162+
///
163+
/// https://github.com/compio-rs/compio/issues/209
164+
pub async fn flush_to(&mut self, writer: &mut impl AsyncWrite) -> IoResult<usize> {
165+
let mut total = 0;
166+
loop {
167+
let written = self
168+
.with(|inner| async { writer.write(inner.into_slice()).await.into_inner() })
169+
.await?;
170+
total += written;
171+
if self.advance(written) {
172+
break;
173+
}
174+
}
175+
self.reset();
176+
Ok(total)
177+
}
178+
153179
/// Mark some bytes as read by advancing the progress tracker, return a
154180
/// `bool` indicating if all bytes are read.
155181
#[inline]

compio-io/src/compat.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::io::{self, BufRead, Read, Write};
44

55
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit};
66

7-
use crate::{buffer::Buffer, util::DEFAULT_BUF_SIZE, AsyncWriteExt};
7+
use crate::{buffer::Buffer, util::DEFAULT_BUF_SIZE};
88

99
/// A wrapper for [`AsyncRead`](crate::AsyncRead) +
1010
/// [`AsyncWrite`](crate::AsyncWrite), providing sync traits impl. The sync
@@ -153,15 +153,7 @@ impl<S: crate::AsyncWrite> SyncStream<S> {
153153
/// Flush all data in the write buffer.
154154
pub async fn flush_write_buf(&mut self) -> io::Result<usize> {
155155
let stream = &mut self.stream;
156-
let len = self
157-
.write_buffer
158-
.with(|w| async {
159-
let len = w.buf_len();
160-
let BufResult(res, w) = stream.write_all(w).await;
161-
BufResult(res.map(|()| len), w)
162-
})
163-
.await?;
164-
self.write_buffer.reset();
156+
let len = self.write_buffer.flush_to(stream).await?;
165157
stream.flush().await?;
166158
Ok(len)
167159
}

compio-io/src/util/mod.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! IO related utilities functions for ease of use.
22
33
mod take;
4-
use compio_buf::{BufResult, IoBuf};
54
pub use take::Take;
65

76
mod null;
@@ -13,7 +12,7 @@ pub use repeat::{repeat, Repeat};
1312
mod internal;
1413
pub(crate) use internal::*;
1514

16-
use crate::{buffer::Buffer, AsyncRead, AsyncWrite, AsyncWriteExt, IoResult};
15+
use crate::{buffer::Buffer, AsyncRead, AsyncWrite, IoResult};
1716

1817
/// Asynchronously copies the entire contents of a reader into a writer.
1918
///
@@ -40,18 +39,8 @@ pub async fn copy<'a, R: AsyncRead, W: AsyncWrite>(
4039

4140
// When EOF is reached, we are terminating, so flush before that
4241
if read == 0 || buf.need_flush() {
43-
let written = buf
44-
.with(|w| async {
45-
let len = w.buf_len();
46-
let BufResult(res, w) = writer.write_all(w).await;
47-
BufResult(res.map(|()| len), w)
48-
})
49-
.await?;
42+
let written = buf.flush_to(writer).await?;
5043
total += written;
51-
52-
if buf.advance(written) {
53-
buf.reset()
54-
}
5544
}
5645

5746
if read == 0 {

compio-io/src/write/buf.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::future::ready;
22

3-
use compio_buf::{buf_try, BufResult, IntoInner, IoBuf};
3+
use compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoVectoredBuf};
44

55
use crate::{
66
buffer::Buffer,
77
util::{slice_to_buf, DEFAULT_BUF_SIZE},
8-
AsyncWrite, AsyncWriteExt, IoResult,
8+
AsyncWrite, IoResult,
99
};
1010

1111
/// Wraps a writer and buffers its output.
@@ -48,8 +48,21 @@ impl<W> BufWriter<W> {
4848
}
4949
}
5050

51+
impl<W: AsyncWrite> BufWriter<W> {
52+
async fn flush_if_needed(&mut self) -> IoResult<()> {
53+
if self.buf.need_flush() {
54+
self.flush().await?;
55+
}
56+
Ok(())
57+
}
58+
}
59+
5160
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
52-
async fn write<T: IoBuf>(&mut self, mut buf: T) -> compio_buf::BufResult<usize, T> {
61+
async fn write<T: IoBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
62+
// The previous flush may error because disk full. We need to make the buffer
63+
// all-done before writing new data to it.
64+
(_, buf) = buf_try!(self.flush_if_needed().await, buf);
65+
5366
let written = self
5467
.buf
5568
.with_sync(|w| {
@@ -60,17 +73,14 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
6073
})
6174
.expect("Closure always return Ok");
6275

63-
if self.buf.need_flush() {
64-
(_, buf) = buf_try!(self.flush().await, buf);
65-
}
76+
(_, buf) = buf_try!(self.flush_if_needed().await, buf);
6677

6778
BufResult(Ok(written), buf)
6879
}
6980

70-
async fn write_vectored<T: compio_buf::IoVectoredBuf>(
71-
&mut self,
72-
mut buf: T,
73-
) -> compio_buf::BufResult<usize, T> {
81+
async fn write_vectored<T: IoVectoredBuf>(&mut self, mut buf: T) -> BufResult<usize, T> {
82+
(_, buf) = buf_try!(self.flush_if_needed().await, buf);
83+
7484
let written = self
7585
.buf
7686
.with(|mut w| {
@@ -90,18 +100,15 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
90100
.await
91101
.expect("Closure always return Ok");
92102

93-
if self.buf.need_flush() {
94-
(_, buf) = buf_try!(self.flush().await, buf);
95-
}
103+
(_, buf) = buf_try!(self.flush_if_needed().await, buf);
96104

97105
BufResult(Ok(written), buf)
98106
}
99107

100108
async fn flush(&mut self) -> IoResult<()> {
101109
let Self { writer, buf } = self;
102110

103-
buf.with(|w| writer.write_all(w)).await?;
104-
buf.reset();
111+
buf.flush_to(writer).await?;
105112

106113
Ok(())
107114
}

0 commit comments

Comments
 (0)