Skip to content
This repository was archived by the owner on Feb 14, 2023. It is now read-only.

Commit 73cae48

Browse files
Pause of reading
1 parent 4c59a0f commit 73cae48

File tree

2 files changed

+143
-43
lines changed

2 files changed

+143
-43
lines changed

src/lib.rs

Lines changed: 85 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ use buffer::BufImpl;
171171

172172
pub mod policy;
173173

174-
use self::policy::{ReaderPolicy, WriterPolicy, StdPolicy, FlushOnNewline};
174+
use self::policy::{FlushOnNewline, ReaderPolicy, StdPolicy, WriterPolicy};
175175

176176
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
177177

@@ -198,11 +198,15 @@ const DEFAULT_BUF_SIZE: usize = 8 * 1024;
198198
/// [`new_ringbuf()`]: BufReader::new_ringbuf
199199
/// [`with_capacity_ringbuf()`]: BufReader::with_capacity_ringbuf
200200
/// [ringbufs-root]: index.html#ringbuffers--slice-deque-feature
201-
pub struct BufReader<R, P = StdPolicy>{
201+
pub struct BufReader<R, P = StdPolicy> {
202202
// First field for null pointer optimization.
203203
buf: Buffer,
204+
// We need field "empty" to return empty &[u8] in case of reader is paused
205+
// This field is never used, never filled.
206+
empty: Vec<u8>,
204207
inner: R,
205208
policy: P,
209+
paused: bool,
206210
}
207211

208212
impl<R> BufReader<R, StdPolicy> {
@@ -262,7 +266,11 @@ impl<R> BufReader<R, StdPolicy> {
262266
/// then it will be returned in `read()` and `fill_buf()` ahead of any data from `inner`.
263267
pub fn with_buffer(buf: Buffer, inner: R) -> Self {
264268
BufReader {
265-
buf, inner, policy: StdPolicy
269+
buf,
270+
empty: vec![],
271+
inner,
272+
policy: StdPolicy,
273+
paused: false,
266274
}
267275
}
268276
}
@@ -272,22 +280,26 @@ impl<R, P> BufReader<R, P> {
272280
pub fn set_policy<P_: ReaderPolicy>(self, policy: P_) -> BufReader<R, P_> {
273281
BufReader {
274282
inner: self.inner,
283+
empty: self.empty,
275284
buf: self.buf,
276-
policy
285+
policy,
286+
paused: self.paused,
277287
}
278288
}
279289

280290
/// Mutate the current [`ReaderPolicy`](policy::ReaderPolicy) in-place.
281291
///
282292
/// If you want to change the type, use `.set_policy()`.
283-
pub fn policy_mut(&mut self) -> &mut P { &mut self.policy }
293+
pub fn policy_mut(&mut self) -> &mut P {
294+
&mut self.policy
295+
}
284296

285297
/// Inspect the current `ReaderPolicy`.
286298
pub fn policy(&self) -> &P {
287299
&self.policy
288300
}
289301

290-
/// Move data to the start of the buffer, making room at the end for more
302+
/// Move data to the start of the buffer, making room at the end for more
291303
/// reading.
292304
///
293305
/// This is a no-op with the `*_ringbuf()` constructors (requires `slice-deque` feature).
@@ -321,14 +333,18 @@ impl<R, P> BufReader<R, P> {
321333
}
322334

323335
/// Get an immutable reference to the underlying reader.
324-
pub fn get_ref(&self) -> &R { &self.inner }
336+
pub fn get_ref(&self) -> &R {
337+
&self.inner
338+
}
325339

326340
/// Get a mutable reference to the underlying reader.
327341
///
328342
/// ## Note
329343
/// Reading directly from the underlying reader is not recommended, as some
330344
/// data has likely already been moved into the buffer.
331-
pub fn get_mut(&mut self) -> &mut R { &mut self.inner }
345+
pub fn get_mut(&mut self) -> &mut R {
346+
&mut self.inner
347+
}
332348

333349
/// Consume `self` and return the inner reader only.
334350
pub fn into_inner(self) -> R {
@@ -357,32 +373,45 @@ impl<R, P: ReaderPolicy> BufReader<R, P> {
357373
fn should_read(&mut self) -> bool {
358374
self.policy.before_read(&mut self.buf).0
359375
}
376+
377+
#[inline]
378+
fn is_paused(&mut self) -> bool {
379+
self.policy.is_paused()
380+
}
360381
}
361382

362383
impl<R: Read, P> BufReader<R, P> {
363384
/// Unconditionally perform a read into the buffer.
364385
///
365386
/// Does not invoke `ReaderPolicy` methods.
366-
///
387+
///
367388
/// If the read was successful, returns the number of bytes read.
368389
pub fn read_into_buf(&mut self) -> io::Result<usize> {
369390
self.buf.read_from(&mut self.inner)
370391
}
371392

372393
/// Box the inner reader without losing data.
373-
pub fn boxed<'a>(self) -> BufReader<Box<Read + 'a>, P> where R: 'a {
394+
pub fn boxed<'a>(self) -> BufReader<Box<Read + 'a>, P>
395+
where
396+
R: 'a,
397+
{
374398
let inner: Box<Read + 'a> = Box::new(self.inner);
375-
376399
BufReader {
377400
inner,
401+
empty: self.empty,
378402
buf: self.buf,
379403
policy: self.policy,
404+
paused: self.paused,
380405
}
381406
}
382407
}
383408

384409
impl<R: Read, P: ReaderPolicy> Read for BufReader<R, P> {
385410
fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
411+
// If reading is paused, returning 0 to send end reading signal
412+
if self.is_paused() {
413+
return Ok(0);
414+
}
386415
// If we don't have any buffered data and we're doing a read matching
387416
// or exceeding the internal buffer's capacity, bypass the buffer.
388417
if self.buf.is_empty() && out.len() >= self.buf.capacity() {
@@ -397,12 +426,18 @@ impl<R: Read, P: ReaderPolicy> Read for BufReader<R, P> {
397426

398427
impl<R: Read, P: ReaderPolicy> BufRead for BufReader<R, P> {
399428
fn fill_buf(&mut self) -> io::Result<&[u8]> {
429+
// If reading is paused, we are sending empty buffer to send signal - "no data any more"
430+
if self.is_paused() {
431+
return Ok(&self.empty);
432+
}
400433
// If we've reached the end of our internal buffer then we need to fetch
401434
// some more data from the underlying reader.
402435
// This execution order is important; the policy may want to resize the buffer or move data
403436
// before reading into it.
404437
while self.should_read() && self.buf.usable_space() > 0 {
405-
if self.read_into_buf()? == 0 { break; };
438+
if self.read_into_buf()? == 0 {
439+
break;
440+
};
406441
}
407442

408443
Ok(self.buffer())
@@ -559,7 +594,10 @@ impl<W: Write> BufWriter<W> {
559594
/// it will be written out on the next flush!
560595
pub fn with_buffer(buf: Buffer, inner: W) -> BufWriter<W> {
561596
BufWriter {
562-
buf, inner, policy: StdPolicy, panicked: false,
597+
buf,
598+
inner,
599+
policy: StdPolicy,
600+
panicked: false,
563601
}
564602
}
565603
}
@@ -571,7 +609,10 @@ impl<W: Write, P> BufWriter<W, P> {
571609
let (inner, buf) = self.into_inner_();
572610

573611
BufWriter {
574-
inner, buf, policy, panicked
612+
inner,
613+
buf,
614+
policy,
615+
panicked,
575616
}
576617
}
577618

@@ -640,7 +681,9 @@ impl<W: Write, P> BufWriter<W, P> {
640681
}
641682

642683
fn flush_buf(&mut self, amt: usize) -> io::Result<()> {
643-
if amt == 0 || amt > self.buf.len() { return Ok(()) }
684+
if amt == 0 || amt > self.buf.len() {
685+
return Ok(());
686+
}
644687

645688
self.panicked = true;
646689
let ret = self.buf.write_max(amt, &mut self.inner);
@@ -714,7 +757,6 @@ impl<W: Write + fmt::Debug, P: fmt::Debug> fmt::Debug for BufWriter<W, P> {
714757
}
715758
}
716759

717-
718760
/// Attempt to flush the buffer to the underlying writer.
719761
///
720762
/// If an error occurs, the thread-local handler is invoked, if one was previously
@@ -725,9 +767,7 @@ impl<W: Write, P> Drop for BufWriter<W, P> {
725767
// instead of ignoring a failed flush, call the handler
726768
let buf_len = self.buf.len();
727769
if let Err(err) = self.flush_buf(buf_len) {
728-
DROP_ERR_HANDLER.with(|deh| {
729-
(*deh.borrow())(&mut self.inner, &mut self.buf, err)
730-
});
770+
DROP_ERR_HANDLER.with(|deh| (*deh.borrow())(&mut self.inner, &mut self.buf, err));
731771
}
732772
}
733773
}
@@ -805,7 +845,8 @@ impl<W: Write> LineWriter<W> {
805845
/// Flush the buffer and unwrap, returning the inner writer on success,
806846
/// or a type wrapping `self` plus the error otherwise.
807847
pub fn into_inner(self) -> Result<W, IntoInnerError<Self>> {
808-
self.0.into_inner()
848+
self.0
849+
.into_inner()
809850
.map_err(|IntoInnerError(inner, e)| IntoInnerError(LineWriter(inner), e))
810851
}
811852

@@ -816,7 +857,7 @@ impl<W: Write> LineWriter<W> {
816857
}
817858

818859
/// Consume `self` and return both the underlying writer and the buffer.
819-
pub fn into_inner_with_buf(self) -> (W, Buffer){
860+
pub fn into_inner_with_buf(self) -> (W, Buffer) {
820861
self.0.into_inner_with_buffer()
821862
}
822863
}
@@ -1015,12 +1056,16 @@ impl Buffer {
10151056
/// Get an immutable slice of the available bytes in this buffer.
10161057
///
10171058
/// Call `.consume()` to remove bytes from the beginning of this slice.
1018-
pub fn buf(&self) -> &[u8] { self.buf.buf() }
1059+
pub fn buf(&self) -> &[u8] {
1060+
self.buf.buf()
1061+
}
10191062

10201063
/// Get a mutable slice representing the available bytes in this buffer.
10211064
///
10221065
/// Call `.consume()` to remove bytes from the beginning of this slice.
1023-
pub fn buf_mut(&mut self) -> &mut [u8] { self.buf.buf_mut() }
1066+
pub fn buf_mut(&mut self) -> &mut [u8] {
1067+
self.buf.buf_mut()
1068+
}
10241069

10251070
/// Read from `rdr`, returning the number of bytes read or any errors.
10261071
///
@@ -1105,8 +1150,12 @@ impl Buffer {
11051150
while self.len() > 0 && max > 0 {
11061151
let len = cmp::min(self.len(), max);
11071152
let n = match wrt.write(&self.buf()[..len]) {
1108-
Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
1109-
"Buffer::write_all() got zero-sized write")),
1153+
Ok(0) => {
1154+
return Err(io::Error::new(
1155+
io::ErrorKind::WriteZero,
1156+
"Buffer::write_all() got zero-sized write",
1157+
))
1158+
}
11101159
Ok(n) => n,
11111160
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
11121161
Err(e) => return Err(e),
@@ -1127,8 +1176,12 @@ impl Buffer {
11271176
pub fn write_all<W: Write + ?Sized>(&mut self, wrt: &mut W) -> io::Result<()> {
11281177
while self.len() > 0 {
11291178
match self.write_to(wrt) {
1130-
Ok(0) => return Err(io::Error::new(io::ErrorKind::WriteZero,
1131-
"Buffer::write_all() got zero-sized write")),
1179+
Ok(0) => {
1180+
return Err(io::Error::new(
1181+
io::ErrorKind::WriteZero,
1182+
"Buffer::write_all() got zero-sized write",
1183+
))
1184+
}
11321185
Ok(_) => (),
11331186
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
11341187
Err(e) => return Err(e),
@@ -1259,7 +1312,9 @@ pub fn copy_buf<B: BufRead, W: Write>(b: &mut B, w: &mut W) -> io::Result<u64> {
12591312
Ok(buf) => buf,
12601313
};
12611314

1262-
if copied == 0 { break; }
1315+
if copied == 0 {
1316+
break;
1317+
}
12631318

12641319
b.consume(copied);
12651320

@@ -1283,7 +1338,8 @@ thread_local!(
12831338
/// ### Panics
12841339
/// If called from within a handler previously provided to this function.
12851340
pub fn set_drop_err_handler<F: 'static>(handler: F)
1286-
where F: Fn(&mut Write, &mut Buffer, io::Error)
1341+
where
1342+
F: Fn(&mut Write, &mut Buffer, io::Error),
12871343
{
12881344
DROP_ERR_HANDLER.with(|deh| *deh.borrow_mut() = Box::new(handler))
12891345
}

0 commit comments

Comments
 (0)