Skip to content

Commit aba5218

Browse files
stuqdograinliu
andauthored
Implement pion commit 2184 - mux: drop packets when buffer is full (#516)
* implement logic changes * add tests, switch to 2184 * remove superfluous loop * fix fmt --------- Co-authored-by: Rusty Rain <2069201+rainliu@users.noreply.github.com>
1 parent 7d09375 commit aba5218

File tree

2 files changed

+32
-20
lines changed

2 files changed

+32
-20
lines changed

webrtc/src/mux/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use util::{Buffer, Conn};
1414
use crate::error::Result;
1515
use crate::mux::endpoint::Endpoint;
1616
use crate::mux::mux_func::MatchFunc;
17+
use crate::util::Error;
1718

1819
/// mux multiplexes packets on a single socket (RFC7983)
1920
@@ -64,8 +65,6 @@ impl Mux {
6465

6566
let id = self.id.fetch_add(1, Ordering::SeqCst);
6667
// Set a maximum size of the buffer in bytes.
67-
// NOTE: We actually won't get anywhere close to this limit.
68-
// SRTP will constantly read from the endpoint and drop packets if it's full.
6968
let e = Arc::new(Endpoint {
7069
id,
7170
buffer: Buffer::new(0, MAX_BUFFER_SIZE),
@@ -135,7 +134,14 @@ impl Mux {
135134
}
136135

137136
if let Some(ep) = endpoint {
138-
ep.buffer.write(buf).await?;
137+
match ep.buffer.write(buf).await {
138+
// Expected when bytes are received faster than the endpoint can process them
139+
Err(Error::ErrBufferFull) => {
140+
log::info!("mux: endpoint buffer is full, dropping packet")
141+
}
142+
Ok(_) => (),
143+
Err(e) => return Err(crate::Error::Util(e)),
144+
}
139145
} else if !buf.is_empty() {
140146
log::warn!(
141147
"Warning: mux: no endpoint for packet starting with {}",

webrtc/src/mux/mux_test.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,10 @@ use async_trait::async_trait;
66
use util::conn::conn_pipe::pipe;
77

88
use super::*;
9-
use crate::mux::mux_func::match_all;
9+
use crate::mux::mux_func::{match_all, match_srtp};
1010

1111
const TEST_PIPE_BUFFER_SIZE: usize = 8192;
1212

13-
async fn pipe_memory() -> (Arc<Endpoint>, impl Conn) {
14-
// In memory pipe
15-
let (ca, cb) = pipe();
16-
17-
let mut m = Mux::new(Config {
18-
conn: Arc::new(ca),
19-
buffer_size: TEST_PIPE_BUFFER_SIZE,
20-
});
21-
22-
let e = m.new_endpoint(Box::new(match_all)).await;
23-
m.remove_endpoint(&e).await;
24-
let e = m.new_endpoint(Box::new(match_all)).await;
25-
26-
(e, cb)
27-
}
28-
2913
#[tokio::test]
3014
async fn test_no_endpoints() -> crate::error::Result<()> {
3115
// In memory pipe
@@ -129,3 +113,25 @@ async fn test_non_fatal_read() -> Result<()> {
129113

130114
Ok(())
131115
}
116+
117+
#[tokio::test]
118+
async fn test_non_fatal_dispatch() -> Result<()> {
119+
let (ca, cb) = pipe();
120+
121+
let mut m = Mux::new(Config {
122+
conn: Arc::new(ca),
123+
buffer_size: TEST_PIPE_BUFFER_SIZE,
124+
});
125+
126+
let e = m.new_endpoint(Box::new(match_srtp)).await;
127+
e.buffer.set_limit_size(1).await;
128+
129+
for _ in 0..25 {
130+
let srtp_packet = [128, 1, 2, 3, 4].to_vec();
131+
cb.send(&srtp_packet).await?;
132+
}
133+
134+
m.close().await;
135+
136+
Ok(())
137+
}

0 commit comments

Comments
 (0)