Skip to content

Commit fae793d

Browse files
committed
PoC proxy protocol stream wrapper
1 parent 00654aa commit fae793d

File tree

10 files changed

+760
-0
lines changed

10 files changed

+760
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
members = [
33
"actix-codec",
44
"actix-macros",
5+
"actix-proxy-protocol",
56
"actix-rt",
67
"actix-server",
78
"actix-service",
@@ -16,6 +17,7 @@ members = [
1617
[patch.crates-io]
1718
actix-codec = { path = "actix-codec" }
1819
actix-macros = { path = "actix-macros" }
20+
actix-proxy-protocol = { path = "actix-proxy-protocol" }
1921
actix-rt = { path = "actix-rt" }
2022
actix-server = { path = "actix-server" }
2123
actix-service = { path = "actix-service" }

actix-proxy-protocol/CHANGES.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Changes
2+
3+
## Unreleased - 2022-xx-xx
4+
5+
## 0.0.1 - 2022-xx-xx
6+
7+
- delete me

actix-proxy-protocol/Cargo.toml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
[package]
2+
name = "actix-proxy-protocol"
3+
version = "0.0.1"
4+
authors = [
5+
"Rob Ede <robjtede@icloud.com>",
6+
]
7+
description = "PROXY protocol utilities"
8+
keywords = ["proxy", "protocol", "network", "haproxy", "tcp"]
9+
categories = ["network-programming", "asynchronous"]
10+
homepage = "https://actix.rs"
11+
repository = "https://github.com/actix/actix-net.git"
12+
license = "MIT OR Apache-2.0"
13+
edition = "2018"
14+
15+
[dependencies]
16+
actix-service = "2"
17+
actix-utils = "3"
18+
19+
arrayvec = "0.7"
20+
crc32fast = "1"
21+
futures-core = { version = "0.3.17", default-features = false, features = ["std"] }
22+
futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
23+
itoa = "1"
24+
tokio = { version = "1.13.1", features = ["sync", "io-util"] }
25+
tracing = { version = "0.1.30", default-features = false, features = ["log"] }
26+
27+
[dev-dependencies]
28+
actix-codec = "0.5"
29+
actix-rt = "2.6"
30+
actix-server = "2"
31+
32+
bytes = "1"
33+
const-str = "0.5"
34+
env_logger = "0.9"
35+
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
36+
once_cell = "1"
37+
pretty_assertions = "1"
38+
tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }

actix-proxy-protocol/LICENSE-APACHE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../LICENSE-APACHE

actix-proxy-protocol/LICENSE-MIT

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../LICENSE-MIT

actix-proxy-protocol/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# actix-proxy-protocol
2+
3+
> Implementation of the [PROXY protocol].
4+
5+
[![crates.io](https://img.shields.io/crates/v/actix-proxy-protocol?label=latest)](https://crates.io/crates/actix-proxy-protocol)
6+
[![Documentation](https://docs.rs/actix-proxy-protocol/badge.svg?version=0.1.0)](https://docs.rs/actix-proxy-protocol/0.1.0)
7+
[![Version](https://img.shields.io/badge/rustc-1.52+-ab6000.svg)](https://blog.rust-lang.org/2021/05/06/Rust-1.52.0.html)
8+
![License](https://img.shields.io/crates/l/actix-proxy-protocol.svg)
9+
[![Dependency Status](https://deps.rs/crate/actix-proxy-protocol/0.1.0/status.svg)](https://deps.rs/crate/actix-proxy-protocol/0.1.0)
10+
![Downloads](https://img.shields.io/crates/d/actix-proxy-protocol.svg)
11+
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)
12+
13+
## Resources
14+
15+
- [Examples](./examples)
16+
17+
[proxy protocol]: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
//! Adds PROXY protocol v1 prelude to connections.
2+
3+
#![allow(unused)]
4+
5+
use std::{
6+
io, mem,
7+
net::{IpAddr, Ipv4Addr, SocketAddr},
8+
sync::{
9+
atomic::{AtomicUsize, Ordering},
10+
Arc,
11+
},
12+
};
13+
14+
use actix_proxy_protocol::{v1, v2, AddressFamily, Command, Tlv, TransportProtocol};
15+
use actix_rt::net::TcpStream;
16+
use actix_server::Server;
17+
use actix_service::{fn_service, ServiceFactoryExt as _};
18+
use bytes::BytesMut;
19+
use const_str::concat_bytes;
20+
use once_cell::sync::Lazy;
21+
use tokio::io::{copy_bidirectional, AsyncReadExt as _, AsyncWriteExt as _};
22+
23+
static UPSTREAM: Lazy<SocketAddr> = Lazy::new(|| SocketAddr::from(([127, 0, 0, 1], 8080)));
24+
25+
/*
26+
NOTES:
27+
108 byte buffer on receiver side is enough for any PROXY header
28+
after PROXY, receive until CRLF, *then* decode parts
29+
TLV = type-length-value
30+
31+
TO DO:
32+
handle UNKNOWN transport
33+
v2 UNSPEC mode
34+
*/
35+
36+
fn extend_with_ip_bytes(buf: &mut Vec<u8>, ip: IpAddr) {
37+
match ip {
38+
IpAddr::V4(ip) => buf.extend_from_slice(&ip.octets()),
39+
IpAddr::V6(ip) => buf.extend_from_slice(&ip.octets()),
40+
}
41+
}
42+
43+
async fn wrap_with_proxy_protocol_v1(mut stream: TcpStream) -> io::Result<()> {
44+
let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?;
45+
46+
tracing::info!(
47+
"PROXYv1 {} -> {}",
48+
stream.peer_addr().unwrap(),
49+
UPSTREAM.to_string()
50+
);
51+
52+
let proxy_header = v1::Header::new(
53+
AddressFamily::Inet,
54+
SocketAddr::from(([127, 0, 0, 1], 8081)),
55+
*UPSTREAM,
56+
);
57+
58+
proxy_header.write_to_tokio(&mut upstream).await?;
59+
60+
let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?;
61+
62+
Ok(())
63+
}
64+
65+
async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> {
66+
let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?;
67+
68+
tracing::info!(
69+
"PROXYv2 {} -> {}",
70+
stream.peer_addr().unwrap(),
71+
UPSTREAM.to_string()
72+
);
73+
74+
let mut proxy_header = v2::Header::new(
75+
Command::Proxy,
76+
TransportProtocol::Stream,
77+
AddressFamily::Inet,
78+
SocketAddr::from(([127, 0, 0, 1], 8082)),
79+
*UPSTREAM,
80+
vec![
81+
Tlv::new(0x05, [0x34, 0x32, 0x36, 0x39]), // UNIQUE_ID
82+
Tlv::new(0x04, "NOOP m9"), // NOOP
83+
],
84+
);
85+
86+
proxy_header.add_crc23c_checksum_tlv();
87+
88+
proxy_header.write_to_tokio(&mut upstream).await?;
89+
90+
let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?;
91+
92+
Ok(())
93+
}
94+
95+
fn start_server() -> io::Result<Server> {
96+
let addr = ("127.0.0.1", 8082);
97+
tracing::info!("starting proxy server on port: {}", &addr.0);
98+
tracing::info!("proxying to 127.0.0.1:8080");
99+
100+
Ok(Server::build()
101+
.bind("proxy-protocol-v1", ("127.0.0.1", 8081), move || {
102+
fn_service(wrap_with_proxy_protocol_v1)
103+
.map_err(|err| tracing::error!("service error: {:?}", err))
104+
})?
105+
.bind("proxy-protocol-v2", addr, move || {
106+
fn_service(wrap_with_proxy_protocol_v2)
107+
.map_err(|err| tracing::error!("service error: {:?}", err))
108+
})?
109+
.workers(2)
110+
.run())
111+
}
112+
113+
#[tokio::main]
114+
async fn main() -> io::Result<()> {
115+
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
116+
start_server()?.await?;
117+
Ok(())
118+
}

0 commit comments

Comments
 (0)