Skip to content

Commit bc5c52e

Browse files
author
yngrtc
committed
[RTCP] remove unnecessary async function/trait
1 parent ad555d1 commit bc5c52e

File tree

6 files changed

+49
-65
lines changed

6 files changed

+49
-65
lines changed

interceptor/src/nack/responder/mod.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,10 @@ impl ResponderInternal {
6969
};
7070

7171
for n in &nack.nacks {
72+
// can't use n.range() since this callback is async fn,
73+
// instead, use NackPair into_iter()
7274
let stream2 = Arc::clone(&stream);
73-
n.range(Box::new(
75+
let f = Box::new(
7476
move |seq: u16| -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
7577
let stream3 = Arc::clone(&stream2);
7678
Box::pin(async move {
@@ -80,12 +82,15 @@ impl ResponderInternal {
8082
log::warn!("failed resending nacked packet: {}", err);
8183
}
8284
}
83-
8485
true
8586
})
8687
},
87-
))
88-
.await;
88+
);
89+
for packet_id in n.into_iter() {
90+
if !f(packet_id).await {
91+
return;
92+
}
93+
}
8994
}
9095
}
9196
}

rtcp/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,3 @@ util = { version = "0.7.0", path = "../util", package = "webrtc-util", default-f
1515

1616
bytes = "1"
1717
thiserror = "1.0"
18-
19-
[dev-dependencies]
20-
tokio = { version = "1.19", features = ["sync"] }
21-
tokio-test = "0.4.0" # must match the min version of the `tokio` crate above

rtcp/src/transport_feedbacks/transport_layer_nack/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ use util::marshal::{Marshal, MarshalSize, Unmarshal};
77
use bytes::{Buf, BufMut};
88
use std::any::Any;
99
use std::fmt;
10-
use std::future::Future;
11-
use std::pin::Pin;
1210

1311
/// PacketBitmap shouldn't be used like a normal integral,
1412
/// so it's type is masked here. Access it with PacketList().
@@ -24,9 +22,6 @@ pub struct NackPair {
2422
pub lost_packets: PacketBitmap,
2523
}
2624

27-
pub type RangeFn =
28-
Box<dyn (Fn(u16) -> Pin<Box<dyn Future<Output = bool> + Send + 'static>>) + Send + Sync>;
29-
3025
pub struct NackIterator {
3126
packet_id: u16,
3227
bitfield: PacketBitmap,
@@ -72,9 +67,12 @@ impl NackPair {
7267
self.into_iter().collect()
7368
}
7469

75-
pub async fn range(&self, f: RangeFn) {
70+
pub fn range<F>(&self, f: F)
71+
where
72+
F: Fn(u16) -> bool,
73+
{
7674
for packet_id in self.into_iter() {
77-
if !f(packet_id).await {
75+
if !f(packet_id) {
7876
return;
7977
}
8078
}

rtcp/src/transport_feedbacks/transport_layer_nack/transport_layer_nack_test.rs

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use super::*;
22
use bytes::Bytes;
3-
use std::sync::Arc;
4-
use tokio::sync::Mutex;
3+
use std::sync::{Arc, Mutex};
54

65
#[test]
76
fn test_transport_layer_nack_unmarshal() {
@@ -200,48 +199,38 @@ fn test_nack_pair() {
200199
);
201200
}
202201

203-
#[tokio::test]
204-
async fn test_nack_pair_range() {
202+
#[test]
203+
fn test_nack_pair_range() {
205204
let n = NackPair {
206205
packet_id: 42,
207206
lost_packets: 2,
208207
};
209208

210209
let out = Arc::new(Mutex::new(vec![]));
211210
let out1 = Arc::clone(&out);
212-
n.range(Box::new(
213-
move |s: u16| -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
214-
let out2 = Arc::clone(&out1);
215-
Box::pin(async move {
216-
let mut o = out2.lock().await;
217-
o.push(s);
218-
true
219-
})
220-
},
221-
))
222-
.await;
211+
n.range(move |s: u16| -> bool {
212+
let out2 = Arc::clone(&out1);
213+
let mut o = out2.lock().unwrap();
214+
o.push(s);
215+
true
216+
});
223217

224218
{
225-
let o = out.lock().await;
219+
let o = out.lock().unwrap();
226220
assert_eq!(*o, &[42, 44]);
227221
}
228222

229223
let out = Arc::new(Mutex::new(vec![]));
230224
let out1 = Arc::clone(&out);
231-
n.range(Box::new(
232-
move |s: u16| -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
233-
let out2 = Arc::clone(&out1);
234-
Box::pin(async move {
235-
let mut o = out2.lock().await;
236-
o.push(s);
237-
false
238-
})
239-
},
240-
))
241-
.await;
225+
n.range(move |s: u16| -> bool {
226+
let out2 = Arc::clone(&out1);
227+
let mut o = out2.lock().unwrap();
228+
o.push(s);
229+
false
230+
});
242231

243232
{
244-
let o = out.lock().await;
233+
let o = out.lock().unwrap();
245234
assert_eq!(*o, &[42]);
246235
}
247236
}

rtp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/webrtc-rs/rtp"
1111
rust-version = "1.63.0"
1212

1313
[dependencies]
14-
util = { version = "0.7.0", path = "../util", package = "webrtc-util", default-features = false, features = ["marshal", "sync"] }
14+
util = { version = "0.7.0", path = "../util", package = "webrtc-util", default-features = false, features = ["marshal"] }
1515

1616
bytes = "1"
1717
rand = "0.8.5"

rtp/src/sequence.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use std::fmt;
2+
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
23
use std::sync::Arc;
34

4-
use util::sync::Mutex;
5-
65
/// Sequencer generates sequential sequence numbers for building RTP packets
76
pub trait Sequencer: fmt::Debug {
87
fn next_sequence_number(&self) -> u16;
@@ -20,10 +19,10 @@ impl Clone for Box<dyn Sequencer + Send + Sync> {
2019
/// number
2120
pub fn new_random_sequencer() -> impl Sequencer {
2221
let c = Counters {
23-
sequence_number: rand::random::<u16>(),
24-
roll_over_count: 0,
22+
sequence_number: Arc::new(AtomicU16::new(rand::random::<u16>())),
23+
roll_over_count: Arc::new(AtomicU64::new(0)),
2524
};
26-
SequencerImpl(Arc::new(Mutex::new(c)))
25+
SequencerImpl(c)
2726
}
2827

2928
/// NewFixedSequencer returns a new sequencer starting from a specific
@@ -32,42 +31,39 @@ pub fn new_fixed_sequencer(s: u16) -> impl Sequencer {
3231
let sequence_number = if s == 0 { u16::MAX } else { s - 1 };
3332

3433
let c = Counters {
35-
sequence_number,
36-
roll_over_count: 0,
34+
sequence_number: Arc::new(AtomicU16::new(sequence_number)),
35+
roll_over_count: Arc::new(AtomicU64::new(0)),
3736
};
3837

39-
SequencerImpl(Arc::new(Mutex::new(c)))
38+
SequencerImpl(c)
4039
}
4140

4241
#[derive(Debug, Clone)]
43-
struct SequencerImpl(Arc<Mutex<Counters>>);
42+
struct SequencerImpl(Counters);
4443

45-
#[derive(Debug)]
44+
#[derive(Debug, Clone)]
4645
struct Counters {
47-
sequence_number: u16,
48-
roll_over_count: u64,
46+
sequence_number: Arc<AtomicU16>,
47+
roll_over_count: Arc<AtomicU64>,
4948
}
5049

5150
impl Sequencer for SequencerImpl {
5251
/// NextSequenceNumber increment and returns a new sequence number for
5352
/// building RTP packets
5453
fn next_sequence_number(&self) -> u16 {
55-
let mut lock = self.0.lock();
56-
57-
if lock.sequence_number == u16::MAX {
58-
lock.roll_over_count += 1;
59-
lock.sequence_number = 0;
54+
if self.0.sequence_number.load(Ordering::SeqCst) == u16::MAX {
55+
self.0.roll_over_count.fetch_add(1, Ordering::SeqCst);
56+
self.0.sequence_number.store(0, Ordering::SeqCst);
57+
0
6058
} else {
61-
lock.sequence_number += 1;
59+
self.0.sequence_number.fetch_add(1, Ordering::SeqCst) + 1
6260
}
63-
64-
lock.sequence_number
6561
}
6662

6763
/// RollOverCount returns the amount of times the 16bit sequence number
6864
/// has wrapped
6965
fn roll_over_count(&self) -> u64 {
70-
self.0.lock().roll_over_count
66+
self.0.roll_over_count.load(Ordering::SeqCst)
7167
}
7268

7369
fn clone_to(&self) -> Box<dyn Sequencer + Send + Sync> {

0 commit comments

Comments
 (0)