From c6d95318776458fc22215d6b53b188c94bc5efad Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Thu, 27 Feb 2025 14:03:34 +0100 Subject: [PATCH] encoder: ensure that flush is finished Once a flush has been started some encoders (for example xz2) require that the flush is completed before writing more data. --- src/tokio/write/generic/encoder.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/tokio/write/generic/encoder.rs b/src/tokio/write/generic/encoder.rs index f5a83aa..09d8672 100644 --- a/src/tokio/write/generic/encoder.rs +++ b/src/tokio/write/generic/encoder.rs @@ -16,6 +16,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; #[derive(Debug)] enum State { Encoding, + Flushing, Finishing, Done, } @@ -80,6 +81,12 @@ impl Encoder { State::Encoding } + // Once a flush has been started, it must be completed. + State::Flushing => match this.encoder.flush(&mut output)? { + true => State::Encoding, + false => State::Flushing, + }, + State::Finishing | State::Done => { return Poll::Ready(Err(io::Error::new( io::ErrorKind::Other, @@ -105,7 +112,7 @@ impl Encoder { let mut output = PartialBuffer::new(output); let done = match this.state { - State::Encoding => this.encoder.flush(&mut output)?, + State::Encoding | State::Flushing => this.encoder.flush(&mut output)?, State::Finishing | State::Done => { return Poll::Ready(Err(io::Error::new( @@ -114,11 +121,13 @@ impl Encoder { ))) } }; + *this.state = State::Flushing; let produced = output.written().len(); this.writer.as_mut().produce(produced); if done { + *this.state = State::Encoding; return Poll::Ready(Ok(())); } } @@ -140,6 +149,12 @@ impl Encoder { } } + // Once a flush has been started, it must be completed. + State::Flushing => match this.encoder.flush(&mut output)? { + true => State::Finishing, + false => State::Flushing, + }, + State::Done => State::Done, };