Skip to content

Commit 461ef34

Browse files
authored
Merge pull request #421 from clia/master
Add alloc_close_notify config param.
2 parents 9afdf0a + af18f54 commit 461ef34

File tree

16 files changed

+102
-4
lines changed

16 files changed

+102
-4
lines changed

ice/src/agent/agent_vnet_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ pub(crate) async fn add_vnet_stun(wan_net: Arc<net::Net>) -> Result<turn::server
237237
realm: "webrtc.rs".to_owned(),
238238
auth_handler: Arc::new(TestAuthHandler::new()),
239239
channel_bind_timeout: Duration::from_secs(0),
240+
alloc_close_notify: None,
240241
})
241242
.await?;
242243

ice/src/candidate/candidate_relay_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ async fn test_relay_only_connection() -> Result<(), Error> {
6161
}),
6262
}],
6363
channel_bind_timeout: Duration::from_secs(0),
64+
alloc_close_notify: None,
6465
})
6566
.await?;
6667

ice/src/candidate/candidate_server_reflexive_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ async fn test_server_reflexive_only_connection() -> Result<()> {
4040
}),
4141
}],
4242
channel_bind_timeout: Duration::from_secs(0),
43+
alloc_close_notify: None,
4344
})
4445
.await?;
4546

turn/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
* [#330 Fix the problem that the UDP port of the server relay is not released](https://github.com/webrtc-rs/webrtc/pull/330) by [@clia](https://github.com/clia).
6+
* Added `alloc_close_notify` config parameter to `ServerConfig` and `Allocation`, to receive notify on allocation close event, with metrics data.
67

78
## v0.6.1
89

turn/examples/turn_server_udp.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ async fn main() -> Result<(), Error> {
125125
realm: realm.to_owned(),
126126
auth_handler: Arc::new(MyAuthHandler::new(cred_map)),
127127
channel_bind_timeout: Duration::from_secs(0),
128+
alloc_close_notify: None,
128129
})
129130
.await?;
130131

turn/src/allocation/allocation_manager.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,21 @@ use crate::relay::*;
88
use futures::future;
99
use std::collections::HashMap;
1010
use stun::textattrs::Username;
11+
use tokio::sync::mpsc;
1112
use util::Conn;
1213

1314
// ManagerConfig a bag of config params for Manager.
1415
pub struct ManagerConfig {
1516
pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
17+
pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
1618
}
1719

1820
// Manager is used to hold active allocations
1921
pub struct Manager {
2022
allocations: AllocationMap,
2123
reservations: Arc<Mutex<HashMap<String, u16>>>,
2224
relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
25+
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
2326
}
2427

2528
impl Manager {
@@ -29,6 +32,7 @@ impl Manager {
2932
allocations: Arc::new(Mutex::new(HashMap::new())),
3033
reservations: Arc::new(Mutex::new(HashMap::new())),
3134
relay_addr_generator: config.relay_addr_generator,
35+
alloc_close_notify: config.alloc_close_notify,
3236
}
3337
}
3438

@@ -95,7 +99,14 @@ impl Manager {
9599
.relay_addr_generator
96100
.allocate_conn(true, requested_port)
97101
.await?;
98-
let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple, username);
102+
let mut a = Allocation::new(
103+
turn_socket,
104+
relay_socket,
105+
relay_addr,
106+
five_tuple,
107+
username,
108+
self.alloc_close_notify.clone(),
109+
);
99110
a.allocations = Some(Arc::clone(&self.allocations));
100111

101112
log::debug!("listening on relay addr: {:?}", a.relay_addr);

turn/src/allocation/allocation_manager/allocation_manager_test.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::{
1818
};
1919
use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
2020
use tokio::net::UdpSocket;
21+
use tokio::sync::mpsc::Sender;
2122
use util::vnet::net::*;
2223

2324
fn new_test_manager() -> Manager {
@@ -26,6 +27,7 @@ fn new_test_manager() -> Manager {
2627
address: "0.0.0.0".to_owned(),
2728
net: Arc::new(Net::new(None)),
2829
}),
30+
alloc_close_notify: None,
2931
};
3032
Manager::new(config)
3133
}
@@ -395,7 +397,9 @@ impl AuthHandler for TestAuthHandler {
395397
}
396398
}
397399

398-
async fn create_server() -> Result<(Server, u16)> {
400+
async fn create_server(
401+
alloc_close_notify: Option<Sender<AllocationInfo>>,
402+
) -> Result<(Server, u16)> {
399403
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
400404
let server_port = conn.local_addr()?.port();
401405

@@ -411,6 +415,7 @@ async fn create_server() -> Result<(Server, u16)> {
411415
realm: "webrtc.rs".to_owned(),
412416
auth_handler: Arc::new(TestAuthHandler {}),
413417
channel_bind_timeout: Duration::from_secs(0),
418+
alloc_close_notify,
414419
})
415420
.await?;
416421

@@ -437,7 +442,7 @@ async fn create_client(username: String, server_port: u16) -> Result<Client> {
437442
#[cfg(feature = "metrics")]
438443
#[tokio::test]
439444
async fn test_get_allocations_info() -> Result<()> {
440-
let (server, server_port) = create_server().await?;
445+
let (server, server_port) = create_server(None).await?;
441446

442447
let client1 = create_client("user1".to_owned(), server_port).await?;
443448
client1.listen().await?;
@@ -489,7 +494,7 @@ async fn test_get_allocations_info() -> Result<()> {
489494
#[cfg(feature = "metrics")]
490495
#[tokio::test]
491496
async fn test_get_allocations_info_bytes_count() -> Result<()> {
492-
let (server, server_port) = create_server().await?;
497+
let (server, server_port) = create_server(None).await?;
493498

494499
let client = create_client("foo".to_owned(), server_port).await?;
495500

@@ -558,3 +563,45 @@ async fn test_get_allocations_info_bytes_count() -> Result<()> {
558563

559564
Ok(())
560565
}
566+
567+
#[cfg(feature = "metrics")]
568+
#[tokio::test]
569+
async fn test_alloc_close_notify() -> Result<()> {
570+
let (tx, mut rx) = mpsc::channel::<AllocationInfo>(1);
571+
572+
tokio::spawn(async move {
573+
if let Some(alloc) = rx.recv().await {
574+
assert_eq!(alloc.relayed_bytes, 50);
575+
}
576+
});
577+
578+
let (server, server_port) = create_server(Some(tx)).await?;
579+
580+
let client = create_client("foo".to_owned(), server_port).await?;
581+
582+
client.listen().await?;
583+
584+
assert!(server.get_allocations_info(None).await?.is_empty());
585+
586+
let conn = client.allocate().await?;
587+
let addr = client
588+
.send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
589+
.await?;
590+
591+
assert!(!server.get_allocations_info(None).await?.is_empty());
592+
593+
for _ in 0..10 {
594+
conn.send_to(b"Hello", addr).await?;
595+
596+
tokio::time::sleep(Duration::from_millis(100)).await;
597+
}
598+
599+
tokio::time::sleep(Duration::from_millis(1000)).await;
600+
601+
client.close().await?;
602+
server.close().await?;
603+
604+
tokio::time::sleep(Duration::from_millis(1000)).await;
605+
606+
Ok(())
607+
}

turn/src/allocation/allocation_test.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async fn test_has_permission() -> Result<()> {
1616
relay_addr,
1717
FiveTuple::default(),
1818
TextAttribute::new(ATTR_USERNAME, "user".into()),
19+
None,
1920
);
2021

2122
let addr1 = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -53,6 +54,7 @@ async fn test_add_permission() -> Result<()> {
5354
relay_addr,
5455
FiveTuple::default(),
5556
TextAttribute::new(ATTR_USERNAME, "user".into()),
57+
None,
5658
);
5759

5860
let addr = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -76,6 +78,7 @@ async fn test_remove_permission() -> Result<()> {
7678
relay_addr,
7779
FiveTuple::default(),
7880
TextAttribute::new(ATTR_USERNAME, "user".into()),
81+
None,
7982
);
8083

8184
let addr = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -108,6 +111,7 @@ async fn test_add_channel_bind() -> Result<()> {
108111
relay_addr,
109112
FiveTuple::default(),
110113
TextAttribute::new(ATTR_USERNAME, "user".into()),
114+
None,
111115
);
112116

113117
let addr = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -141,6 +145,7 @@ async fn test_get_channel_by_number() -> Result<()> {
141145
relay_addr,
142146
FiveTuple::default(),
143147
TextAttribute::new(ATTR_USERNAME, "user".into()),
148+
None,
144149
);
145150

146151
let addr = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -176,6 +181,7 @@ async fn test_get_channel_by_addr() -> Result<()> {
176181
relay_addr,
177182
FiveTuple::default(),
178183
TextAttribute::new(ATTR_USERNAME, "user".into()),
184+
None,
179185
);
180186

181187
let addr = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -207,6 +213,7 @@ async fn test_remove_channel_bind() -> Result<()> {
207213
relay_addr,
208214
FiveTuple::default(),
209215
TextAttribute::new(ATTR_USERNAME, "user".into()),
216+
None,
210217
);
211218

212219
let addr = SocketAddr::from_str("127.0.0.1:3478")?;
@@ -243,6 +250,7 @@ async fn test_allocation_refresh() -> Result<()> {
243250
relay_addr,
244251
FiveTuple::default(),
245252
TextAttribute::new(ATTR_USERNAME, "user".into()),
253+
None,
246254
);
247255

248256
a.start(DEFAULT_LIFETIME).await;
@@ -264,6 +272,7 @@ async fn test_allocation_close() -> Result<()> {
264272
relay_addr,
265273
FiveTuple::default(),
266274
TextAttribute::new(ATTR_USERNAME, "user".into()),
275+
None,
267276
);
268277

269278
// add mock lifetimeTimer

turn/src/allocation/channel_bind/channel_bind_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ async fn create_channel_bind(lifetime: Duration) -> Result<Allocation> {
1717
relay_addr,
1818
FiveTuple::default(),
1919
TextAttribute::new(ATTR_USERNAME, "user".into()),
20+
None,
2021
);
2122

2223
let addr = SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0);

turn/src/allocation/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub struct Allocation {
8383
closed: AtomicBool, // Option<mpsc::Receiver<()>>,
8484
pub(crate) relayed_bytes: AtomicUsize,
8585
drop_tx: Option<Sender<u32>>,
86+
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
8687
}
8788

8889
fn addr2ipfingerprint(addr: &SocketAddr) -> String {
@@ -97,6 +98,7 @@ impl Allocation {
9798
relay_addr: SocketAddr,
9899
five_tuple: FiveTuple,
99100
username: Username,
101+
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
100102
) -> Self {
101103
Allocation {
102104
protocol: PROTO_UDP,
@@ -113,6 +115,7 @@ impl Allocation {
113115
closed: AtomicBool::new(false),
114116
relayed_bytes: Default::default(),
115117
drop_tx: None,
118+
alloc_close_notify,
116119
}
117120
}
118121

@@ -246,6 +249,17 @@ impl Allocation {
246249
let _ = self.turn_socket.close().await;
247250
let _ = self.relay_socket.close().await;
248251

252+
if let Some(notify_tx) = &self.alloc_close_notify {
253+
let _ = notify_tx
254+
.send(AllocationInfo {
255+
five_tuple: self.five_tuple,
256+
username: self.username.text.clone(),
257+
#[cfg(feature = "metrics")]
258+
relayed_bytes: self.relayed_bytes.load(Ordering::Acquire),
259+
})
260+
.await;
261+
}
262+
249263
Ok(())
250264
}
251265

0 commit comments

Comments
 (0)