Skip to content

Commit 203529d

Browse files
Support providing RTP extensions in local tracks (#336)
1 parent de6e781 commit 203529d

File tree

6 files changed

+283
-40
lines changed

6 files changed

+283
-40
lines changed

rtp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
* Increased minimum support rust version to `1.60.0`.
6+
* Adds a new generic header extensions type `rtp::extension::HeaderExtension` which allows abstracting over all known extensions as well as custom extensions. [#336](https://github.com/webrtc-rs/webrtc/pull/336) by [@k0nserv](https://github.com/k0nserv).
67

78
## v0.6.7
89

rtp/src/extension/mod.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,91 @@
1+
use std::borrow::Cow;
2+
use std::fmt;
3+
4+
use util::{Marshal, MarshalSize};
5+
16
pub mod abs_send_time_extension;
27
pub mod audio_level_extension;
38
pub mod transport_cc_extension;
49
pub mod video_orientation_extension;
10+
11+
/// A generic RTP header extension.
12+
pub enum HeaderExtension {
13+
AbsSendTime(abs_send_time_extension::AbsSendTimeExtension),
14+
AudioLevel(audio_level_extension::AudioLevelExtension),
15+
TransportCc(transport_cc_extension::TransportCcExtension),
16+
VideoOrientation(video_orientation_extension::VideoOrientationExtension),
17+
18+
/// A custom extension
19+
Custom {
20+
uri: Cow<'static, str>,
21+
extension: Box<dyn Marshal + Send + Sync + 'static>,
22+
},
23+
}
24+
25+
impl HeaderExtension {
26+
pub fn uri(&self) -> Cow<'static, str> {
27+
use HeaderExtension::*;
28+
29+
match self {
30+
AbsSendTime(_) => "http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time".into(),
31+
AudioLevel(_) => "urn:ietf:params:rtp-hdrext:ssrc-audio-level".into(),
32+
TransportCc(_) => {
33+
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01".into()
34+
}
35+
VideoOrientation(_) => "urn:3gpp:video-orientation".into(),
36+
Custom { uri, .. } => uri.clone(),
37+
}
38+
}
39+
40+
pub fn is_same(&self, other: &Self) -> bool {
41+
use HeaderExtension::*;
42+
match (self, other) {
43+
(AbsSendTime(_), AbsSendTime(_)) => true,
44+
(AudioLevel(_), AudioLevel(_)) => true,
45+
(TransportCc(_), TransportCc(_)) => true,
46+
(VideoOrientation(_), VideoOrientation(_)) => true,
47+
(Custom { uri, .. }, Custom { uri: other_uri, .. }) => uri == other_uri,
48+
_ => false,
49+
}
50+
}
51+
}
52+
53+
impl MarshalSize for HeaderExtension {
54+
fn marshal_size(&self) -> usize {
55+
use HeaderExtension::*;
56+
match self {
57+
AbsSendTime(ext) => ext.marshal_size(),
58+
AudioLevel(ext) => ext.marshal_size(),
59+
TransportCc(ext) => ext.marshal_size(),
60+
VideoOrientation(ext) => ext.marshal_size(),
61+
Custom { extension: ext, .. } => ext.marshal_size(),
62+
}
63+
}
64+
}
65+
66+
impl Marshal for HeaderExtension {
67+
fn marshal_to(&self, buf: &mut [u8]) -> util::Result<usize> {
68+
use HeaderExtension::*;
69+
match self {
70+
AbsSendTime(ext) => ext.marshal_to(buf),
71+
AudioLevel(ext) => ext.marshal_to(buf),
72+
TransportCc(ext) => ext.marshal_to(buf),
73+
VideoOrientation(ext) => ext.marshal_to(buf),
74+
Custom { extension: ext, .. } => ext.marshal_to(buf),
75+
}
76+
}
77+
}
78+
79+
impl fmt::Debug for HeaderExtension {
80+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81+
use HeaderExtension::*;
82+
83+
match self {
84+
AbsSendTime(ext) => f.debug_tuple("AbsSendTime").field(ext).finish(),
85+
AudioLevel(ext) => f.debug_tuple("AudioLevel").field(ext).finish(),
86+
TransportCc(ext) => f.debug_tuple("TransportCc").field(ext).finish(),
87+
VideoOrientation(ext) => f.debug_tuple("VideoOrientation").field(ext).finish(),
88+
Custom { uri, extension: _ } => f.debug_struct("Custom").field("uri", uri).finish(),
89+
}
90+
}
91+
}

webrtc/CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# webrtc-rc changelog
1+
# webrtc-rs changelog
22

33
## Unreleased
44

@@ -11,14 +11,16 @@ directions that should not send. [#316](https://github.com/webrtc-rs/webrtc/pull
1111
* Fixed a panic that would sometimes happen when collecting stats. [#327](https://github.com/webrtc-rs/webrtc/pull/327) by [@k0nserv](https://github.com/k0nserv).
1212
* Added new extension marshaller/unmarshaller for VideoOrientation, and made marshallers serializable via serde [#331](https://github.com/webrtc-rs/webrtc/pull/331) [#332](https://github.com/webrtc-rs/webrtc/pull/332)
1313
* Updated minimum rust version to `1.60.0`
14-
14+
* Added a new `write_rtp_with_extensions` method to `TrackLocalStaticSample` and `TrackLocalStatiRTP`. [#336](https://github.com/webrtc-rs/webrtc/pull/336) by [@k0nserv](https://github.com/k0nserv).
15+
* Added a new `sample_writer` helper to `TrackLocalStaticSample`. [#336](https://github.com/webrtc-rs/webrtc/pull/336) by [@k0nserv](https://github.com/k0nserv).
1516

1617
#### Breaking changes
1718

1819
* Allow one single direction for extmap matching. [#321](https://github.com/webrtc-rs/webrtc/pull/321). API
1920
change for MediaEngine::register_header_extension
2021
* Removes support for Plan-B. All major implementations of WebRTC now support unified and continuing support for plan-b is an undue maintenance burden when unified can be used. See [“Unified Plan” Transition Guide (JavaScript)](https://docs.google.com/document/d/1-ZfikoUtoJa9k-GZG1daN0BU3IjIanQ_JSscHxQesvU/) for an overview of the changes required to migrate. [#320](https://github.com/webrtc-rs/webrtc/pull/320) by [@algesten](https://github.com/algesten).
2122

23+
2224
## 0.5.1
2325

2426
* Promote agent lock in ice_gather.rs create_agent() to top level of the function to avoid a race condition. [#290 Promote create_agent lock to top of function, to avoid race condition](https://github.com/webrtc-rs/webrtc/pull/290) contributed by [efer-ms](https://github.com/efer-ms)

webrtc/src/track/track_local/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub(crate) struct TrackBinding {
104104
id: String,
105105
ssrc: SSRC,
106106
payload_type: PayloadType,
107+
params: RTCRtpParameters,
107108
write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
108109
sender_paused: Arc<AtomicBool>,
109110
}

webrtc/src/track/track_local/track_local_static_rtp.rs

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
use std::collections::HashMap;
2+
13
use super::*;
24

35
use crate::error::flatten_errs;
6+
use bytes::BytesMut;
47
use tokio::sync::Mutex;
8+
use util::{Marshal, MarshalSize};
59

610
/// TrackLocalStaticRTP is a TrackLocal that has a pre-set codec and accepts RTP Packets.
711
/// If you wish to send a media.Sample use TrackLocalStaticSample
@@ -42,6 +46,91 @@ impl TrackLocalStaticRTP {
4246
.iter()
4347
.all(|b| b.sender_paused.load(Ordering::SeqCst))
4448
}
49+
50+
/// write_rtp_with_extensions writes a RTP Packet to the TrackLocalStaticRTP
51+
/// If one PeerConnection fails the packets will still be sent to
52+
/// all PeerConnections. The error message will contain the ID of the failed
53+
/// PeerConnections so you can remove them
54+
///
55+
/// If the RTCRtpSender direction is such that no packets should be sent, any call to this
56+
/// function are blocked internally. Care must be taken to not increase the sequence number
57+
/// while the sender is paused. While the actual _sending_ is blocked, the receiver will
58+
/// miss out when the sequence number "rolls over", which in turn will break SRTP.
59+
///
60+
/// Extensions that are already configured on the packet are overwritten by extensions in
61+
/// `extensions`.
62+
pub async fn write_rtp_with_extensions(
63+
&self,
64+
p: &rtp::packet::Packet,
65+
extensions: &[rtp::extension::HeaderExtension],
66+
) -> Result<usize> {
67+
let mut n = 0;
68+
let mut write_errs = vec![];
69+
let mut pkt = p.clone();
70+
71+
let bindings = {
72+
let bindings = self.bindings.lock().await;
73+
bindings.clone()
74+
};
75+
// Prepare the extensions data
76+
let extension_data: HashMap<_, _> = extensions
77+
.iter()
78+
.flat_map(|extension| {
79+
let buf = {
80+
let mut buf = BytesMut::with_capacity(extension.marshal_size());
81+
buf.resize(extension.marshal_size(), 0);
82+
if let Err(err) = extension.marshal_to(&mut buf) {
83+
write_errs.push(Error::Util(err));
84+
return None;
85+
}
86+
87+
buf.freeze()
88+
};
89+
90+
Some((extension.uri(), buf))
91+
})
92+
.collect();
93+
94+
for b in bindings.into_iter() {
95+
if b.is_sender_paused() {
96+
// See caveat in function doc.
97+
continue;
98+
}
99+
pkt.header.ssrc = b.ssrc;
100+
pkt.header.payload_type = b.payload_type;
101+
102+
for (uri, data) in extension_data.iter() {
103+
if let Some(id) = b
104+
.params
105+
.header_extensions
106+
.iter()
107+
.find(|ext| &ext.uri == uri)
108+
.map(|ext| ext.id)
109+
{
110+
if let Err(err) = pkt.header.set_extension(id as u8, data.clone()) {
111+
write_errs.push(Error::Rtp(err));
112+
continue;
113+
}
114+
}
115+
}
116+
117+
if let Some(write_stream) = &b.write_stream {
118+
match write_stream.write_rtp(&pkt).await {
119+
Ok(m) => {
120+
n += m;
121+
}
122+
Err(err) => {
123+
write_errs.push(err);
124+
}
125+
}
126+
} else {
127+
write_errs.push(Error::new("track binding has none write_stream".to_owned()));
128+
}
129+
}
130+
131+
flatten_errs(write_errs)?;
132+
Ok(n)
133+
}
45134
}
46135

47136
#[async_trait]
@@ -63,6 +152,7 @@ impl TrackLocal for TrackLocalStaticRTP {
63152
ssrc: t.ssrc(),
64153
payload_type: codec.payload_type,
65154
write_stream: t.write_stream(),
155+
params: t.params.clone(),
66156
id: t.id(),
67157
sender_paused: t.paused.clone(),
68158
}));
@@ -133,37 +223,7 @@ impl TrackLocalWriter for TrackLocalStaticRTP {
133223
/// while the sender is paused. While the actual _sending_ is blocked, the receiver will
134224
/// miss out when the sequence number "rolls over", which in turn will break SRTP.
135225
async fn write_rtp(&self, p: &rtp::packet::Packet) -> Result<usize> {
136-
let mut n = 0;
137-
let mut write_errs = vec![];
138-
let mut pkt = p.clone();
139-
140-
let bindings = {
141-
let bindings = self.bindings.lock().await;
142-
bindings.clone()
143-
};
144-
for b in bindings {
145-
if b.is_sender_paused() {
146-
// See caveat in function doc.
147-
continue;
148-
}
149-
pkt.header.ssrc = b.ssrc;
150-
pkt.header.payload_type = b.payload_type;
151-
if let Some(write_stream) = &b.write_stream {
152-
match write_stream.write_rtp(&pkt).await {
153-
Ok(m) => {
154-
n += m;
155-
}
156-
Err(err) => {
157-
write_errs.push(err);
158-
}
159-
}
160-
} else {
161-
write_errs.push(Error::new("track binding has none write_stream".to_owned()));
162-
}
163-
}
164-
165-
flatten_errs(write_errs)?;
166-
Ok(n)
226+
self.write_rtp_with_extensions(p, &[]).await
167227
}
168228

169229
/// write writes a RTP Packet as a buffer to the TrackLocalStaticRTP

0 commit comments

Comments
 (0)