Skip to content

Commit 7061eb4

Browse files
Handle empty RTP packets in Sample Builder (#303)
libWebRTC, among others, will sometime send padding RTP packets to keep send rates steady. When they do the packets are sent after a complete sequence of RTP packets that form a Sample/GOP/Frame. Example: * P0, Timestamp=1, Start * P1, T=1 * P2, T=1, End(RTP Marker Bit set) * P3, T=1, Padding * P4, T=1, Padding * P5, T=2, Start * P6, T=2 End Here, when building a sample using the packets P5 and P6 we'll have `prev_dropped_packets = 2`, however these dropped packets are inconsequential as they don't carry media. The stream of samples is unbroken, despite having dropped some packets. With the introduction of `prev_padding_packets` it's possible to distinguish between dropped packets that are likely to break the sample stream and these unimportant padding packets.
1 parent c9409ba commit 7061eb4

File tree

5 files changed

+244
-2
lines changed

5 files changed

+244
-2
lines changed

media/src/io/sample_builder/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub struct SampleBuilder<T: Depacketizer> {
2222
max_late_timestamp: u32,
2323
buffer: Vec<Option<Packet>>,
2424
prepared_samples: Vec<Option<Sample>>,
25+
last_sample_timestamp: Option<u32>,
2526

2627
/// Interface that allows us to take RTP packets to samples
2728
depacketizer: T,
@@ -40,6 +41,10 @@ pub struct SampleBuilder<T: Depacketizer> {
4041

4142
/// number of packets forced to be dropped
4243
dropped_packets: u16,
44+
45+
/// number of padding packets detected and dropped. This number will be a subset of
46+
/// `droppped_packets`
47+
padding_packets: u16,
4348
}
4449

4550
impl<T: Depacketizer> SampleBuilder<T> {
@@ -55,12 +60,14 @@ impl<T: Depacketizer> SampleBuilder<T> {
5560
max_late_timestamp: 0,
5661
buffer: vec![None; u16::MAX as usize + 1],
5762
prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
63+
last_sample_timestamp: None,
5864
depacketizer,
5965
sample_rate,
6066
filled: SampleSequenceLocation::new(),
6167
active: SampleSequenceLocation::new(),
6268
prepared: SampleSequenceLocation::new(),
6369
dropped_packets: 0,
70+
padding_packets: 0,
6471
}
6572
}
6673

@@ -276,7 +283,19 @@ impl<T: Depacketizer> SampleBuilder<T> {
276283
.depacketizer
277284
.is_partition_head(&self.buffer[consume.head as usize].as_ref()?.payload)
278285
{
286+
// libWebRTC will sometimes send several empty padding packets to smooth out send
287+
// rate. These packets don't carry any media payloads.
288+
let is_padding = consume.range(&self.buffer).all(|p| {
289+
p.map(|p| {
290+
self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
291+
})
292+
.unwrap_or(false)
293+
});
294+
279295
self.dropped_packets += consume.count();
296+
if is_padding {
297+
self.padding_packets += consume.count();
298+
}
280299
self.purge_consumed_location(&consume, true);
281300
self.purge_consumed_buffers();
282301
return None;
@@ -301,9 +320,12 @@ impl<T: Depacketizer> SampleBuilder<T> {
301320
duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
302321
packet_timestamp: sample_timestamp,
303322
prev_dropped_packets: self.dropped_packets,
323+
prev_padding_packets: self.padding_packets,
304324
};
305325

306326
self.dropped_packets = 0;
327+
self.padding_packets = 0;
328+
self.last_sample_timestamp = Some(sample_timestamp);
307329

308330
self.prepared_samples[self.prepared.tail as usize] = Some(sample);
309331
self.prepared.tail = self.prepared.tail.wrapping_add(1);

media/src/io/sample_builder/sample_builder_test.rs

Lines changed: 121 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub struct SampleBuilderTest {
1818
samples: Vec<Sample>,
1919
max_late: u16,
2020
max_late_timestamp: Duration,
21+
extra_pop_attempts: usize,
2122
}
2223

2324
pub struct FakeDepacketizer {
@@ -821,6 +822,119 @@ pub fn test_sample_builder() {
821822
max_late_timestamp: Duration::from_secs(2000),
822823
..Default::default()
823824
},
825+
SampleBuilderTest {
826+
#[rustfmt::skip]
827+
message: "Sample builder should recognise padding packets".into(),
828+
packets: vec![
829+
Packet {
830+
// First packet
831+
header: Header {
832+
sequence_number: 5000,
833+
timestamp: 1,
834+
..Default::default()
835+
},
836+
payload: bytes!(1),
837+
..Default::default()
838+
},
839+
Packet {
840+
// Second packet
841+
header: Header {
842+
sequence_number: 5001,
843+
timestamp: 1,
844+
..Default::default()
845+
},
846+
payload: bytes!(2),
847+
..Default::default()
848+
},
849+
Packet {
850+
// Third packet
851+
header: Header {
852+
sequence_number: 5002,
853+
timestamp: 1,
854+
marker: true,
855+
..Default::default()
856+
},
857+
payload: bytes!(3),
858+
..Default::default()
859+
},
860+
Packet {
861+
// Padding packet 1
862+
header: Header {
863+
sequence_number: 5003,
864+
timestamp: 1,
865+
..Default::default()
866+
},
867+
payload: Bytes::from_static(&[]),
868+
..Default::default()
869+
},
870+
Packet {
871+
// Padding packet 2
872+
header: Header {
873+
sequence_number: 5004,
874+
timestamp: 1,
875+
..Default::default()
876+
},
877+
payload: Bytes::from_static(&[]),
878+
..Default::default()
879+
},
880+
Packet {
881+
// Sixth packet
882+
header: Header {
883+
sequence_number: 5005,
884+
timestamp: 2,
885+
..Default::default()
886+
},
887+
payload: bytes!(1),
888+
..Default::default()
889+
},
890+
Packet {
891+
// Seventh packet
892+
header: Header {
893+
sequence_number: 5006,
894+
timestamp: 2,
895+
marker: true,
896+
..Default::default()
897+
},
898+
payload: bytes!(7),
899+
..Default::default()
900+
},
901+
Packet {
902+
// Seventh packet
903+
header: Header {
904+
sequence_number: 5007,
905+
timestamp: 3,
906+
..Default::default()
907+
},
908+
payload: bytes!(1),
909+
..Default::default()
910+
},
911+
],
912+
samples: vec![
913+
Sample {
914+
// First sample
915+
data: bytes!(1, 2, 3),
916+
duration: Duration::from_secs(0),
917+
packet_timestamp: 1,
918+
prev_dropped_packets: 0,
919+
..Default::default()
920+
},
921+
Sample {
922+
// Second sample
923+
data: bytes!(1, 7),
924+
duration: Duration::from_secs(1),
925+
packet_timestamp: 2,
926+
prev_dropped_packets: 2,
927+
prev_padding_packets: 2,
928+
..Default::default()
929+
},
930+
],
931+
with_head_checker: true,
932+
head_bytes: vec![bytes!(1)],
933+
max_late: 50,
934+
max_late_timestamp: Duration::from_secs(2000),
935+
extra_pop_attempts: 1,
936+
..Default::default()
937+
},
824938
];
825939

826940
for t in test_data {
@@ -843,12 +957,17 @@ pub fn test_sample_builder() {
843957
s.push(p)
844958
}
845959

846-
// Here we need some fancy loop that pops from s until empty. This propbably exists somewhere already.
847-
// HAH, found it.
848960
while let Some(sample) = s.pop() {
849961
samples.push(sample)
850962
}
851963

964+
for _ in 0..t.extra_pop_attempts {
965+
// Pop some more
966+
while let Some(sample) = s.pop() {
967+
samples.push(sample)
968+
}
969+
}
970+
852971
// Current problem: Sample does not implement Eq. Either implement myself or find another way of comparison. (Derive does not work)
853972
assert_eq!(t.samples, samples, "{}", t.message);
854973
}

media/src/io/sample_builder/sample_sequence_location.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,26 @@ pub(crate) enum Comparison {
88
After,
99
}
1010

11+
pub(crate) struct Iterator<'a, T> {
12+
data: &'a [Option<T>],
13+
sample: SampleSequenceLocation,
14+
i: u16,
15+
}
16+
17+
impl<'a, T> std::iter::Iterator for Iterator<'a, T> {
18+
type Item = Option<&'a T>;
19+
20+
fn next(&mut self) -> Option<Self::Item> {
21+
if self.sample.compare(self.i) == Comparison::Inside {
22+
let old_i = self.i as usize;
23+
self.i = self.i.wrapping_add(1);
24+
return Some(self.data[old_i].as_ref());
25+
}
26+
27+
None
28+
}
29+
}
30+
1131
#[derive(Clone, Copy)]
1232
pub(crate) struct SampleSequenceLocation {
1333
/// head is the first packet in a sequence
@@ -50,4 +70,12 @@ impl SampleSequenceLocation {
5070
}
5171
Comparison::After
5272
}
73+
74+
pub(crate) fn range<'s, 'a, T>(&'s self, data: &'a [Option<T>]) -> Iterator<'a, T> {
75+
Iterator {
76+
data,
77+
sample: *self,
78+
i: self.head,
79+
}
80+
}
5381
}

media/src/io/sample_builder/sample_sequence_location_test.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,24 @@ fn test_sample_sequence_location_compare() {
2222
assert_eq!(Comparison::After, s2.compare(32));
2323
assert_eq!(Comparison::After, s2.compare(128));
2424
}
25+
26+
#[test]
27+
fn test_sample_sequence_location_range() {
28+
let mut data: Vec<Option<u16>> = vec![None; u16::MAX as usize + 1];
29+
30+
data[65533] = Some(65533);
31+
data[65535] = Some(65535);
32+
data[0] = Some(0);
33+
data[2] = Some(2);
34+
35+
let s = SampleSequenceLocation {
36+
head: 65533,
37+
tail: 3,
38+
};
39+
let reconstructed: Vec<_> = s.range(&data).map(|x| x.cloned()).collect();
40+
41+
assert_eq!(
42+
reconstructed,
43+
[Some(65533), None, Some(65535), Some(0), None, Some(2)]
44+
);
45+
}

media/src/lib.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,58 @@ use std::time::{Duration, SystemTime};
1515
/// A Sample contains encoded media and timing information
1616
#[derive(Debug)]
1717
pub struct Sample {
18+
/// The assembled data in the sample, as a bitstream.
19+
///
20+
/// The format is Codec dependant, but is always a bitstream format
21+
/// rather than the packetized format used when carried over RTP.
22+
///
23+
/// See: [`rtp::packetizer::Depacketizer`] and implementations of it for more details.
1824
pub data: Bytes,
25+
26+
/// The wallclock time when this sample was generated.
1927
pub timestamp: SystemTime,
28+
29+
/// The duration of this sample
2030
pub duration: Duration,
31+
32+
/// The RTP packet timestamp of this sample.
33+
///
34+
/// For all RTP packets that contributed to a single sample the timestamp is the same.
2135
pub packet_timestamp: u32,
36+
37+
/// The number of packets that were dropped prior to building this sample.
38+
///
39+
/// Packets being dropped doesn't necessarily indicate something wrong, e.g., packets are sometimes
40+
/// dropped because they aren't relevant for sample building.
2241
pub prev_dropped_packets: u16,
42+
43+
/// The number of packets that were identified as padding prior to building this sample.
44+
///
45+
/// Some implementations, notably libWebRTC, send padding packets to keep the send rate steady.
46+
/// These packets don't carry media and aren't useful for building samples.
47+
///
48+
/// This field can be combined with [`Sample::prev_dropped_packets`] to determine if any
49+
/// dropped packets are likely to have detrimental impact on the steadiness of the RTP stream.
50+
///
51+
/// ## Example adjustment
52+
///
53+
/// ```rust
54+
/// # use bytes::Bytes;
55+
/// # use std::time::{SystemTime, Duration};
56+
/// # use webrtc_media::Sample;
57+
/// # let sample = Sample {
58+
/// # data: Bytes::new(),
59+
/// # timestamp: SystemTime::now(),
60+
/// # duration: Duration::from_secs(0),
61+
/// # packet_timestamp: 0,
62+
/// # prev_dropped_packets: 10,
63+
/// # prev_padding_packets: 15
64+
/// # };
65+
/// #
66+
/// let adjusted_dropped =
67+
/// sample.prev_dropped_packets.saturating_sub(sample.prev_padding_packets);
68+
/// ```
69+
pub prev_padding_packets: u16,
2370
}
2471

2572
impl Default for Sample {
@@ -30,6 +77,7 @@ impl Default for Sample {
3077
duration: Duration::from_secs(0),
3178
packet_timestamp: 0,
3279
prev_dropped_packets: 0,
80+
prev_padding_packets: 0,
3381
}
3482
}
3583
}
@@ -54,6 +102,10 @@ impl PartialEq for Sample {
54102
if self.prev_dropped_packets != other.prev_dropped_packets {
55103
equal = false;
56104
}
105+
if self.prev_padding_packets != other.prev_padding_packets {
106+
equal = false;
107+
}
108+
57109
equal
58110
}
59111
}

0 commit comments

Comments
 (0)