diff --git a/Cargo.lock b/Cargo.lock index aab5e1e099..d4e79a1803 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -768,6 +768,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -3383,6 +3389,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.3.1", + "indexmap 2.10.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -3659,7 +3684,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -3682,6 +3707,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.11", "http 1.3.1", "http-body 1.0.1", "httparse", @@ -3737,6 +3763,22 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.14" @@ -3756,9 +3798,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2", + "system-configuration 0.6.1", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -5540,8 +5584,9 @@ dependencies = [ "hyper 1.6.0", "hyper-util", "protobuf", - "pyth-lazer-protocol 0.7.3", - "pyth-lazer-publisher-sdk 0.1.6", + "pyth-lazer-protocol 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "pyth-lazer-publisher-sdk 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "reqwest 0.12.22", "serde", "serde_json", "soketto", @@ -5580,15 +5625,19 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6445dc5d2f7fff7c677fb8edc5a080a82ef7583c1bdb39daa95421788c23f695" +version = "0.8.1" dependencies = [ + "alloy-primitives 0.8.25", "anyhow", - "base64 0.22.1", + "bincode 1.3.3", + "bs58", "byteorder", "derive_more 1.0.0", + "ed25519-dalek 2.1.1", + "hex", + "humantime-serde", "itertools 0.13.0", + "libsecp256k1 0.7.2", "protobuf", "rust_decimal", "serde", @@ -5598,18 +5647,14 @@ dependencies = [ [[package]] name = "pyth-lazer-protocol" version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1258b8770756a82a39b7b02a296c10a91b93aa58c0cded47950defe4d9377644" dependencies = [ - "alloy-primitives 0.8.25", "anyhow", - "bincode 1.3.3", - "bs58", "byteorder", "derive_more 1.0.0", - "ed25519-dalek 2.1.1", - "hex", "humantime-serde", "itertools 0.13.0", - "libsecp256k1 0.7.2", "protobuf", "rust_decimal", "serde", @@ -5618,16 +5663,14 @@ dependencies = [ [[package]] name = "pyth-lazer-publisher-sdk" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6ef4052ebf2a7943259b3d52a10b2231ffc346717735c50e44d73fe92019d5" +version = "0.1.7" dependencies = [ "anyhow", "fs-err", "humantime", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.7.3", + "pyth-lazer-protocol 0.8.1", "serde-value", "tracing", ] @@ -5635,13 +5678,15 @@ dependencies = [ [[package]] name = "pyth-lazer-publisher-sdk" version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8d52a515b21b77a89266d584da4363fcd1e121213ac3065ab7ff0dab1172006" dependencies = [ "anyhow", "fs-err", "humantime", "protobuf", "protobuf-codegen", - "pyth-lazer-protocol 0.8.1", + "pyth-lazer-protocol 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde-value", "tracing", ] @@ -6102,12 +6147,12 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.32", "hyper-rustls 0.24.2", - "hyper-tls", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", @@ -6122,7 +6167,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", - "system-configuration", + "system-configuration 0.5.1", "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", @@ -6144,17 +6189,22 @@ dependencies = [ "async-compression", "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2 0.4.11", "http 1.3.1", "http-body 1.0.1", "http-body-util", "hyper 1.6.0", "hyper-rustls 0.27.7", + "hyper-tls 0.6.0", "hyper-util", "js-sys", "log", + "mime", + "native-tls", "percent-encoding", "pin-project-lite", "quinn", @@ -6165,6 +6215,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.2", "tokio-util", "tower 0.5.2", @@ -9644,7 +9695,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation 0.9.4", - "system-configuration-sys", + "system-configuration-sys 0.5.0", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.9.1", + "core-foundation 0.9.4", + "system-configuration-sys 0.6.0", ] [[package]] @@ -9657,6 +9719,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tap" version = "1.0.1" @@ -10743,6 +10815,17 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-registry" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e" +dependencies = [ + "windows-link", + "windows-result", + "windows-strings", +] + [[package]] name = "windows-result" version = "0.3.4" diff --git a/apps/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml index b457a2cdb8..bf73676e22 100644 --- a/apps/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.3" edition = "2024" [dependencies] -pyth-lazer-publisher-sdk = "0.1.5" -pyth-lazer-protocol = "0.7.2" +pyth-lazer-publisher-sdk = "0.1.7" +pyth-lazer-protocol = "0.8.1" anyhow = "1.0.98" backoff = "0.4.0" @@ -20,7 +20,7 @@ futures-util = "0.3.31" http = "1.3.1" http-body-util = "0.1.3" humantime-serde = "1.1.1" -hyper = { version = "1.6.0", features = ["http1", "server"] } +hyper = { version = "1.6.0", features = ["http1", "server", "client"] } hyper-util = { version = "0.1.10", features = ["tokio"] } protobuf = "3.7.2" serde = { version = "1.0.219", features = ["derive"] } @@ -33,6 +33,7 @@ tokio-util = { version = "0.7.14", features = ["compat"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } url = { version = "2.5.4", features = ["serde"] } +reqwest = "0.12.22" [dev-dependencies] tempfile = "3.20.0" diff --git a/apps/pyth-lazer-agent/config/config.toml b/apps/pyth-lazer-agent/config/config.toml index d75a708880..7459a196ab 100644 --- a/apps/pyth-lazer-agent/config/config.toml +++ b/apps/pyth-lazer-agent/config/config.toml @@ -2,3 +2,4 @@ relayer_urls = ["ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "ws:// publish_keypair_path = "/path/to/solana/id.json" listen_address = "0.0.0.0:8910" publish_interval_duration = "25ms" +authorization_token="token1" diff --git a/apps/pyth-lazer-agent/src/config.rs b/apps/pyth-lazer-agent/src/config.rs index 57b0652be7..d0721939e9 100644 --- a/apps/pyth-lazer-agent/src/config.rs +++ b/apps/pyth-lazer-agent/src/config.rs @@ -17,6 +17,7 @@ pub struct Config { pub publish_keypair_path: PathBuf, #[serde(with = "humantime_serde", default = "default_publish_interval")] pub publish_interval_duration: Duration, + pub history_service_url: Option, } fn default_publish_interval() -> Duration { diff --git a/apps/pyth-lazer-agent/src/http_server.rs b/apps/pyth-lazer-agent/src/http_server.rs index 235fdb2af7..8e5b5a2bec 100644 --- a/apps/pyth-lazer-agent/src/http_server.rs +++ b/apps/pyth-lazer-agent/src/http_server.rs @@ -1,32 +1,39 @@ +use crate::jrpc_handle::{JrpcConnectionContext, handle_jrpc}; +use crate::publisher_handle::handle_publisher; +use crate::{ + config::Config, lazer_publisher::LazerPublisher, publisher_handle::PublisherConnectionContext, +}; use anyhow::{Context, Result}; +use hyper::body::Incoming; use hyper::{Response, StatusCode, body::Bytes, server::conn::http1, service::service_fn}; use hyper_util::rt::TokioIo; use soketto::{ BoxedError, handshake::http::{Server, is_upgrade_request}, }; +use std::fmt::Debug; use std::{io, net::SocketAddr}; use tokio::net::{TcpListener, TcpStream}; use tracing::{debug, info, instrument, warn}; -use crate::{ - config::Config, - lazer_publisher::LazerPublisher, - publisher_handle::{PublisherConnectionContext, handle_publisher}, -}; - type FullBody = http_body_util::Full; -#[derive(Debug)] -pub enum Request { +#[derive(Debug, Copy, Clone)] +pub enum PublisherRequest { PublisherV1, PublisherV2, } -pub struct RelayerRequest(pub http::Request); +pub enum Request { + PublisherRequest(PublisherRequest), + JrpcV1, +} + +pub struct RelayerRequest(pub http::Request); -const PUBLISHER_WS_URI: &str = "/v1/publisher"; +const PUBLISHER_WS_URI_V1: &str = "/v1/publisher"; const PUBLISHER_WS_URI_V2: &str = "/v2/publisher"; +const JRPC_WS_URI_V1: &str = "/v1/jprc"; const READINESS_PROBE_PATH: &str = "/ready"; const LIVENESS_PROBE_PATH: &str = "/live"; @@ -38,8 +45,11 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()> loop { let stream_addr = listener.accept().await; let lazer_publisher_clone = lazer_publisher.clone(); - tokio::spawn(async { - if let Err(err) = try_handle_connection(stream_addr, lazer_publisher_clone).await { + let config = config.clone(); + tokio::spawn(async move { + if let Err(err) = + try_handle_connection(config, stream_addr, lazer_publisher_clone).await + { warn!("error while handling connection: {err:?}"); } }); @@ -47,6 +57,7 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()> } async fn try_handle_connection( + config: Config, stream_addr: io::Result<(TcpStream, SocketAddr)>, lazer_publisher: LazerPublisher, ) -> Result<()> { @@ -58,7 +69,12 @@ async fn try_handle_connection( TokioIo::new(stream), service_fn(move |r| { let request = RelayerRequest(r); - request_handler(request, remote_addr, lazer_publisher.clone()) + request_handler( + config.clone(), + request, + remote_addr, + lazer_publisher.clone(), + ) }), ) .with_upgrades() @@ -68,6 +84,7 @@ async fn try_handle_connection( #[instrument(skip_all, fields(component = "http_server", remote_addr = remote_addr.to_string()))] async fn request_handler( + config: Config, request: RelayerRequest, remote_addr: SocketAddr, lazer_publisher: LazerPublisher, @@ -75,8 +92,9 @@ async fn request_handler( let path = request.0.uri().path(); let request_type = match path { - PUBLISHER_WS_URI => Request::PublisherV1, - PUBLISHER_WS_URI_V2 => Request::PublisherV2, + PUBLISHER_WS_URI_V1 => Request::PublisherRequest(PublisherRequest::PublisherV1), + PUBLISHER_WS_URI_V2 => Request::PublisherRequest(PublisherRequest::PublisherV2), + JRPC_WS_URI_V1 => Request::JrpcV1, LIVENESS_PROBE_PATH => { let response = Response::builder().status(StatusCode::OK); return Ok(response.body(FullBody::default())?); @@ -113,9 +131,9 @@ async fn request_handler( Ok(response) => { info!("accepted connection from publisher"); match request_type { - Request::PublisherV1 | Request::PublisherV2 => { + Request::PublisherRequest(publisher_request_type) => { let publisher_connection_context = PublisherConnectionContext { - request_type, + request_type: publisher_request_type, _remote_addr: remote_addr, }; tokio::spawn(handle_publisher( @@ -126,6 +144,17 @@ async fn request_handler( )); Ok(response.map(|()| FullBody::default())) } + Request::JrpcV1 => { + let publisher_connection_context = JrpcConnectionContext {}; + tokio::spawn(handle_jrpc( + config.clone(), + server, + request.0, + publisher_connection_context, + lazer_publisher, + )); + Ok(response.map(|()| FullBody::default())) + } } } Err(e) => { diff --git a/apps/pyth-lazer-agent/src/jrpc_handle.rs b/apps/pyth-lazer-agent/src/jrpc_handle.rs new file mode 100644 index 0000000000..b0c752910c --- /dev/null +++ b/apps/pyth-lazer-agent/src/jrpc_handle.rs @@ -0,0 +1,328 @@ +use crate::config::Config; +use crate::lazer_publisher::LazerPublisher; +use crate::websocket_utils::{handle_websocket_error, send_text}; +use anyhow::Error; +use futures::{AsyncRead, AsyncWrite}; +use futures_util::io::{BufReader, BufWriter}; +use hyper_util::rt::TokioIo; +use pyth_lazer_protocol::jrpc::{ + GetMetadataParams, JrpcCall, JrpcError, JrpcErrorResponse, JrpcResponse, JrpcSuccessResponse, + JsonRpcVersion, PythLazerAgentJrpcV1, SymbolMetadata, +}; +use soketto::Sender; +use soketto::handshake::http::Server; +use std::str::FromStr; +use tokio::{pin, select}; +use tokio_util::compat::TokioAsyncReadCompatExt; +use tracing::{debug, error, instrument}; +use url::Url; + +const DEFAULT_HISTORY_SERVICE_URL: &str = + "https://history.pyth-lazer.dourolabs.app/history/v1/symbols"; + +pub struct JrpcConnectionContext {} + +#[instrument( + skip(server, request, lazer_publisher, context), + fields(component = "jrpc_ws") +)] +pub async fn handle_jrpc( + config: Config, + server: Server, + request: hyper::Request, + context: JrpcConnectionContext, + lazer_publisher: LazerPublisher, +) { + if let Err(err) = try_handle_jrpc(config, server, request, context, lazer_publisher).await { + handle_websocket_error(err); + } +} + +#[instrument( + skip(server, request, lazer_publisher, _context), + fields(component = "jrpc_ws") +)] +async fn try_handle_jrpc( + config: Config, + server: Server, + request: hyper::Request, + _context: JrpcConnectionContext, + lazer_publisher: LazerPublisher, +) -> anyhow::Result<()> { + let stream = hyper::upgrade::on(request).await?; + let io = TokioIo::new(stream); + let stream = BufReader::new(BufWriter::new(io.compat())); + let (mut ws_sender, mut ws_receiver) = server.into_builder(stream).finish(); + + let mut receive_buf = Vec::new(); + + loop { + receive_buf.clear(); + { + // soketto is not cancel-safe, so we need to store the future and poll it + // in the inner loop. + let receive = async { ws_receiver.receive(&mut receive_buf).await }; + pin!(receive); + #[allow(clippy::never_loop, reason = "false positive")] // false positive + loop { + select! { + _result = &mut receive => { + break + } + } + } + } + + match handle_jrpc_inner(&config, &mut ws_sender, &mut receive_buf, &lazer_publisher).await { + Ok(_) => {} + Err(err) => { + debug!("Error handling JRPC request: {}", err); + send_text( + &mut ws_sender, + serde_json::to_string::>(&JrpcResponse::Error( + JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::InternalError.into(), + id: None, + }, + ))? + .as_str(), + ) + .await?; + } + } + } +} + +async fn handle_jrpc_inner( + config: &Config, + sender: &mut Sender, + receive_buf: &mut Vec, + lazer_publisher: &LazerPublisher, +) -> anyhow::Result<()> { + match serde_json::from_slice::(receive_buf.as_slice()) { + Ok(jrpc_request) => match jrpc_request.params { + JrpcCall::PushUpdate(request_params) => { + match lazer_publisher + .push_feed_update(request_params.into()) + .await + { + Ok(_) => { + send_text( + sender, + serde_json::to_string::>(&JrpcResponse::Success( + JrpcSuccessResponse:: { + jsonrpc: JsonRpcVersion::V2, + result: "success".to_string(), + id: jrpc_request.id, + }, + ))? + .as_str(), + ) + .await?; + } + Err(err) => { + debug!("error while sending updates: {:?}", err); + send_text( + sender, + serde_json::to_string::>(&JrpcResponse::Error( + JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::InternalError.into(), + id: Some(jrpc_request.id), + }, + ))? + .as_str(), + ) + .await?; + } + } + } + JrpcCall::GetMetadata(request_params) => match get_metadata(config.clone()).await { + Ok(symbols) => { + let symbols = filter_symbols(symbols.clone(), request_params); + + send_text( + sender, + serde_json::to_string::>>( + &JrpcResponse::Success(JrpcSuccessResponse::> { + jsonrpc: JsonRpcVersion::V2, + result: symbols, + id: jrpc_request.id, + }), + )? + .as_str(), + ) + .await?; + } + Err(err) => { + error!("error while retrieving metadata: {:?}", err); + send_text( + sender, + serde_json::to_string::>(&JrpcResponse::Error( + JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + // note: right now specifying an invalid method results in a parse error + error: JrpcError::InternalError.into(), + id: None, + }, + ))? + .as_str(), + ) + .await?; + } + }, + }, + Err(err) => { + debug!("Error parsing JRPC request: {}", err); + send_text( + sender, + serde_json::to_string::>(&JrpcResponse::Error( + JrpcErrorResponse { + jsonrpc: JsonRpcVersion::V2, + error: JrpcError::ParseError(err.to_string()).into(), + id: None, + }, + ))? + .as_str(), + ) + .await?; + } + } + Ok(()) +} + +async fn get_metadata(config: Config) -> Result, Error> { + let result = reqwest::get( + config + .history_service_url + .unwrap_or(Url::from_str(DEFAULT_HISTORY_SERVICE_URL)?), + ) + .await?; + + if result.status().is_success() { + Ok(serde_json::from_str::>( + &result.text().await?, + )?) + } else { + Err(anyhow::anyhow!( + "Error getting metadata (status_code={}, body={})", + result.status(), + result.text().await.unwrap_or("none".to_string()) + )) + } +} + +fn filter_symbols( + symbols: Vec, + get_metadata_params: GetMetadataParams, +) -> Vec { + let names = &get_metadata_params.names.clone(); + let asset_types = &get_metadata_params.asset_types.clone(); + + let res: Vec = symbols + .into_iter() + .filter(|symbol| { + if let Some(names) = names { + if !names.contains(&symbol.name) { + return false; + } + } + + if let Some(asset_types) = asset_types { + if !asset_types.contains(&symbol.asset_type) { + return false; + } + } + + true + }) + .collect(); + + res +} + +#[cfg(test)] +pub mod tests { + use super::*; + use pyth_lazer_protocol::router::{Channel, FixedRate, PriceFeedId}; + use pyth_lazer_protocol::symbol_state::SymbolState; + use std::net::SocketAddr; + + fn gen_test_symbol(name: String, asset_type: String) -> SymbolMetadata { + SymbolMetadata { + pyth_lazer_id: PriceFeedId(1), + name, + symbol: "".to_string(), + description: "".to_string(), + asset_type, + exponent: 0, + cmc_id: None, + funding_rate_interval: None, + min_publishers: 0, + min_channel: Channel::FixedRate(FixedRate::MIN), + state: SymbolState::Stable, + hermes_id: None, + quote_currency: None, + } + } + + #[tokio::test] + #[ignore] + async fn test_try_get_metadata() { + let config = Config { + listen_address: SocketAddr::from(([127, 0, 0, 1], 0)), + relayer_urls: vec![], + authorization_token: None, + publish_keypair_path: Default::default(), + publish_interval_duration: Default::default(), + history_service_url: None, + }; + + println!("{:?}", get_metadata(config).await.unwrap()); + } + + #[test] + fn test_filter_symbols() { + let symbol1 = gen_test_symbol("BTC".to_string(), "crypto".to_string()); + let symbol2 = gen_test_symbol("XMR".to_string(), "crypto".to_string()); + let symbol3 = gen_test_symbol("BTCUSDT".to_string(), "funding-rate".to_string()); + let symbols = vec![symbol1.clone(), symbol2.clone(), symbol3.clone()]; + + // just a name filter + assert_eq!( + filter_symbols( + symbols.clone(), + GetMetadataParams { + names: Some(vec!["XMR".to_string()]), + asset_types: None, + }, + ), + vec![symbol2.clone()] + ); + + // just an asset type filter + assert_eq!( + filter_symbols( + symbols.clone(), + GetMetadataParams { + names: None, + asset_types: Some(vec!["crypto".to_string()]), + }, + ), + vec![symbol1.clone(), symbol2.clone()] + ); + + // name and asset type + assert_eq!( + filter_symbols( + symbols.clone(), + GetMetadataParams { + names: Some(vec!["BTC".to_string()]), + asset_types: Some(vec!["crypto".to_string()]), + }, + ), + vec![symbol1.clone()] + ); + } +} diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index b5c2d304cd..b453362caf 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -24,7 +24,7 @@ use tokio::{ }; use tracing::error; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct LazerPublisher { sender: Sender, pub(crate) is_ready: Arc, @@ -223,6 +223,7 @@ mod tests { authorization_token: None, publish_keypair_path: PathBuf::from(signing_key_file.path()), publish_interval_duration: Duration::from_millis(25), + history_service_url: None, }; let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); diff --git a/apps/pyth-lazer-agent/src/main.rs b/apps/pyth-lazer-agent/src/main.rs index 8cbe81c927..7d319a7c4c 100644 --- a/apps/pyth-lazer-agent/src/main.rs +++ b/apps/pyth-lazer-agent/src/main.rs @@ -8,6 +8,7 @@ use { mod config; mod http_server; +mod jrpc_handle; mod lazer_publisher; mod publisher_handle; mod relayer_session; diff --git a/apps/pyth-lazer-agent/src/publisher_handle.rs b/apps/pyth-lazer-agent/src/publisher_handle.rs index 45c2b65b4e..a75ddc6d87 100644 --- a/apps/pyth-lazer-agent/src/publisher_handle.rs +++ b/apps/pyth-lazer-agent/src/publisher_handle.rs @@ -22,7 +22,7 @@ use crate::{ }; pub struct PublisherConnectionContext { - pub request_type: http_server::Request, + pub request_type: http_server::PublisherRequest, pub _remote_addr: SocketAddr, } @@ -81,7 +81,7 @@ async fn try_handle_publisher( // reply with an error if we can't parse the binary update let feed_update: FeedUpdate = match context.request_type { - http_server::Request::PublisherV1 => { + http_server::PublisherRequest::PublisherV1 => { match bincode::serde::decode_from_slice::( &receive_buf, bincode::config::legacy(), @@ -132,7 +132,7 @@ async fn try_handle_publisher( } } } - http_server::Request::PublisherV2 => { + http_server::PublisherRequest::PublisherV2 => { match bincode::serde::decode_from_slice::( &receive_buf, bincode::config::legacy(), diff --git a/apps/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs index d89d8b140f..33423da33b 100644 --- a/apps/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -35,6 +35,7 @@ async fn connect_to_relayer( HeaderValue::from_str(&format!("Bearer {token}"))?, ); let (ws_stream, _) = connect_async_with_config(req, None, true).await?; + tracing::info!("connected to the relayer at {}", url); Ok(ws_stream.split()) } @@ -149,7 +150,7 @@ impl RelayerSessionTask { msg = relayer_ws_receiver.next() => { match msg { Some(Ok(msg)) => { - tracing::debug!("Received message from relayer: {msg:?}"); + tracing::debug!("Received a message from relayer: {msg:?}"); } Some(Err(e)) => { tracing::error!("Error receiving message from at relayer: {e:?}"); @@ -165,6 +166,7 @@ impl RelayerSessionTask { } } +//noinspection DuplicatedCode #[cfg(test)] mod tests { use crate::relayer_session::RelayerSessionTask; @@ -215,7 +217,7 @@ mod tests { while let Some(msg) = read.next().await { if let Ok(msg) = msg { if msg.is_binary() { - tracing::info!("Received binary message: {msg:?}"); + tracing::info!("Received a binary message: {msg:?}"); let transaction = SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref()) .unwrap();