Skip to content

Commit 7220f44

Browse files
Don't generate empty TWCC feedback packets (#324)
* Don't generate empty TWCC feedback packets The previous version of this code would very often generate a feedback packet with no chunks. These packets would be ignored by libWebRTC which logs an error message saying: ``` Buffer too small (16 bytes) to fit a FeedbackPacket. Minimum size = 18 ``` When receiving such a packet. It also felt weird to clear the received packets when no chunks were generated for those packets.
1 parent 4df9c60 commit 7220f44

File tree

5 files changed

+52
-56
lines changed

5 files changed

+52
-56
lines changed

interceptor/src/twcc/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ impl Recorder {
5656

5757
/// build_feedback_packet creates a new RTCP packet containing a TWCC feedback report.
5858
pub fn build_feedback_packet(&mut self) -> Vec<Box<dyn rtcp::packet::Packet + Send + Sync>> {
59-
let mut feedback = Feedback::new(self.sender_ssrc, self.media_ssrc, self.fb_pkt_cnt);
60-
self.fb_pkt_cnt = self.fb_pkt_cnt.wrapping_add(1);
6159
if self.received_packets.len() < 2 {
62-
self.received_packets.clear();
63-
return vec![Box::new(feedback.get_rtcp())];
60+
return vec![];
6461
}
62+
let mut feedback = Feedback::new(self.sender_ssrc, self.media_ssrc, self.fb_pkt_cnt);
63+
self.fb_pkt_cnt = self.fb_pkt_cnt.wrapping_add(1);
6564

6665
self.received_packets
6766
.sort_by(|a: &PktInfo, b: &PktInfo| -> Ordering {

interceptor/src/twcc/receiver/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::*;
88
use receiver_stream::ReceiverStream;
99

1010
use rtp::extension::transport_cc_extension::TransportCcExtension;
11-
use std::time::{Duration, SystemTime};
11+
use std::time::Duration;
1212
use tokio::sync::{mpsc, Mutex};
1313
use tokio::time::MissedTickBehavior;
1414
use util::Unmarshal;
@@ -44,7 +44,7 @@ impl InterceptorBuilder for ReceiverBuilder {
4444
streams: Mutex::new(HashMap::new()),
4545
close_rx: Mutex::new(Some(close_rx)),
4646
}),
47-
start_time: SystemTime::now(),
47+
start_time: tokio::time::Instant::now(),
4848
packet_chan_tx,
4949
wg: Mutex::new(Some(WaitGroup::new())),
5050
close_tx: Mutex::new(Some(close_tx)),
@@ -72,7 +72,8 @@ struct ReceiverInternal {
7272
pub struct Receiver {
7373
internal: Arc<ReceiverInternal>,
7474

75-
start_time: SystemTime,
75+
// we use tokio's Instant because it makes testing easier via `tokio::time::advance`.
76+
start_time: tokio::time::Instant,
7677
packet_chan_tx: mpsc::Sender<Packet>,
7778

7879
wg: Mutex<Option<WaitGroup>>,
@@ -132,6 +133,10 @@ impl Receiver {
132133
recorder.build_feedback_packet()
133134
};
134135

136+
if pkts.is_empty() {
137+
continue;
138+
}
139+
135140
if let Err(err) = rtcp_writer.write(&pkts, &a).await{
136141
log::error!("rtcp_writer.write got err: {}", err);
137142
}

interceptor/src/twcc/receiver/receiver_stream.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ pub(super) struct ReceiverStream {
55
hdr_ext_id: u8,
66
ssrc: u32,
77
packet_chan_tx: mpsc::Sender<Packet>,
8-
start_time: SystemTime,
8+
// we use tokio's Instant because it makes testing easier via `tokio::time::advance`.
9+
start_time: tokio::time::Instant,
910
}
1011

1112
impl ReceiverStream {
@@ -14,7 +15,7 @@ impl ReceiverStream {
1415
hdr_ext_id: u8,
1516
ssrc: u32,
1617
packet_chan_tx: mpsc::Sender<Packet>,
17-
start_time: SystemTime,
18+
start_time: tokio::time::Instant,
1819
) -> Self {
1920
ReceiverStream {
2021
parent_rtp_reader,
@@ -43,10 +44,8 @@ impl RTPReader for ReceiverStream {
4344
.send(Packet {
4445
hdr: p.header,
4546
sequence_number: tcc_ext.transport_sequence,
46-
arrival_time: SystemTime::now()
47-
.duration_since(self.start_time)
48-
.unwrap_or_else(|_| Duration::from_secs(0))
49-
.as_micros() as i64,
47+
arrival_time: (tokio::time::Instant::now() - self.start_time).as_micros()
48+
as i64,
5049
ssrc: self.ssrc,
5150
})
5251
.await;

interceptor/src/twcc/receiver/receiver_test.rs

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,13 @@ async fn test_twcc_receiver_interceptor_before_any_packets() -> Result<()> {
2626
)
2727
.await;
2828

29-
let pkts = stream.written_rtcp().await.unwrap();
30-
assert_eq!(pkts.len(), 1);
31-
if let Some(tlcc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
32-
assert_eq!(0, tlcc.packet_status_count);
33-
assert_eq!(0, tlcc.fb_pkt_count);
34-
assert_eq!(0, tlcc.base_sequence_number);
35-
assert_eq!(0, tlcc.media_ssrc);
36-
assert_eq!(0, tlcc.reference_time);
37-
assert_eq!(0, tlcc.recv_deltas.len());
38-
assert_eq!(0, tlcc.packet_chunks.len());
39-
} else {
40-
assert!(false);
29+
tokio::select! {
30+
pkts = stream.written_rtcp() => {
31+
assert!(pkts.map(|p| p.is_empty()).unwrap_or(true), "Should not have sent an RTCP packet before receiving the first RTP packets")
32+
}
33+
_ = tokio::time::sleep(Duration::from_millis(300)) => {
34+
// All good
35+
}
4136
}
4237

4338
stream.close().await?;
@@ -101,9 +96,7 @@ async fn test_twcc_receiver_interceptor_after_rtp_packets() -> Result<()> {
10196
Ok(())
10297
}
10398

104-
//TODO: remove this conditional test
105-
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
106-
#[tokio::test]
99+
#[tokio::test(start_paused = true)]
107100
async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -> Result<()> {
108101
let builder = Receiver::builder().with_interval(Duration::from_millis(500));
109102
let icpr = builder.build("")?;
@@ -124,7 +117,7 @@ async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -
124117

125118
let delays = vec![0, 10, 100, 200];
126119
for (i, d) in delays.iter().enumerate() {
127-
tokio::time::sleep(Duration::from_millis(*d)).await;
120+
tokio::time::advance(Duration::from_millis(*d)).await;
128121

129122
let mut hdr = rtp::header::Header::default();
130123
let tcc = TransportCcExtension {
@@ -139,13 +132,17 @@ async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -
139132
..Default::default()
140133
})
141134
.await;
135+
136+
// Yield so this packet can be processed
137+
tokio::task::yield_now().await;
142138
}
143139

144-
// tick immediately, let's ignore the first rtcp pkt
145-
let _ = stream.written_rtcp().await.unwrap();
140+
// Force a packet to be generated
141+
tokio::time::advance(Duration::from_millis(2001)).await;
142+
tokio::task::yield_now().await;
146143

147-
// the second 500ms tick will works
148144
let pkts = stream.written_rtcp().await.unwrap();
145+
149146
assert_eq!(pkts.len(), 1);
150147
if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
151148
assert_eq!(0, cc.base_sequence_number);
@@ -171,9 +168,7 @@ async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -
171168
Ok(())
172169
}
173170

174-
//TODO: remove this conditional test
175-
#[cfg(not(target_os = "macos"))]
176-
#[tokio::test]
171+
#[tokio::test(start_paused = true)]
177172
async fn test_twcc_receiver_interceptor_packet_loss() -> Result<()> {
178173
let builder = Receiver::builder().with_interval(Duration::from_secs(2));
179174
let icpr = builder.build("")?;
@@ -192,26 +187,21 @@ async fn test_twcc_receiver_interceptor_packet_loss() -> Result<()> {
192187
)
193188
.await;
194189

195-
let sequence_number_to_delay: HashMap<u16, u64> = [
190+
let sequence_number_to_delay = &[
196191
(0, 0),
197192
(1, 10),
198193
(4, 100),
199194
(8, 200),
200195
(9, 20),
201196
(10, 20),
202197
(30, 300),
203-
]
204-
.iter()
205-
.cloned()
206-
.collect();
207-
208-
for i in &[0, 1, 4, 8, 9, 10, 30] {
209-
let d = sequence_number_to_delay.get(i).unwrap();
210-
tokio::time::sleep(Duration::from_millis(*d)).await;
198+
];
211199

200+
for (i, d) in sequence_number_to_delay {
201+
tokio::time::advance(Duration::from_millis(*d)).await;
212202
let mut hdr = rtp::header::Header::default();
213203
let tcc = TransportCcExtension {
214-
transport_sequence: *i as u16,
204+
transport_sequence: *i,
215205
}
216206
.marshal()?;
217207
hdr.set_extension(1, tcc)?;
@@ -221,13 +211,17 @@ async fn test_twcc_receiver_interceptor_packet_loss() -> Result<()> {
221211
..Default::default()
222212
})
223213
.await;
214+
215+
// Yield so this packet can be processed
216+
tokio::task::yield_now().await;
224217
}
225218

226-
// tick immediately, let's ignore the first rtcp pkt
227-
let _ = stream.written_rtcp().await.unwrap();
219+
// Force a packet to be generated
220+
tokio::time::advance(Duration::from_millis(2001)).await;
221+
tokio::task::yield_now().await;
228222

229-
// the second 500ms tick will works
230223
let pkts = stream.written_rtcp().await.unwrap();
224+
231225
assert_eq!(pkts.len(), 1);
232226
if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
233227
assert_eq!(0, cc.base_sequence_number);

interceptor/src/twcc/twcc_test.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -494,19 +494,18 @@ fn test_build_feedback_packet_rolling() -> Result<()> {
494494
let mut r = Recorder::new(5000);
495495

496496
let mut arrival_time = SCALE_FACTOR_REFERENCE_TIME as i64;
497-
add_run(&mut r, &[0], &[arrival_time]);
497+
add_run(&mut r, &[3], &[arrival_time]);
498498

499499
let rtcp_packets = r.build_feedback_packet();
500-
assert_eq!(1, rtcp_packets.len()); // Empty TWCC
500+
assert_eq!(0, rtcp_packets.len());
501501

502502
add_run(
503503
&mut r,
504-
&[4, 8, 9, 10],
504+
&[4, 8, 9],
505505
&[
506506
increase_time(&mut arrival_time, TYPE_TCC_DELTA_SCALE_FACTOR),
507507
increase_time(&mut arrival_time, TYPE_TCC_DELTA_SCALE_FACTOR),
508508
increase_time(&mut arrival_time, TYPE_TCC_DELTA_SCALE_FACTOR),
509-
increase_time(&mut arrival_time, TYPE_TCC_DELTA_SCALE_FACTOR),
510509
],
511510
);
512511

@@ -516,27 +515,27 @@ fn test_build_feedback_packet_rolling() -> Result<()> {
516515
let expected = TransportLayerCc {
517516
sender_ssrc: 5000,
518517
media_ssrc: 5000,
519-
base_sequence_number: 4,
518+
base_sequence_number: 3,
520519
reference_time: 1,
521-
fb_pkt_count: 1,
520+
fb_pkt_count: 0,
522521
packet_status_count: 7,
523522
packet_chunks: vec![PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
524523
type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
525524
symbol_size: SymbolSizeTypeTcc::TwoBit,
526525
symbol_list: vec![
526+
SymbolTypeTcc::PacketReceivedSmallDelta,
527527
SymbolTypeTcc::PacketReceivedSmallDelta,
528528
SymbolTypeTcc::PacketNotReceived,
529529
SymbolTypeTcc::PacketNotReceived,
530530
SymbolTypeTcc::PacketNotReceived,
531531
SymbolTypeTcc::PacketReceivedSmallDelta,
532532
SymbolTypeTcc::PacketReceivedSmallDelta,
533-
SymbolTypeTcc::PacketReceivedSmallDelta,
534533
],
535534
})],
536535
recv_deltas: vec![
537536
RecvDelta {
538537
type_tcc_packet: SymbolTypeTcc::PacketReceivedSmallDelta,
539-
delta: TYPE_TCC_DELTA_SCALE_FACTOR,
538+
delta: 0,
540539
},
541540
RecvDelta {
542541
type_tcc_packet: SymbolTypeTcc::PacketReceivedSmallDelta,

0 commit comments

Comments
 (0)