Skip to content

Commit 871dcfb

Browse files
authored
Merge pull request #306 from nubificus/feat_add_unix_tcp_support
feat: Add TCP socket support
2 parents 2244900 + f240a50 commit 871dcfb

File tree

13 files changed

+242
-29
lines changed

13 files changed

+242
-29
lines changed

example/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ ctrlc = { version = "3.0", features = ["termination"] }
2323
tokio = { version = "1.0.1", features = ["signal", "time"] }
2424
async-trait = "0.1.42"
2525
rand = "0.8.5"
26-
26+
clap = { version = "4.5.40", features = ["derive"] }
2727

2828
[[example]]
2929
name = "client"

example/async-client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use ttrpc::r#async::Client;
1111

1212
#[tokio::main(flavor = "current_thread")]
1313
async fn main() {
14-
let c = Client::connect(utils::SOCK_ADDR).await.unwrap();
14+
let sock_addr = utils::get_sock_addr();
15+
let c = Client::connect(sock_addr).await.unwrap();
16+
1517
let hc = health_ttrpc::HealthClient::new(c.clone());
1618
let ac = agent_ttrpc::AgentServiceClient::new(c);
1719

example/async-server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ async fn main() {
5858
let hservice = health_ttrpc::create_health(Arc::new(HealthService {}));
5959
let aservice = agent_ttrpc::create_agent_service(Arc::new(AgentService {}));
6060

61-
utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();
61+
let sock_addr = utils::get_sock_addr();
62+
utils::remove_if_sock_exist(sock_addr).unwrap();
6263

6364
let mut server = Server::new()
64-
.bind(utils::SOCK_ADDR)
65+
.bind(sock_addr)
6566
.unwrap()
6667
.register_service(hservice)
6768
.register_service(aservice);

example/async-stream-client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use ttrpc::r#async::Client;
1313
async fn main() {
1414
simple_logging::log_to_stderr(log::LevelFilter::Info);
1515

16-
let c = Client::connect(utils::SOCK_ADDR).await.unwrap();
16+
let sock_addr = utils::get_sock_addr();
17+
let c = Client::connect(sock_addr).await.unwrap();
18+
1719
let sc = streaming_ttrpc::StreamingClient::new(c);
1820

1921
let _now = std::time::Instant::now();

example/async-stream-server.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,12 @@ impl streaming_ttrpc::Streaming for StreamingService {
171171
async fn main() {
172172
simple_logging::log_to_stderr(LevelFilter::Info);
173173
let service = streaming_ttrpc::create_streaming(Arc::new(StreamingService {}));
174-
utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();
174+
175+
let sock_addr = utils::get_sock_addr();
176+
utils::remove_if_sock_exist(sock_addr).unwrap();
175177

176178
let mut server = Server::new()
177-
.bind(utils::SOCK_ADDR)
179+
.bind(sock_addr)
178180
.unwrap()
179181
.register_service(service);
180182

example/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ fn main() {
5050
fn connect_once() {
5151
simple_logging::log_to_stderr(LevelFilter::Trace);
5252

53-
let c = Client::connect(utils::SOCK_ADDR).unwrap();
53+
let sock_addr = utils::get_sock_addr();
54+
let c = Client::connect(sock_addr).unwrap();
55+
5456
let hc = health_ttrpc::HealthClient::new(c.clone());
5557
let ac = agent_ttrpc::AgentServiceClient::new(c);
5658

example/server.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ fn main() {
5858
let hservice = health_ttrpc::create_health(Arc::new(HealthService {}));
5959
let aservice = agent_ttrpc::create_agent_service(Arc::new(AgentService {}));
6060

61-
utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();
61+
let sock_addr = utils::get_sock_addr();
62+
utils::remove_if_sock_exist(sock_addr).unwrap();
63+
6264
let mut server = ttrpc::Server::new()
63-
.bind(utils::SOCK_ADDR)
65+
.bind(sock_addr)
6466
.unwrap()
6567
.register_service(hservice)
6668
.register_service(aservice);

example/utils.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,42 @@
11
#![allow(dead_code)]
2+
use clap::Parser;
3+
use log::warn;
24
use std::io::Result;
35

46
#[cfg(unix)]
5-
pub const SOCK_ADDR: &str = r"unix:///tmp/ttrpc-test";
7+
pub const SOCK_ADDR_LOCAL: &str = r"unix:///tmp/ttrpc-test";
68

79
#[cfg(windows)]
8-
pub const SOCK_ADDR: &str = r"\\.\pipe\ttrpc-test";
10+
pub const SOCK_ADDR_LOCAL: &str = r"\\.\pipe\ttrpc-test";
11+
12+
pub const SOCK_ADDR_TCP: &str = r"tcp://127.0.0.1:65500";
13+
14+
#[derive(Debug, Default, Parser)]
15+
pub struct Cli {
16+
#[arg(long = "tcp")]
17+
#[arg(help = "Use a TCP socket instead of a local one")]
18+
pub tcp: bool,
19+
}
20+
21+
pub fn get_sock_addr() -> &'static str {
22+
let cli = Cli::parse();
23+
if cli.tcp {
24+
if cfg!(windows) {
25+
warn!("'--tcp' flag ignored; TCP sockets not supported on Windows");
26+
return SOCK_ADDR_LOCAL;
27+
}
28+
SOCK_ADDR_TCP
29+
} else {
30+
SOCK_ADDR_LOCAL
31+
}
32+
}
933

1034
#[cfg(unix)]
1135
pub fn remove_if_sock_exist(sock_addr: &str) -> Result<()> {
36+
if sock_addr.starts_with("tcp://") {
37+
return Ok(());
38+
}
39+
1240
let path = sock_addr
1341
.strip_prefix("unix://")
1442
.expect("socket address is not expected");

src/asynchronous/server.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ impl Server {
101101
Ok(self.add_listener(listener))
102102
}
103103

104+
#[cfg(unix)]
105+
/// # Safety
106+
/// The file descriptor must represent a unix listener.
107+
pub unsafe fn add_tcp_listener(self, fd: RawFd) -> Result<Server> {
108+
let listener = Listener::from_raw_tcp_listener_fd(fd)
109+
.map_err(err_to_others_err!(e, "from_raw_tcp_listener_fd error"))?;
110+
Ok(self.add_listener(listener))
111+
}
112+
104113
#[cfg(any(target_os = "linux", target_os = "android"))]
105114
/// # Safety
106115
/// The file descriptor must represent a vsock listener.

src/asynchronous/transport/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ macro_rules! io_other {
2222
#[cfg(unix)]
2323
mod unix;
2424

25+
#[cfg(unix)]
26+
mod tcp;
27+
2528
#[cfg(any(target_os = "linux", target_os = "android"))]
2629
mod vsock;
2730

@@ -43,6 +46,11 @@ impl Listener {
4346
return Self::bind_unix(addr);
4447
}
4548

49+
#[cfg(unix)]
50+
if let Some(addr) = addr.strip_prefix("tcp://") {
51+
return Self::bind_tcp(addr);
52+
}
53+
4654
#[cfg(any(target_os = "linux", target_os = "android"))]
4755
if let Some(addr) = addr.strip_prefix("vsock://") {
4856
return Self::bind_vsock(addr);
@@ -70,6 +78,11 @@ impl Socket {
7078
return Self::connect_unix(addr).await;
7179
}
7280

81+
#[cfg(unix)]
82+
if let Some(addr) = addr.strip_prefix("tcp://") {
83+
return Self::connect_tcp(addr).await;
84+
}
85+
7386
#[cfg(any(target_os = "linux", target_os = "android"))]
7487
if let Some(addr) = addr.strip_prefix("vsock://") {
7588
return Self::connect_vsock(addr).await;

0 commit comments

Comments
 (0)