Skip to content

Commit d548a79

Browse files
feat: Add source.rs module for reading data blocks related functions from source
1 parent 06e0fd6 commit d548a79

File tree

1 file changed

+312
-0
lines changed

1 file changed

+312
-0
lines changed

src/rust/lib_ccxr/src/net/source.rs

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
use super::{Block, BlockStream, Command, NetError, DEFAULT_TCP_PORT, PING_INTERVAL};
2+
use crate::time::units::Timestamp;
3+
use crate::util::log::{fatal, info, ExitCause};
4+
5+
use socket2::{Domain, Socket, Type};
6+
7+
use std::io;
8+
use std::io::{Read, Write};
9+
use std::net::{
10+
IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, TcpListener, TcpStream, ToSocketAddrs,
11+
UdpSocket,
12+
};
13+
14+
/// An enum of configuration parameters to construct [`RecvSource`].
15+
#[derive(Copy, Clone, Debug)]
16+
pub enum RecvSourceConfig<'a> {
17+
Tcp {
18+
/// The port number where TCP socket will be bound.
19+
///
20+
/// If no port number is provided then [`DEFAULT_TCP_PORT`] will be used instead.
21+
port: Option<u16>,
22+
23+
/// The password of receiving server.
24+
///
25+
/// The password sent from client will be compared to this and only allow furthur
26+
/// communication if the passwords match.
27+
password: Option<&'a str>,
28+
},
29+
Udp {
30+
/// Source's IP address or hostname if trying to open a multicast SSM channel.
31+
source: Option<&'a str>,
32+
33+
/// The IP address or hostname where UDP socket will be bound.
34+
address: Option<&'a str>,
35+
36+
/// The port number where UDP socket will be bound.
37+
port: u16,
38+
},
39+
}
40+
41+
enum SourceSocket {
42+
Tcp(TcpStream),
43+
Udp {
44+
socket: UdpSocket,
45+
source: Option<Ipv4Addr>,
46+
address: Ipv4Addr,
47+
},
48+
}
49+
50+
/// A struct used for receiving subtitle data from the network.
51+
///
52+
/// Even though it exposes methods from [`BlockStream`], it is recommended to not use them.
53+
/// Instead use the methods provided directly by [`RecvSource`] like
54+
/// [`RecvSource::recv_header_or_cc`].
55+
///
56+
/// Note: To create a [`RecvSource`], one must first construct a [`RecvSourceConfig`].
57+
///
58+
/// ```no_run
59+
/// # use lib_ccxr::net::{RecvSource, RecvSourceConfig};
60+
/// let config = RecvSourceConfig::Tcp {
61+
/// port: None,
62+
/// password: Some("12345678"),
63+
/// };
64+
/// let mut recv_source = RecvSource::new(config);
65+
///
66+
/// // Once recv_source is constructed, we can use it to receive data.
67+
/// let block = recv_source.recv_header_or_cc().unwrap();
68+
/// ```
69+
pub struct RecvSource {
70+
socket: SourceSocket,
71+
last_ping: Timestamp,
72+
}
73+
74+
impl BlockStream for RecvSource {
75+
fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
76+
match &mut self.socket {
77+
SourceSocket::Tcp(stream) => stream.write(buf),
78+
SourceSocket::Udp { socket, .. } => socket.send(buf),
79+
}
80+
}
81+
82+
fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
83+
match &mut self.socket {
84+
SourceSocket::Tcp(stream) => stream.read(buf),
85+
SourceSocket::Udp {
86+
socket,
87+
source,
88+
address,
89+
} => {
90+
if cfg!(target_os = "windows") {
91+
socket.recv(buf)
92+
} else {
93+
let should_check_source = address.is_multicast() && source.is_some();
94+
if should_check_source {
95+
loop {
96+
if let Ok((size, src_addr)) = socket.recv_from(buf) {
97+
if src_addr.ip() == source.unwrap() {
98+
return Ok(size);
99+
}
100+
}
101+
}
102+
} else {
103+
socket.recv(buf)
104+
}
105+
}
106+
}
107+
}
108+
}
109+
}
110+
111+
impl RecvSource {
112+
/// Create a new [`RecvSource`] from the configuration parameters of [`RecvSourceConfig`].
113+
///
114+
/// Note: This method attempts to create a server. It does not return a [`Result`]. When
115+
/// it is unable to start a server, it crashes instantly by calling [`fatal!`].
116+
///
117+
/// This method will continously block until a connection with a client is made.
118+
pub fn new(config: RecvSourceConfig) -> RecvSource {
119+
match config {
120+
RecvSourceConfig::Tcp { port, password } => {
121+
let port = port.unwrap_or(DEFAULT_TCP_PORT);
122+
123+
info!(
124+
"\n\r----------------------------------------------------------------------\n"
125+
);
126+
info!("Binding to {}\n", port);
127+
128+
let addresses = [
129+
SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port),
130+
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port),
131+
];
132+
133+
let listener = TcpListener::bind(addresses.as_slice()).unwrap_or_else(
134+
|_| fatal!(cause = ExitCause::Failure; "Unable to start server\n"),
135+
);
136+
137+
if let Some(pwd) = &password {
138+
info!("Password: {}\n", pwd);
139+
}
140+
141+
info!("Waiting for connections\n");
142+
143+
loop {
144+
if let Ok((socket, _)) = listener.accept() {
145+
let mut source = RecvSource {
146+
socket: SourceSocket::Tcp(socket),
147+
last_ping: Timestamp::from_millis(0),
148+
};
149+
if check_password(&mut source, password) {
150+
return source;
151+
}
152+
153+
info!("Connection closed\n");
154+
drop(source);
155+
}
156+
}
157+
}
158+
RecvSourceConfig::Udp {
159+
source,
160+
address,
161+
port,
162+
} => {
163+
let address = address
164+
.map(|x| {
165+
for s in (x, 0).to_socket_addrs().unwrap() {
166+
if let SocketAddr::V4(sv4) = s {
167+
return *sv4.ip();
168+
}
169+
}
170+
fatal!(cause = ExitCause::Failure; "Could not resolve udp address")
171+
})
172+
.unwrap_or(Ipv4Addr::UNSPECIFIED);
173+
174+
let source = source.map(|x| {
175+
for s in (x, 0).to_socket_addrs().unwrap() {
176+
if let SocketAddr::V4(sv4) = s {
177+
return *sv4.ip();
178+
}
179+
}
180+
fatal!(cause = ExitCause::Failure; "Could not resolve udp source")
181+
});
182+
183+
let socket = Socket::new(Domain::IPV4, Type::DGRAM, None).unwrap_or_else(
184+
|_| fatal!(cause = ExitCause::Failure; "Socket creation error"),
185+
);
186+
187+
if address.is_multicast() {
188+
socket.set_reuse_address(true).unwrap_or_else(|_| {
189+
info!("Cannot not set reuse address\n");
190+
});
191+
}
192+
193+
let binding_address = if cfg!(target_os = "windows") && address.is_multicast() {
194+
Ipv4Addr::UNSPECIFIED
195+
} else {
196+
address
197+
};
198+
199+
socket
200+
.bind(&SocketAddrV4::new(binding_address, port).into())
201+
.unwrap_or_else(|_| fatal!(cause = ExitCause::Bug; "Socket bind error"));
202+
203+
if address.is_multicast() {
204+
if let Some(src) = source {
205+
socket.join_ssm_v4(&src, &address, &Ipv4Addr::UNSPECIFIED)
206+
} else {
207+
socket.join_multicast_v4(&address, &Ipv4Addr::UNSPECIFIED)
208+
}
209+
.unwrap_or_else(
210+
|_| fatal!(cause = ExitCause::Bug; "Cannot join multicast group"),
211+
);
212+
}
213+
214+
info!(
215+
"\n\r----------------------------------------------------------------------\n"
216+
);
217+
if address == Ipv4Addr::UNSPECIFIED {
218+
info!("\rReading from UDP socket {}\n", port);
219+
} else if let Some(src) = source {
220+
info!("\rReading from UDP socket {}@{}:{}\n", src, address, port);
221+
} else {
222+
info!("\rReading from UDP socket {}:{}\n", address, port);
223+
}
224+
225+
RecvSource {
226+
socket: SourceSocket::Udp {
227+
socket: socket.into(),
228+
address,
229+
source,
230+
},
231+
last_ping: Timestamp::from_millis(0),
232+
}
233+
}
234+
}
235+
}
236+
237+
/// Receive a [`BinHeader`] or [`BinData`] [`Block`].
238+
///
239+
/// Note that this method will continously block until it receives a
240+
/// [`BinHeader`] or [`BinData`] [`Block`].
241+
///
242+
/// It returns a [`NetError`] if some transmission failure ocurred or byte format is violated.
243+
/// It will return a [`None`] if the connection has shutdown down correctly.
244+
///
245+
/// [`BinHeader`]: Command::BinHeader
246+
/// [`BinData`]: Command::BinData
247+
pub fn recv_header_or_cc<'a>(&mut self) -> Result<Option<Block<'a>>, NetError> {
248+
let now = Timestamp::now();
249+
if self.last_ping.millis() == 0 {
250+
self.last_ping = now;
251+
}
252+
253+
if now - self.last_ping > PING_INTERVAL {
254+
self.last_ping = now;
255+
if self.send_ping().is_err() {
256+
fatal!(cause = ExitCause::Failure; "Unable to send keep-alive packet to client\n");
257+
}
258+
}
259+
260+
loop {
261+
if let Some(block) = self.recv_block()? {
262+
if block.command() == Command::BinHeader || block.command() == Command::BinData {
263+
return Ok(Some(block));
264+
}
265+
} else {
266+
return Ok(None);
267+
}
268+
}
269+
}
270+
271+
/// Send a [`Ping`](Command::Ping) [`Block`].
272+
///
273+
/// It returns a [`NetError`] if some transmission failure ocurred, or else it will return a bool
274+
/// that indicates the status of this connection. It will be `false` if the connection shutdown
275+
/// correctly.
276+
fn send_ping(&mut self) -> Result<bool, NetError> {
277+
self.send_block(&Block::ping())
278+
}
279+
}
280+
281+
/// Check if the received password matches with the current password.
282+
///
283+
/// This methods attempts to read a [`Password`](Command::Password) [`Block`] from `socket`. Any
284+
/// form of error in this operation results in `false`.
285+
///
286+
/// If `password` is [`None`], then no checking is done and results in `true`.
287+
fn check_password(socket: &mut RecvSource, password: Option<&str>) -> bool {
288+
let block = match socket.recv_block() {
289+
Ok(Some(b)) => b,
290+
_ => return false,
291+
};
292+
293+
let pwd = match password {
294+
Some(p) => p,
295+
None => return true,
296+
};
297+
298+
if block.command() == Command::Password && String::from_utf8_lossy(block.data()) == *pwd {
299+
true
300+
} else {
301+
#[cfg(feature = "debug_out")]
302+
{
303+
eprintln!("[C] Wrong password");
304+
eprintln!("[S] PASSWORD");
305+
}
306+
// TODO: Check if the below portion is really required, since the receiver is not even
307+
// listening for a Password block
308+
let _ = socket.send_block(&Block::password(""));
309+
310+
false
311+
}
312+
}

0 commit comments

Comments
 (0)