Skip to content

Commit 137f11f

Browse files
authored
[PollStream/PollDataChannel] do not loose bytes (#341)
Returning Poll::Ready(Ok(n)) when write_fut is Some is incorrect since buf already contains new data and we should instead always return Poll::Ready(Ok(buf.len())) and update write_fut. In the future, if/when we make write synchronous, this whole block could be simplified to just contain a single call (1 line) to write. See #344
1 parent d50269a commit 137f11f

File tree

4 files changed

+79
-59
lines changed

4 files changed

+79
-59
lines changed

data/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
* Increased minimum support rust version to `1.60.0`.
6+
* Do not loose data in `PollDataChannel::poll_write` [#341](https://github.com/webrtc-rs/webrtc/pull/341).
67
* `PollDataChannel::poll_shutdown`: make sure to flush any writes before shutting down [#340](https://github.com/webrtc-rs/webrtc/pull/340)
78

89
## 0.5.0

data/src/data_channel/mod.rs

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -566,44 +566,52 @@ impl AsyncWrite for PollDataChannel {
566566
return Poll::Ready(Ok(0));
567567
}
568568

569-
let (fut, fut_is_new) = match self.write_fut.as_mut() {
570-
Some(fut) => (fut, false),
571-
None => {
572-
let data_channel = self.data_channel.clone();
573-
let bytes = Bytes::copy_from_slice(buf);
574-
(
575-
self.write_fut
576-
.get_or_insert(Box::pin(async move { data_channel.write(&bytes).await })),
577-
true,
578-
)
569+
if let Some(fut) = self.write_fut.as_mut() {
570+
match fut.as_mut().poll(cx) {
571+
Poll::Pending => Poll::Pending,
572+
Poll::Ready(Err(e)) => {
573+
let data_channel = self.data_channel.clone();
574+
let bytes = Bytes::copy_from_slice(buf);
575+
self.write_fut =
576+
Some(Box::pin(async move { data_channel.write(&bytes).await }));
577+
Poll::Ready(Err(e.into()))
578+
}
579+
// Given the data is buffered, it's okay to ignore the number of written bytes.
580+
//
581+
// TODO: In the long term, `data_channel.write` should be made sync. Then we could
582+
// remove the whole `if` condition and just call `data_channel.write`.
583+
Poll::Ready(Ok(_)) => {
584+
let data_channel = self.data_channel.clone();
585+
let bytes = Bytes::copy_from_slice(buf);
586+
self.write_fut =
587+
Some(Box::pin(async move { data_channel.write(&bytes).await }));
588+
Poll::Ready(Ok(buf.len()))
589+
}
579590
}
580-
};
591+
} else {
592+
let data_channel = self.data_channel.clone();
593+
let bytes = Bytes::copy_from_slice(buf);
594+
let fut = self
595+
.write_fut
596+
.insert(Box::pin(async move { data_channel.write(&bytes).await }));
581597

582-
match fut.as_mut().poll(cx) {
583-
Poll::Pending => {
598+
match fut.as_mut().poll(cx) {
584599
// If it's the first time we're polling the future, `Poll::Pending` can't be
585-
// returned because that would mean the `PollStream` is not ready for writing. And
586-
// this is not true since we've just created a future, which is going to write the
587-
// buf to the underlying stream.
600+
// returned because that would mean the `PollDataChannel` is not ready for writing.
601+
// And this is not true since we've just created a future, which is going to write
602+
// the buf to the underlying stream.
588603
//
589604
// It's okay to return `Poll::Ready` if the data is buffered (this is what the
590605
// buffered writer and `File` do).
591-
if fut_is_new {
592-
Poll::Ready(Ok(buf.len()))
593-
} else {
594-
// If it's the subsequent poll, it's okay to return `Poll::Pending` as it
595-
// indicates that the `PollStream` is not ready for writing. Only one future
596-
// can be in progress at the time.
597-
Poll::Pending
606+
Poll::Pending => Poll::Ready(Ok(buf.len())),
607+
Poll::Ready(Err(e)) => {
608+
self.write_fut = None;
609+
Poll::Ready(Err(e.into()))
610+
}
611+
Poll::Ready(Ok(n)) => {
612+
self.write_fut = None;
613+
Poll::Ready(Ok(n))
598614
}
599-
}
600-
Poll::Ready(Err(e)) => {
601-
self.write_fut = None;
602-
Poll::Ready(Err(e.into()))
603-
}
604-
Poll::Ready(Ok(n)) => {
605-
self.write_fut = None;
606-
Poll::Ready(Ok(n))
607615
}
608616
}
609617
}

sctp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
* Increased minimum support rust version to `1.60.0`.
6+
* Do not loose data in `PollStream::poll_write` [#341](https://github.com/webrtc-rs/webrtc/pull/341).
67
* `PollStream::poll_shutdown`: make sure to flush any writes before shutting down [#340](https://github.com/webrtc-rs/webrtc/pull/340)
78

89
## v0.6.1

sctp/src/stream/mod.rs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -703,44 +703,54 @@ impl AsyncWrite for PollStream {
703703
cx: &mut Context<'_>,
704704
buf: &[u8],
705705
) -> Poll<io::Result<usize>> {
706-
let (fut, fut_is_new) = match self.write_fut.as_mut() {
707-
Some(fut) => (fut, false),
708-
None => {
709-
let stream = self.stream.clone();
710-
let bytes = Bytes::copy_from_slice(buf);
711-
(
712-
self.write_fut
713-
.get_or_insert(Box::pin(async move { stream.write(&bytes).await })),
714-
true,
715-
)
706+
if buf.is_empty() {
707+
return Poll::Ready(Ok(0));
708+
}
709+
710+
if let Some(fut) = self.write_fut.as_mut() {
711+
match fut.as_mut().poll(cx) {
712+
Poll::Pending => Poll::Pending,
713+
Poll::Ready(Err(e)) => {
714+
let stream = self.stream.clone();
715+
let bytes = Bytes::copy_from_slice(buf);
716+
self.write_fut = Some(Box::pin(async move { stream.write(&bytes).await }));
717+
Poll::Ready(Err(e.into()))
718+
}
719+
// Given the data is buffered, it's okay to ignore the number of written bytes.
720+
//
721+
// TODO: In the long term, `stream.write` should be made sync. Then we could
722+
// remove the whole `if` condition and just call `stream.write`.
723+
Poll::Ready(Ok(_)) => {
724+
let stream = self.stream.clone();
725+
let bytes = Bytes::copy_from_slice(buf);
726+
self.write_fut = Some(Box::pin(async move { stream.write(&bytes).await }));
727+
Poll::Ready(Ok(buf.len()))
728+
}
716729
}
717-
};
730+
} else {
731+
let stream = self.stream.clone();
732+
let bytes = Bytes::copy_from_slice(buf);
733+
let fut = self
734+
.write_fut
735+
.insert(Box::pin(async move { stream.write(&bytes).await }));
718736

719-
match fut.as_mut().poll(cx) {
720-
Poll::Pending => {
737+
match fut.as_mut().poll(cx) {
721738
// If it's the first time we're polling the future, `Poll::Pending` can't be
722739
// returned because that would mean the `PollStream` is not ready for writing. And
723740
// this is not true since we've just created a future, which is going to write the
724741
// buf to the underlying stream.
725742
//
726743
// It's okay to return `Poll::Ready` if the data is buffered (this is what the
727744
// buffered writer and `File` do).
728-
if fut_is_new {
729-
Poll::Ready(Ok(buf.len()))
730-
} else {
731-
// If it's the subsequent poll, it's okay to return `Poll::Pending` as it
732-
// indicates that the `PollStream` is not ready for writing. Only one future
733-
// can be in progress at the time.
734-
Poll::Pending
745+
Poll::Pending => Poll::Ready(Ok(buf.len())),
746+
Poll::Ready(Err(e)) => {
747+
self.write_fut = None;
748+
Poll::Ready(Err(e.into()))
749+
}
750+
Poll::Ready(Ok(n)) => {
751+
self.write_fut = None;
752+
Poll::Ready(Ok(n))
735753
}
736-
}
737-
Poll::Ready(Err(e)) => {
738-
self.write_fut = None;
739-
Poll::Ready(Err(e.into()))
740-
}
741-
Poll::Ready(Ok(n)) => {
742-
self.write_fut = None;
743-
Poll::Ready(Ok(n))
744754
}
745755
}
746756
}

0 commit comments

Comments
 (0)