Skip to content

Commit 1faff7c

Browse files
committed
feat: Lazer publisher agent
1 parent 25ae56e commit 1faff7c

File tree

9 files changed

+3977
-0
lines changed

9 files changed

+3977
-0
lines changed

lazer_agent/Cargo.lock

Lines changed: 3195 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lazer_agent/Cargo.toml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
[package]
2+
name = "pyth-lazer-agent"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
pyth-lazer-publisher-sdk = "0.1.3"
8+
pyth-lazer-protocol = "0.7.2"
9+
10+
anyhow = "1.0.98"
11+
bincode = { version = "2.0.1", features = ["serde"] }
12+
clap = { version = "4.5.32", features = ["derive"] }
13+
config = "0.15.11"
14+
derivative = "2.2.0"
15+
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
16+
futures = "0.3.31"
17+
futures-util = "0.3.31"
18+
http = "1.3.1"
19+
http-body-util = "0.1.3"
20+
humantime-serde = "1.1.1"
21+
hyper = { version = "1.6.0", features = ["http1", "server"] }
22+
hyper-util = { version = "0.1.10", features = ["tokio"] }
23+
protobuf = "3.7.2"
24+
serde = { version = "1.0.219", features = ["derive"] }
25+
serde_json = "1.0.140"
26+
soketto = { version = "0.8.1", features = ["http"] }
27+
solana-keypair = "2.2.1"
28+
tokio = { version = "1.44.1", features = ["full"] }
29+
tokio-stream = "0.1.17"
30+
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
31+
tokio-util = { version = "0.7.14", features = ["compat"] }
32+
tracing = "0.1.41"
33+
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
34+
url = { version = "2.5.4", features = ["serde"] }

lazer_agent/config.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
relayer_urls = ["ws://localhost:1235/v1/transaction", "ws://localhost:1335/v1/transaction"]
2+
authorization_token = "token1"
3+
publish_keypair_path = "/path/to/solana/id.json"
4+
listen_address = "0.0.0.0:1234"
5+
publish_interval_duration = "50ms"

lazer_agent/src/config.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use std::net::SocketAddr;
2+
use std::path::PathBuf;
3+
use std::time::Duration;
4+
5+
use config::{Environment, File};
6+
use derivative::Derivative;
7+
use pyth_lazer_protocol::router::PublisherId;
8+
use serde::Deserialize;
9+
use url::Url;
10+
11+
#[derive(Deserialize, Derivative, Clone, PartialEq)]
12+
#[derivative(Debug)]
13+
pub struct Config {
14+
pub listen_address: SocketAddr,
15+
pub publisher_id: PublisherId,
16+
pub relayer_urls: Vec<Url>,
17+
pub authorization_token: String,
18+
pub publish_keypair_path: PathBuf,
19+
#[serde(with = "humantime_serde", default = "default_publish_interval")]
20+
pub publish_interval_duration: Duration,
21+
}
22+
23+
fn default_publish_interval() -> Duration {
24+
Duration::from_millis(50)
25+
}
26+
27+
pub fn load_config(config_path: String) -> anyhow::Result<Config> {
28+
let config = config::Config::builder()
29+
.add_source(File::with_name(&config_path))
30+
.add_source(Environment::with_prefix("LAZER_AGENT").separator("__"))
31+
.build()?
32+
.try_deserialize()?;
33+
Ok(config)
34+
}
35+
36+
// Default capacity for all tokio mpsc channels that communicate between tasks.
37+
pub const CHANNEL_CAPACITY: usize = 1000;

lazer_agent/src/http_server.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use anyhow::{Context, Result};
2+
use hyper::{Response, StatusCode, body::Bytes, server::conn::http1, service::service_fn};
3+
use hyper_util::rt::TokioIo;
4+
use soketto::{
5+
BoxedError,
6+
handshake::http::{Server, is_upgrade_request},
7+
};
8+
use std::{io, net::SocketAddr};
9+
use tokio::net::{TcpListener, TcpStream};
10+
use tracing::{debug, info, instrument, warn};
11+
12+
use crate::{
13+
config::Config,
14+
lazer_publisher::LazerPublisher,
15+
publisher_handle::{PublisherConnectionContext, handle_publisher},
16+
};
17+
18+
type FullBody = http_body_util::Full<Bytes>;
19+
20+
#[derive(Debug)]
21+
pub enum Request {
22+
PublisherV1,
23+
PublisherV2,
24+
}
25+
26+
pub struct RelayerRequest(pub http::Request<hyper::body::Incoming>);
27+
28+
const PUBLISHER_WS_URI: &str = "/v1/publisher";
29+
const PUBLISHER_WS_URI_V2: &str = "/v2/publisher";
30+
31+
pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()> {
32+
let listener = TcpListener::bind(&config.listen_address).await?;
33+
info!("listening on {:?}", &config.listen_address);
34+
35+
loop {
36+
let stream_addr = listener.accept().await;
37+
let config_clone = config.clone();
38+
let lazer_publisher_clone = lazer_publisher.clone();
39+
tokio::spawn(async {
40+
if let Err(err) =
41+
try_handle_connection(stream_addr, config_clone, lazer_publisher_clone).await
42+
{
43+
warn!("error while handling connection: {err:?}");
44+
}
45+
});
46+
}
47+
}
48+
49+
async fn try_handle_connection(
50+
stream_addr: io::Result<(TcpStream, SocketAddr)>,
51+
config: Config,
52+
lazer_publisher: LazerPublisher,
53+
) -> Result<()> {
54+
let (stream, remote_addr) = stream_addr?;
55+
debug!("accepted connection from {}", remote_addr);
56+
stream.set_nodelay(true)?;
57+
http1::Builder::new()
58+
.serve_connection(
59+
TokioIo::new(stream),
60+
service_fn(move |r| {
61+
let request = RelayerRequest(r);
62+
request_handler(
63+
request,
64+
remote_addr,
65+
config.clone(),
66+
lazer_publisher.clone(),
67+
)
68+
}),
69+
)
70+
.with_upgrades()
71+
.await?;
72+
Ok(())
73+
}
74+
75+
#[instrument(skip_all, fields(component = "http_server", remote_addr = remote_addr.to_string()))]
76+
async fn request_handler(
77+
request: RelayerRequest,
78+
remote_addr: SocketAddr,
79+
config: Config,
80+
lazer_publisher: LazerPublisher,
81+
) -> Result<Response<FullBody>, BoxedError> {
82+
let path = request.0.uri().path();
83+
84+
let request_type = match path {
85+
PUBLISHER_WS_URI => Request::PublisherV1,
86+
PUBLISHER_WS_URI_V2 => Request::PublisherV2,
87+
_ => {
88+
return Ok(Response::builder()
89+
.status(StatusCode::NOT_FOUND)
90+
.body(FullBody::from("not found"))
91+
.context("builder failed")?);
92+
}
93+
};
94+
95+
if !is_upgrade_request(&request.0) {
96+
return Ok(Response::builder()
97+
.status(StatusCode::BAD_REQUEST)
98+
.body(FullBody::from("bad request"))
99+
.context("builder failed")?);
100+
}
101+
102+
let mut server = Server::new();
103+
match server.receive_request(&request.0) {
104+
Ok(response) => {
105+
info!("accepted connection from publisher");
106+
match request_type {
107+
Request::PublisherV1 | Request::PublisherV2 => {
108+
let publisher_connection_context = PublisherConnectionContext {
109+
request_type,
110+
publisher_id: config.publisher_id,
111+
_remote_addr: remote_addr,
112+
};
113+
tokio::spawn(handle_publisher(
114+
server,
115+
request.0,
116+
publisher_connection_context,
117+
lazer_publisher,
118+
));
119+
Ok(response.map(|()| FullBody::default()))
120+
}
121+
}
122+
}
123+
Err(e) => {
124+
warn!("Could not upgrade connection: {}", e);
125+
Ok(Response::builder()
126+
.status(StatusCode::INTERNAL_SERVER_ERROR)
127+
.body(FullBody::from("internal server error"))
128+
.context("builder failed")?)
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)