Skip to content

Commit c4e6ecf

Browse files
authored
Add method to delete allocation by username (#263)
1 parent 6d1cc95 commit c4e6ecf

File tree

16 files changed

+354
-154
lines changed

16 files changed

+354
-154
lines changed

turn/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## Unreleased
44

5+
* Added `delete_allocations_by_username` method on `Server`. This method provides possibility to manually delete allocation [#263](https://github.com/webrtc-rs/webrtc/pull/263) by [@logist322](https://github.com/logist322).
6+
7+
58
## v0.6.0
69

710
* [#15 update deps + loosen some requirements](https://github.com/webrtc-rs/turn/pull/15) by [@melekes](https://github.com/melekes).

turn/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ repository = "https://github.com/webrtc-rs/turn"
1212
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1313

1414
[dependencies]
15+
futures = "0.3.21"
1516
util = { version = "0.6.0", path = "../util", package = "webrtc-util", default-features = false, features = ["conn", "vnet"] }
1617
stun = { version = "0.4.3", path = "../stun" }
1718

turn/examples/turn_client_udp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::net::UdpSocket;
77
use tokio::time::Duration;
88
use util::Conn;
99

10-
// RUST_LOG=trace cargo run --color=always --package webrtc-turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping
10+
// RUST_LOG=trace cargo run --color=always --package turn --example turn_client_udp -- --host 0.0.0.0 --user user=pass --ping
1111

1212
#[tokio::main]
1313
async fn main() -> Result<(), Error> {

turn/examples/turn_server_udp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl AuthHandler for MyAuthHandler {
3939
}
4040
}
4141

42-
// RUST_LOG=trace cargo run --color=always --package webrtc-turn --example turn_server_udp -- --public-ip 0.0.0.0 --users user=pass
42+
// RUST_LOG=trace cargo run --color=always --package turn --example turn_server_udp -- --public-ip 0.0.0.0 --users user=pass
4343

4444
#[tokio::main]
4545
async fn main() -> Result<(), Error> {

turn/src/allocation/allocation_manager.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use super::*;
55
use crate::error::*;
66
use crate::relay::*;
77

8+
use futures::future;
89
use std::collections::HashMap;
10+
use stun::textattrs::Username;
911
use util::Conn;
1012

1113
// ManagerConfig a bag of config params for Manager.
@@ -34,16 +36,15 @@ impl Manager {
3436
pub async fn close(&self) -> Result<()> {
3537
let allocations = self.allocations.lock().await;
3638
for a in allocations.values() {
37-
let mut a = a.lock().await;
3839
a.close().await?;
3940
}
4041
Ok(())
4142
}
4243

4344
// get_allocation fetches the allocation matching the passed FiveTuple
44-
pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Mutex<Allocation>>> {
45+
pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> {
4546
let allocations = self.allocations.lock().await;
46-
allocations.get(&five_tuple.fingerprint()).map(Arc::clone)
47+
allocations.get(five_tuple).map(Arc::clone)
4748
}
4849

4950
// create_allocation creates a new allocation and starts relaying
@@ -53,7 +54,8 @@ impl Manager {
5354
turn_socket: Arc<dyn Conn + Send + Sync>,
5455
requested_port: u16,
5556
lifetime: Duration,
56-
) -> Result<Arc<Mutex<Allocation>>> {
57+
username: Username,
58+
) -> Result<Arc<Allocation>> {
5759
if lifetime == Duration::from_secs(0) {
5860
return Err(Error::ErrLifetimeZero);
5961
}
@@ -66,36 +68,68 @@ impl Manager {
6668
.relay_addr_generator
6769
.allocate_conn(true, requested_port)
6870
.await?;
69-
let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple.clone());
71+
let mut a = Allocation::new(
72+
turn_socket,
73+
relay_socket,
74+
relay_addr,
75+
five_tuple.clone(),
76+
username,
77+
);
7078
a.allocations = Some(Arc::clone(&self.allocations));
7179

7280
log::debug!("listening on relay addr: {:?}", a.relay_addr);
7381
a.start(lifetime).await;
7482
a.packet_handler().await;
7583

76-
let a = Arc::new(Mutex::new(a));
84+
let a = Arc::new(a);
7785
{
7886
let mut allocations = self.allocations.lock().await;
79-
allocations.insert(five_tuple.fingerprint(), Arc::clone(&a));
87+
allocations.insert(five_tuple, Arc::clone(&a));
8088
}
8189

8290
Ok(a)
8391
}
8492

8593
// delete_allocation removes an allocation
8694
pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
87-
let fingerprint = five_tuple.fingerprint();
95+
let allocation = self.allocations.lock().await.remove(five_tuple);
8896

89-
let mut allocations = self.allocations.lock().await;
90-
let allocation = allocations.remove(&fingerprint);
9197
if let Some(a) = allocation {
92-
let mut a = a.lock().await;
9398
if let Err(err) = a.close().await {
9499
log::error!("Failed to close allocation: {}", err);
95100
}
96101
}
97102
}
98103

104+
/// Deletes the [`Allocation`]s according to the specified `username`.
105+
pub async fn delete_allocations_by_username(&self, name: &str) {
106+
let to_delete = {
107+
let mut allocations = self.allocations.lock().await;
108+
109+
let mut to_delete = Vec::new();
110+
111+
// TODO(logist322): Use `.drain_filter()` once stabilized.
112+
allocations.retain(|_, allocation| {
113+
let match_name = allocation.username.text == name;
114+
115+
if match_name {
116+
to_delete.push(Arc::clone(allocation));
117+
}
118+
119+
!match_name
120+
});
121+
122+
to_delete
123+
};
124+
125+
future::join_all(to_delete.iter().map(|a| async move {
126+
if let Err(err) = a.close().await {
127+
log::error!("Failed to close allocation: {}", err);
128+
}
129+
}))
130+
.await;
131+
}
132+
99133
// create_reservation stores the reservation for the token+port
100134
pub async fn create_reservation(&self, reservation_token: String, port: u16) {
101135
let reservations = Arc::clone(&self.reservations);

turn/src/allocation/allocation_manager/allocation_manager_test.rs

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use super::*;
2-
use crate::error::Result;
3-
use crate::relay::relay_none::*;
42

5-
use crate::proto::lifetime::DEFAULT_LIFETIME;
6-
use std::net::Ipv4Addr;
7-
use std::str::FromStr;
3+
use crate::{error::Result, proto::lifetime::DEFAULT_LIFETIME, relay::relay_none::*};
4+
5+
use std::{net::Ipv4Addr, str::FromStr};
6+
use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
87
use tokio::net::UdpSocket;
98
use util::vnet::net::*;
109

@@ -62,6 +61,7 @@ async fn test_packet_handler() -> Result<()> {
6261
Arc::new(turn_socket),
6362
0,
6463
DEFAULT_LIFETIME,
64+
TextAttribute::new(ATTR_USERNAME, "user".into()),
6565
)
6666
.await?;
6767

@@ -74,8 +74,6 @@ async fn test_packet_handler() -> Result<()> {
7474
);
7575

7676
let port = {
77-
let a = a.lock().await;
78-
7977
// add permission with peer1 address
8078
a.add_permission(Permission::new(peer_listener1.local_addr()?))
8179
.await;
@@ -168,11 +166,18 @@ async fn test_create_allocation_duplicate_five_tuple() -> Result<()> {
168166
Arc::clone(&turn_socket),
169167
0,
170168
DEFAULT_LIFETIME,
169+
TextAttribute::new(ATTR_USERNAME, "user".into()),
171170
)
172171
.await?;
173172

174173
let result = m
175-
.create_allocation(five_tuple, Arc::clone(&turn_socket), 0, DEFAULT_LIFETIME)
174+
.create_allocation(
175+
five_tuple,
176+
Arc::clone(&turn_socket),
177+
0,
178+
DEFAULT_LIFETIME,
179+
TextAttribute::new(ATTR_USERNAME, "user".into()),
180+
)
176181
.await;
177182
assert!(result.is_err(), "expected error, but got ok");
178183

@@ -196,6 +201,7 @@ async fn test_delete_allocation() -> Result<()> {
196201
Arc::clone(&turn_socket),
197202
0,
198203
DEFAULT_LIFETIME,
204+
TextAttribute::new(ATTR_USERNAME, "user".into()),
199205
)
200206
.await?;
201207

@@ -231,7 +237,13 @@ async fn test_allocation_timeout() -> Result<()> {
231237
let five_tuple = random_five_tuple();
232238

233239
let a = m
234-
.create_allocation(five_tuple, Arc::clone(&turn_socket), 0, lifetime)
240+
.create_allocation(
241+
five_tuple,
242+
Arc::clone(&turn_socket),
243+
0,
244+
lifetime,
245+
TextAttribute::new(ATTR_USERNAME, "user".into()),
246+
)
235247
.await?;
236248

237249
allocations.push(a);
@@ -250,8 +262,7 @@ async fn test_allocation_timeout() -> Result<()> {
250262

251263
let any_outstanding = false;
252264

253-
for allocation in &allocations {
254-
let mut a = allocation.lock().await;
265+
for a in &allocations {
255266
if a.close().await.is_ok() {
256267
continue 'outer;
257268
}
@@ -280,6 +291,7 @@ async fn test_manager_close() -> Result<()> {
280291
Arc::clone(&turn_socket),
281292
0,
282293
Duration::from_millis(100),
294+
TextAttribute::new(ATTR_USERNAME, "user".into()),
283295
)
284296
.await?;
285297
allocations.push(a1);
@@ -290,6 +302,7 @@ async fn test_manager_close() -> Result<()> {
290302
Arc::clone(&turn_socket),
291303
0,
292304
Duration::from_millis(200),
305+
TextAttribute::new(ATTR_USERNAME, "user".into()),
293306
)
294307
.await?;
295308
allocations.push(a2);
@@ -300,8 +313,7 @@ async fn test_manager_close() -> Result<()> {
300313

301314
m.close().await?;
302315

303-
for allocation in allocations {
304-
let mut a = allocation.lock().await;
316+
for a in allocations {
305317
assert!(
306318
a.close().await.is_err(),
307319
"Allocation should be closed if lifetime timeout"
@@ -310,3 +322,56 @@ async fn test_manager_close() -> Result<()> {
310322

311323
Ok(())
312324
}
325+
326+
#[tokio::test]
327+
async fn test_delete_allocation_by_username() -> Result<()> {
328+
let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
329+
330+
let m = new_test_manager();
331+
332+
let five_tuple1 = random_five_tuple();
333+
let five_tuple2 = random_five_tuple();
334+
let five_tuple3 = random_five_tuple();
335+
336+
let _ = m
337+
.create_allocation(
338+
five_tuple1.clone(),
339+
Arc::clone(&turn_socket),
340+
0,
341+
DEFAULT_LIFETIME,
342+
TextAttribute::new(ATTR_USERNAME, "user".into()),
343+
)
344+
.await?;
345+
let _ = m
346+
.create_allocation(
347+
five_tuple2.clone(),
348+
Arc::clone(&turn_socket),
349+
0,
350+
DEFAULT_LIFETIME,
351+
TextAttribute::new(ATTR_USERNAME, "user".into()),
352+
)
353+
.await?;
354+
let _ = m
355+
.create_allocation(
356+
five_tuple3.clone(),
357+
Arc::clone(&turn_socket),
358+
0,
359+
DEFAULT_LIFETIME,
360+
TextAttribute::new(ATTR_USERNAME, String::from("user2")),
361+
)
362+
.await?;
363+
364+
assert_eq!(m.allocations.lock().await.len(), 3);
365+
366+
m.delete_allocations_by_username("user").await;
367+
368+
assert_eq!(m.allocations.lock().await.len(), 1);
369+
370+
assert!(
371+
m.get_allocation(&five_tuple1).await.is_none()
372+
&& m.get_allocation(&five_tuple2).await.is_none()
373+
&& m.get_allocation(&five_tuple3).await.is_some()
374+
);
375+
376+
Ok(())
377+
}

0 commit comments

Comments
 (0)