Skip to content

Commit 26682c3

Browse files
authored
Get allocations info (#288)
1 parent c9fcddd commit 26682c3

File tree

9 files changed

+352
-26
lines changed

9 files changed

+352
-26
lines changed

.github/workflows/cargo.yml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818
name: Test
1919
strategy:
2020
matrix:
21-
os: ['ubuntu-latest', 'macos-latest']
21+
os: ["ubuntu-latest", "macos-latest"]
2222
toolchain:
2323
- 1.57.0 # min supported version (https://github.com/webrtc-rs/webrtc/#toolchain)
2424
- stable
@@ -28,10 +28,10 @@ jobs:
2828
- name: Install Rust ${{ matrix.toolchain }}
2929
uses: actions-rs/toolchain@v1
3030
with:
31-
toolchain: ${{ matrix.toolchain }}
32-
override: true
31+
toolchain: ${{ matrix.toolchain }}
32+
override: true
3333
- name: Install Rust
34-
run: rustup update stable
34+
run: rustup update stable
3535
- name: 📦 Cache cargo registry
3636
uses: actions/cache@v3
3737
with:
@@ -47,7 +47,7 @@ jobs:
4747
- name: 🏭 Cache dependencies
4848
uses: Swatinem/rust-cache@v2
4949
- name: Test
50-
run: cargo test
50+
run: cargo test --features metrics
5151

5252
test_windows:
5353
name: Test (windows)
@@ -62,10 +62,10 @@ jobs:
6262
- name: Install Rust ${{ matrix.toolchain }}
6363
uses: actions-rs/toolchain@v1
6464
with:
65-
toolchain: ${{ matrix.toolchain }}
66-
override: true
65+
toolchain: ${{ matrix.toolchain }}
66+
override: true
6767
- name: Install Rust
68-
run: rustup update stable
68+
run: rustup update stable
6969
- name: 📦 Cache cargo registry
7070
uses: actions/cache@v3
7171
with:
@@ -84,7 +84,7 @@ jobs:
8484
# uses: Swatinem/rust-cache@v2
8585
- name: Test
8686
working-directory: "C:\\a\\webrtc\\webrtc"
87-
run: cargo test
87+
run: cargo test --features metrics
8888

8989
rustfmt_and_clippy:
9090
name: Check formatting style and run clippy

turn/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Unreleased
44

55
* Added `delete_allocations_by_username` method on `Server`. This method provides possibility to manually delete allocation [#263](https://github.com/webrtc-rs/webrtc/pull/263) by [@logist322](https://github.com/logist322).
6+
* Added `get_allocations_info` method on `Server`. This method provides possibility to get information about allocations [#288](https://github.com/webrtc-rs/webrtc/pull/288) by [@logist322](https://github.com/logist322).
67

78

89
## v0.6.0

turn/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ hex = "0.4.3"
3434
clap = "3.2.6"
3535
criterion = "0.3.5"
3636

37+
[features]
38+
metrics = []
39+
3740
[[bench]]
3841
name = "bench"
3942
harness = false

turn/src/allocation/allocation_manager.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,33 @@ impl Manager {
4141
Ok(())
4242
}
4343

44+
// Returns the information about the all [`Allocation`]s associated with
45+
// the specified [`FiveTuple`]s.
46+
pub async fn get_allocations_info(
47+
&self,
48+
five_tuples: Option<Vec<FiveTuple>>,
49+
) -> HashMap<FiveTuple, AllocationInfo> {
50+
let mut infos = HashMap::new();
51+
52+
let guarded = self.allocations.lock().await;
53+
54+
guarded.iter().for_each(|(five_tuple, alloc)| {
55+
if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) {
56+
infos.insert(
57+
*five_tuple,
58+
AllocationInfo::new(
59+
*five_tuple,
60+
alloc.username.text.clone(),
61+
#[cfg(feature = "metrics")]
62+
alloc.relayed_bytes.load(Ordering::Acquire),
63+
),
64+
);
65+
}
66+
});
67+
68+
infos
69+
}
70+
4471
// get_allocation fetches the allocation matching the passed FiveTuple
4572
pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> {
4673
let allocations = self.allocations.lock().await;
@@ -68,13 +95,7 @@ impl Manager {
6895
.relay_addr_generator
6996
.allocate_conn(true, requested_port)
7097
.await?;
71-
let mut a = Allocation::new(
72-
turn_socket,
73-
relay_socket,
74-
relay_addr,
75-
five_tuple.clone(),
76-
username,
77-
);
98+
let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple, username);
7899
a.allocations = Some(Arc::clone(&self.allocations));
79100

80101
log::debug!("listening on relay addr: {:?}", a.relay_addr);

turn/src/allocation/allocation_manager/allocation_manager_test.rs

Lines changed: 188 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
11
use super::*;
22

3-
use crate::{error::Result, proto::lifetime::DEFAULT_LIFETIME, relay::relay_none::*};
4-
5-
use std::{net::Ipv4Addr, str::FromStr};
3+
use crate::{
4+
auth::{generate_auth_key, AuthHandler},
5+
client::{Client, ClientConfig},
6+
error::Result,
7+
proto::lifetime::DEFAULT_LIFETIME,
8+
relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic},
9+
server::{
10+
config::{ConnConfig, ServerConfig},
11+
Server,
12+
},
13+
};
14+
15+
use std::{
16+
net::{IpAddr, Ipv4Addr},
17+
str::FromStr,
18+
};
619
use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
720
use tokio::net::UdpSocket;
821
use util::vnet::net::*;
@@ -357,7 +370,7 @@ async fn test_delete_allocation_by_username() -> Result<()> {
357370
Arc::clone(&turn_socket),
358371
0,
359372
DEFAULT_LIFETIME,
360-
TextAttribute::new(ATTR_USERNAME, String::from("user2")),
373+
TextAttribute::new(ATTR_USERNAME, "user2".into()),
361374
)
362375
.await?;
363376

@@ -375,3 +388,174 @@ async fn test_delete_allocation_by_username() -> Result<()> {
375388

376389
Ok(())
377390
}
391+
392+
struct TestAuthHandler;
393+
impl AuthHandler for TestAuthHandler {
394+
fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> {
395+
Ok(generate_auth_key(username, realm, "pass"))
396+
}
397+
}
398+
399+
async fn create_server() -> Result<(Server, u16)> {
400+
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
401+
let server_port = conn.local_addr()?.port();
402+
403+
let server = Server::new(ServerConfig {
404+
conn_configs: vec![ConnConfig {
405+
conn,
406+
relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
407+
relay_address: IpAddr::from_str("127.0.0.1")?,
408+
address: "0.0.0.0".to_owned(),
409+
net: Arc::new(Net::new(None)),
410+
}),
411+
}],
412+
realm: "webrtc.rs".to_owned(),
413+
auth_handler: Arc::new(TestAuthHandler {}),
414+
channel_bind_timeout: Duration::from_secs(0),
415+
})
416+
.await?;
417+
418+
Ok((server, server_port))
419+
}
420+
421+
async fn create_client(username: String, server_port: u16) -> Result<Client> {
422+
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
423+
424+
Ok(Client::new(ClientConfig {
425+
stun_serv_addr: format!("127.0.0.1:{}", server_port),
426+
turn_serv_addr: format!("127.0.0.1:{}", server_port),
427+
username,
428+
password: "pass".to_owned(),
429+
realm: String::new(),
430+
software: String::new(),
431+
rto_in_ms: 0,
432+
conn,
433+
vnet: None,
434+
})
435+
.await?)
436+
}
437+
438+
#[cfg(feature = "metrics")]
439+
#[tokio::test]
440+
async fn test_get_allocations_info() -> Result<()> {
441+
let (server, server_port) = create_server().await?;
442+
443+
let client1 = create_client("user1".to_owned(), server_port).await?;
444+
client1.listen().await?;
445+
446+
let client2 = create_client("user2".to_owned(), server_port).await?;
447+
client2.listen().await?;
448+
449+
let client3 = create_client("user3".to_owned(), server_port).await?;
450+
client3.listen().await?;
451+
452+
assert!(server.get_allocations_info(None).await?.is_empty());
453+
454+
let user1 = client1.allocate().await?;
455+
let user2 = client2.allocate().await?;
456+
let user3 = client3.allocate().await?;
457+
458+
assert_eq!(server.get_allocations_info(None).await?.len(), 3);
459+
460+
let addr1 = client1
461+
.send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
462+
.await?;
463+
let addr2 = client2
464+
.send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
465+
.await?;
466+
let addr3 = client3
467+
.send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
468+
.await?;
469+
470+
user1.send_to(b"1", addr1).await?;
471+
user2.send_to(b"12", addr2).await?;
472+
user3.send_to(b"123", addr3).await?;
473+
474+
tokio::time::sleep(Duration::from_millis(100)).await;
475+
476+
server
477+
.get_allocations_info(None)
478+
.await?
479+
.iter()
480+
.for_each(|(_, ai)| match ai.username.as_str() {
481+
"user1" => assert_eq!(ai.relayed_bytes, 1),
482+
"user2" => assert_eq!(ai.relayed_bytes, 2),
483+
"user3" => assert_eq!(ai.relayed_bytes, 3),
484+
_ => unreachable!(),
485+
});
486+
487+
Ok(())
488+
}
489+
490+
#[cfg(feature = "metrics")]
491+
#[tokio::test]
492+
async fn test_get_allocations_info_bytes_count() -> Result<()> {
493+
let (server, server_port) = create_server().await?;
494+
495+
let client = create_client("foo".to_owned(), server_port).await?;
496+
497+
client.listen().await?;
498+
499+
assert!(server.get_allocations_info(None).await?.is_empty());
500+
501+
let conn = client.allocate().await?;
502+
let addr = client
503+
.send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
504+
.await?;
505+
506+
assert!(!server.get_allocations_info(None).await?.is_empty());
507+
508+
assert_eq!(
509+
server
510+
.get_allocations_info(None)
511+
.await?
512+
.values()
513+
.last()
514+
.unwrap()
515+
.relayed_bytes,
516+
0
517+
);
518+
519+
for _ in 0..10 {
520+
conn.send_to(b"Hello", addr).await?;
521+
522+
tokio::time::sleep(Duration::from_millis(100)).await;
523+
}
524+
525+
tokio::time::sleep(Duration::from_millis(1000)).await;
526+
527+
assert_eq!(
528+
server
529+
.get_allocations_info(None)
530+
.await?
531+
.values()
532+
.last()
533+
.unwrap()
534+
.relayed_bytes,
535+
50
536+
);
537+
538+
for _ in 0..10 {
539+
conn.send_to(b"Hello", addr).await?;
540+
541+
tokio::time::sleep(Duration::from_millis(100)).await;
542+
}
543+
544+
tokio::time::sleep(Duration::from_millis(1000)).await;
545+
546+
assert_eq!(
547+
server
548+
.get_allocations_info(None)
549+
.await?
550+
.values()
551+
.last()
552+
.unwrap()
553+
.relayed_bytes,
554+
100
555+
);
556+
557+
client.close().await?;
558+
server.close().await?;
559+
560+
Ok(())
561+
}

turn/src/allocation/five_tuple.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::net::{Ipv4Addr, SocketAddr};
1212
// server. The 5-tuple uniquely identifies this communication
1313
// stream. The 5-tuple also uniquely identifies the Allocation on
1414
// the server.
15-
#[derive(PartialEq, Eq, Clone, Hash)]
15+
#[derive(PartialEq, Eq, Clone, Copy, Hash)]
1616
pub struct FiveTuple {
1717
pub protocol: Protocol,
1818
pub src_addr: SocketAddr,
@@ -34,3 +34,13 @@ impl fmt::Display for FiveTuple {
3434
write!(f, "{}_{}_{}", self.protocol, self.src_addr, self.dst_addr)
3535
}
3636
}
37+
38+
impl fmt::Debug for FiveTuple {
39+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40+
f.debug_struct("FiveTuple")
41+
.field("protocol", &self.protocol)
42+
.field("src_addr", &self.src_addr)
43+
.field("dst_addr", &self.dst_addr)
44+
.finish()
45+
}
46+
}

0 commit comments

Comments
 (0)