From 3532f37f0559a2218d482a0375eb034ddffc2901 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 4 Jun 2025 11:24:07 -0500 Subject: [PATCH 1/7] Update pyth-lazer-publisher-sdk transaction --- Cargo.lock | 267 ++++++++++++++++++++++++++- Cargo.toml | 2 +- src/agent/services/lazer_exporter.rs | 16 +- 3 files changed, 272 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1c22d2..4592a8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,7 +69,7 @@ version = "2.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "973a83d0d66d1f04647d1146a07736864f0742300b56bf2a5aadf5ce7b22fe47" dependencies = [ - "ahash", + "ahash 0.8.11", "solana-epoch-schedule", "solana-feature-set-interface", "solana-hash", @@ -113,6 +113,17 @@ dependencies = [ "solana-sdk-ids", ] +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.11" @@ -642,6 +653,18 @@ dependencies = [ "serde", ] +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake3" version = "1.8.2" @@ -788,6 +811,28 @@ dependencies = [ "serde", ] +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "bytemuck" version = "1.23.0" @@ -1347,6 +1392,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "digest" version = "0.9.0" @@ -1663,6 +1728,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs-err" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f89bda4c2a21204059a977ed3bfe746677dfd137b83c339e702b0ac91d482aa" +dependencies = [ + "autocfg", +] + +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.31" @@ -1902,6 +1982,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] [[package]] name = "hashbrown" @@ -1909,7 +1992,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash", + "ahash 0.8.11", ] [[package]] @@ -2483,6 +2566,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -3138,7 +3230,7 @@ dependencies = [ "lazy_static", "once_cell", "opentelemetry", - "ordered-float", + "ordered-float 4.6.0", "percent-encoding", "rand 0.8.5", "thiserror 1.0.69", @@ -3146,6 +3238,15 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "4.6.0" @@ -3562,6 +3663,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pyth-agent" version = "3.0.0" @@ -3613,14 +3734,37 @@ dependencies = [ "winnow", ] +[[package]] +name = "pyth-lazer-protocol" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9bdf4e2ba853a8b437309487542e742c7d094d8db189db194cb538f2be02ecd" +dependencies = [ + "anyhow", + "base64 0.22.1", + "byteorder", + "derive_more", + "itertools 0.13.0", + "protobuf", + "rust_decimal", + "serde", + "serde_json", +] + [[package]] name = "pyth-lazer-publisher-sdk" -version = "0.1.1" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13f8de2f7e8ba23b57167b7b150295ac806263aad39d7258d2c3684320ffce74" +checksum = "a3e633db28ca38210de8ab3e99d5bd85ad8cae08a08bb0292506340ee9d1c718" dependencies = [ + "anyhow", + "fs-err", + "humantime", "protobuf", "protobuf-codegen", + "pyth-lazer-protocol", + "serde-value", + "tracing", ] [[package]] @@ -3764,6 +3908,12 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.7.3" @@ -3955,6 +4105,15 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + [[package]] name = "reqwest" version = "0.11.27" @@ -4072,6 +4231,35 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rkyv" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "ron" version = "0.8.1" @@ -4095,6 +4283,22 @@ dependencies = [ "trim-in-place", ] +[[package]] +name = "rust_decimal" +version = "1.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faa7de2ba56ac291bd90c6b9bece784a52ae1411f9506544b3eae36dd2356d50" +dependencies = [ + "arrayvec", + "borsh 1.5.7", + "bytes", + "num-traits", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -4352,6 +4556,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "security-framework" version = "2.11.1" @@ -4421,6 +4631,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float 2.10.1", + "serde", +] + [[package]] name = "serde_bytes" version = "0.11.17" @@ -4602,6 +4822,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "siphasher" version = "0.3.11" @@ -5155,7 +5381,7 @@ version = "2.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92f6c09cc41059c0e03ccbee7f5d4cc0a315d68ef0d59b67eb90246adfd8cc35" dependencies = [ - "ahash", + "ahash 0.8.11", "lazy_static", "solana-epoch-schedule", "solana-hash", @@ -5169,7 +5395,7 @@ version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02007757246e40f10aa936dae4fa27efbf8dbd6a59575a12ccc802c1aea6e708" dependencies = [ - "ahash", + "ahash 0.8.11", "solana-pubkey", ] @@ -5566,7 +5792,7 @@ version = "2.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97222a3fda48570754ce114e43ca56af34741098c357cb8d3cb6695751e60330" dependencies = [ - "ahash", + "ahash 0.8.11", "bincode 1.3.3", "bv", "caps", @@ -7363,6 +7589,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "task-local-extensions" version = "0.1.4" @@ -8049,6 +8281,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -8712,6 +8954,15 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "x509-parser" version = "0.14.0" diff --git a/Cargo.toml b/Cargo.toml index 84c94ff..5db1c00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ bytemuck = "1.13.0" tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] } http = "1.3.1" url = { version = "2.5.4", features = ["serde"] } -pyth-lazer-publisher-sdk = "0.1.1" +pyth-lazer-publisher-sdk = "0.1.5" [profile.release] panic = 'abort' diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 92b41f1..2e4c0ec 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -198,10 +198,12 @@ mod lazer_exporter { feed_update::Update, }, transaction::{ + Ed25519SignatureData, LazerTransaction, + SignatureData, SignedLazerTransaction, - TransactionSignatureType, lazer_transaction::Payload, + signature_data::Data::Ed25519, }, }, solana_sdk::signer::keypair, @@ -299,7 +301,6 @@ mod lazer_exporter { let publisher_timestamp = MessageField::some(Timestamp::now()); let mut publisher_update = PublisherUpdate { updates: vec![], - publisher_id: Some(config.publisher_id), publisher_timestamp, special_fields: Default::default(), }; @@ -343,9 +344,16 @@ mod lazer_exporter { } }; let signature = signing_key.sign(&buf); + let signature_data = SignatureData { + data: Some(Ed25519(Ed25519SignatureData { + signature: Some(signature.to_bytes().into()), + public_key: Some(signing_key.verifying_key().to_bytes().into()), + special_fields: Default::default(), + })), + special_fields: Default::default(), + }; let signed_lazer_transaction = SignedLazerTransaction { - signature_type: Some(TransactionSignatureType::ed25519.into()), - signature: Some(signature.to_bytes().to_vec()), + signature_data: MessageField::some(signature_data), payload: Some(buf), special_fields: Default::default(), }; From d27384df9ffa5685393b9a48af5bc45a2c268575 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 4 Jun 2025 15:38:45 -0500 Subject: [PATCH 2/7] Remove publisher_id --- src/agent/services/lazer_exporter.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 2e4c0ec..e3c479f 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -46,7 +46,6 @@ use { pub struct Config { pub history_url: Url, pub relayer_urls: Vec, - pub publisher_id: u32, pub authorization_token: String, pub publish_keypair_path: PathBuf, #[serde(with = "humantime_serde", default = "default_publish_interval")] From 0496481d2e73863d607f48121b7e22dbe903ddf5 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 5 Jun 2025 13:11:06 -0500 Subject: [PATCH 3/7] Split relayer sessions into separate threads --- src/agent/services/lazer_exporter.rs | 241 ++++++++++++++++----------- 1 file changed, 140 insertions(+), 101 deletions(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index e3c479f..9a9820f 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -3,6 +3,7 @@ use { anyhow::{ Result, anyhow, + bail, }, futures_util::{ SinkExt, @@ -24,6 +25,11 @@ use { }, tokio::{ net::TcpStream, + select, + sync::mpsc::{ + self, + Receiver, + }, task::JoinHandle, }, tokio_tungstenite::{ @@ -42,6 +48,8 @@ use { url::Url, }; +pub const RELAYER_CHANNEL_CAPACITY: usize = 1000; + #[derive(Clone, Debug, Deserialize)] pub struct Config { pub history_url: Url, @@ -56,21 +64,21 @@ fn default_publish_interval() -> Duration { Duration::from_millis(200) } -struct RelayerSender { - ws_senders: Vec>, TungsteniteMessage>>, +struct RelayerWsSession { + ws_sender: SplitSink>, TungsteniteMessage>, } -impl RelayerSender { - async fn send_price_update( +impl RelayerWsSession { + async fn send_transaction( &mut self, signed_lazer_transaction: &SignedLazerTransaction, ) -> Result<()> { tracing::debug!("price_update: {:?}", signed_lazer_transaction); let buf = signed_lazer_transaction.write_to_bytes()?; - for sender in self.ws_senders.iter_mut() { - sender.send(TungsteniteMessage::from(buf.clone())).await?; - sender.flush().await?; - } + self.ws_sender + .send(TungsteniteMessage::from(buf.clone())) + .await?; + self.ws_sender.flush().await?; Ok(()) } } @@ -88,31 +96,80 @@ async fn connect_to_relayer( let headers = req.headers_mut(); headers.insert( "Authorization", - HeaderValue::from_str(&format!("Bearer {}", token))?, + HeaderValue::from_str(&format!("Bearer {token}"))?, ); let (ws_stream, _) = connect_async_with_config(req, None, true).await?; Ok(ws_stream.split()) } -async fn connect_to_relayers( - config: &Config, -) -> Result<( - RelayerSender, - Vec>>>, -)> { - let mut relayer_senders = Vec::new(); - let mut relayer_receivers = Vec::new(); - for url in config.relayer_urls.clone() { - let (relayer_sender, relayer_receiver) = - connect_to_relayer(url, &config.authorization_token).await?; - relayer_senders.push(relayer_sender); - relayer_receivers.push(relayer_receiver); +struct RelayerSessionTask { + // connection state + url: Url, + token: String, + receiver: Receiver, +} + +impl RelayerSessionTask { + pub async fn run(&mut self) { + let mut failure_count = 0; + let retry_duration = Duration::from_secs(1); + + loop { + match self.run_relayer_connection().await { + Ok(()) => { + tracing::info!("relayer session graceful shutdown"); + return; + } + Err(e) => { + failure_count += 1; + tracing::error!( + "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}", + e, + failure_count, + retry_duration + ); + tokio::time::sleep(retry_duration).await; + } + } + } + } + + pub async fn run_relayer_connection(&mut self) -> Result<()> { + // Establish relayer connection + // Relayer will drop the connection if no data received in 5s + let (relayer_ws_sender, mut relayer_ws_receiver) = + connect_to_relayer(self.url.clone(), &self.token).await?; + let mut relayer_ws_session = RelayerWsSession { + ws_sender: relayer_ws_sender, + }; + + loop { + select! { + Some(transaction) = self.receiver.recv() => { + if let Err(e) = relayer_ws_session.send_transaction(&transaction).await + { + tracing::error!("Error publishing transaction to Lazer relayer: {e:?}"); + bail!("Failed to publish transaction to Lazer relayer: {e:?}"); + } + } + // Handle messages from the relayers, such as errors if we send a bad update + msg = relayer_ws_receiver.next() => { + match msg { + Some(Ok(msg)) => { + tracing::debug!("Received message from relayer: {msg:?}"); + } + Some(Err(e)) => { + tracing::error!("Error receiving message from at relayer: {e:?}"); + } + None => { + tracing::error!("relayer connection closed"); + bail!("relayer connection closed"); + } + } + } + } + } } - let sender = RelayerSender { - ws_senders: relayer_senders, - }; - tracing::info!("connected to relayers: {:?}", config.relayer_urls); - Ok((sender, relayer_receivers)) } // TODO: This is copied from history-service; move to Lazer protocol sdk. @@ -156,10 +213,26 @@ async fn fetch_symbols(history_url: &Url) -> Result> { #[instrument(skip(config, state))] pub fn lazer_exporter(config: Config, state: Arc) -> Vec> { - let handles = vec![tokio::spawn(lazer_exporter::lazer_exporter( + let mut handles = vec![]; + let mut relayer_senders = vec![]; + + for url in config.relayer_urls.iter() { + let (sender, receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY); + let mut task = RelayerSessionTask { + url: url.clone(), + token: config.authorization_token.to_owned(), + receiver, + }; + handles.push(tokio::spawn(async move { task.run().await })); + relayer_senders.push(sender); + } + + handles.push(tokio::spawn(lazer_exporter::lazer_exporter( config.clone(), state, - ))]; + relayer_senders, + ))); + handles } @@ -170,20 +243,19 @@ mod lazer_exporter { services::lazer_exporter::{ Config, SymbolResponse, - connect_to_relayers, fetch_symbols, }, state::local::LocalStore, }, anyhow::{ Context, + Result, bail, }, ed25519_dalek::{ Signer, SigningKey, }, - futures_util::StreamExt, protobuf::{ Message, MessageField, @@ -209,44 +281,44 @@ mod lazer_exporter { std::{ collections::HashMap, sync::Arc, - time::Duration, }, - tokio_stream::StreamMap, + tokio::sync::mpsc::Sender, }; - pub async fn lazer_exporter(config: Config, state: Arc) - where - S: LocalStore, - S: Send + Sync + 'static, - { - let mut failure_count = 0; - let retry_duration = Duration::from_secs(1); - - loop { - match run(&config, state.clone()).await { - Ok(()) => { - tracing::info!("lazer_exporter graceful shutdown"); - return; - } - Err(e) => { - failure_count += 1; - tracing::error!( - "lazer_exporter failed with error: {:?}, failure_count: {}; retrying in {:?}", - e, - failure_count, - retry_duration - ); - tokio::time::sleep(retry_duration).await; - } + fn get_signing_key(config: &Config) -> Result { + // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher + let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { + Ok(k) => k, + Err(e) => { + tracing::error!( + error = ?e, + publish_keypair_path = config.publish_keypair_path.display().to_string(), + "Reading publish keypair returned an error. ", + ); + bail!("Reading publish keypair returned an error. "); } - } + }; + + SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) + .context("Failed to create signing key from keypair") } - async fn run(config: &Config, state: Arc) -> anyhow::Result<()> - where + pub async fn lazer_exporter( + config: Config, + state: Arc, + relayer_senders: Vec>, + ) where S: LocalStore, S: Send + Sync + 'static, { + let signing_key = match get_signing_key(&config) { + Ok(signing_key) => signing_key, + Err(e) => { + tracing::error!("lazer_exporter signing key failure: {e:?}"); + return; + } + }; + // TODO: Re-fetch on an interval? let lazer_symbols: HashMap = match fetch_symbols(&config.history_url).await { @@ -265,33 +337,10 @@ mod lazer_exporter { .collect(), Err(e) => { tracing::error!("Failed to fetch Lazer symbols: {e:?}"); - bail!("Failed to fetch Lazer symbols: {e:?}"); + return; } }; - // Establish relayer connections - // Relayer will drop the connection if no data received in 5s - let (mut relayer_sender, relayer_receivers) = connect_to_relayers(config).await?; - let mut stream_map = StreamMap::new(); - for (i, receiver) in relayer_receivers.into_iter().enumerate() { - stream_map.insert(config.relayer_urls[i].clone(), receiver); - } - - // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher - let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { - Ok(k) => k, - Err(e) => { - tracing::error!( - error = ?e, - publish_keypair_path = config.publish_keypair_path.display().to_string(), - "Reading publish keypair returned an error. ", - ); - bail!("Reading publish keypair returned an error. "); - } - }; - - let signing_key = SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) - .context("Failed to create signing key from keypair")?; let mut publish_interval = tokio::time::interval(config.publish_interval_duration); loop { @@ -356,23 +405,13 @@ mod lazer_exporter { payload: Some(buf), special_fields: Default::default(), }; - if let Err(e) = relayer_sender.send_price_update(&signed_lazer_transaction).await { - tracing::error!("Error publishing update to Lazer relayer: {e:?}"); - bail!("Failed to publish update to Lazer relayer: {e:?}"); - } - } - // Handle messages from the relayers, such as errors if we send a bad update - mapped_msg = stream_map.next() => { - match mapped_msg { - Some((relayer_url, Ok(msg))) => { - tracing::debug!("Received message from relayer at {relayer_url}: {msg:?}"); - } - Some((relayer_url, Err(e))) => { - tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}"); - } - None => { - tracing::error!("relayer connection closed"); - bail!("relayer connection closed"); + for relayer_sender in relayer_senders.iter() { + if let Err(e) = relayer_sender + .send(signed_lazer_transaction.clone()) + .await + { + tracing::error!("Error sending transaction to Lazer relayer session: {e:?}"); + // TODO: Under what circumstances would the channel be hosed and is it worth retry? } } } From 0a4a099bcbd4b2c327802257108e5f95066995cc Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 5 Jun 2025 23:12:10 -0500 Subject: [PATCH 4/7] Parallelize channel send; exponential backoff --- Cargo.lock | 21 ++++++++++++++++++++ Cargo.toml | 1 + src/agent/services/lazer_exporter.rs | 29 ++++++++++++++++------------ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4592a8e..bd3509c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -549,6 +549,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom 0.2.16", + "instant", + "rand 0.8.5", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -2536,6 +2547,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -3689,6 +3709,7 @@ version = "3.0.0" dependencies = [ "anyhow", "async-trait", + "backoff", "bincode 2.0.1", "bytemuck", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 5db1c00..1fa3634 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ path = "src/bin/agent.rs" [dependencies] anyhow = "1.0.81" +backoff = "0.4.0" ed25519-dalek = "2.1.1" serde = { version = "1.0.197", features = ["derive", "rc"] } async-trait = "0.1.79" diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 9a9820f..2999476 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -5,6 +5,10 @@ use { anyhow, bail, }, + backoff::{ + ExponentialBackoffBuilder, + backoff::Backoff, + }, futures_util::{ SinkExt, stream::{ @@ -112,7 +116,13 @@ struct RelayerSessionTask { impl RelayerSessionTask { pub async fn run(&mut self) { let mut failure_count = 0; - let retry_duration = Duration::from_secs(1); + let initial_interval = Duration::from_millis(100); + let max_interval = Duration::from_secs(5); + let mut backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(initial_interval) + .with_max_interval(max_interval) + .with_max_elapsed_time(None) + .build(); loop { match self.run_relayer_connection().await { @@ -122,13 +132,14 @@ impl RelayerSessionTask { } Err(e) => { failure_count += 1; + let next_backoff = backoff.next_backoff().unwrap_or(max_interval); tracing::error!( "relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}", e, failure_count, - retry_duration + next_backoff ); - tokio::time::sleep(retry_duration).await; + tokio::time::sleep(next_backoff).await; } } } @@ -405,15 +416,9 @@ mod lazer_exporter { payload: Some(buf), special_fields: Default::default(), }; - for relayer_sender in relayer_senders.iter() { - if let Err(e) = relayer_sender - .send(signed_lazer_transaction.clone()) - .await - { - tracing::error!("Error sending transaction to Lazer relayer session: {e:?}"); - // TODO: Under what circumstances would the channel be hosed and is it worth retry? - } - } + futures::future::join_all(relayer_senders.iter().map(|relayer_sender| + relayer_sender.send(signed_lazer_transaction.clone())) + ).await; } } } From 02e2a7842a688a8bae784cce5ca59aaf1b355f12 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Tue, 10 Jun 2025 10:44:01 -0500 Subject: [PATCH 5/7] Retry period reset logic --- src/agent/services/lazer_exporter.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 2999476..595bfdf 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -25,7 +25,10 @@ use { std::{ path::PathBuf, sync::Arc, - time::Duration, + time::{ + Duration, + Instant, + }, }, tokio::{ net::TcpStream, @@ -115,7 +118,6 @@ struct RelayerSessionTask { impl RelayerSessionTask { pub async fn run(&mut self) { - let mut failure_count = 0; let initial_interval = Duration::from_millis(100); let max_interval = Duration::from_secs(5); let mut backoff = ExponentialBackoffBuilder::new() @@ -124,6 +126,10 @@ impl RelayerSessionTask { .with_max_elapsed_time(None) .build(); + const FAILURE_RESET_TIME: Duration = Duration::from_secs(300); + let mut first_failure_time = Instant::now(); + let mut failure_count = 0; + loop { match self.run_relayer_connection().await { Ok(()) => { @@ -131,6 +137,12 @@ impl RelayerSessionTask { return; } Err(e) => { + if first_failure_time.elapsed() > FAILURE_RESET_TIME { + failure_count = 0; + first_failure_time = Instant::now(); + backoff.reset(); + } + failure_count += 1; let next_backoff = backoff.next_backoff().unwrap_or(max_interval); tracing::error!( From 434b67918af8ca181696c76b9b256f3c704ecd8c Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Tue, 10 Jun 2025 12:40:12 -0500 Subject: [PATCH 6/7] Use broadcast::channel for multiplex to relayers --- src/agent/services/lazer_exporter.rs | 60 +++++++++++++++++----------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 595bfdf..5dcc4b7 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -33,10 +33,7 @@ use { tokio::{ net::TcpStream, select, - sync::mpsc::{ - self, - Receiver, - }, + sync::broadcast, task::JoinHandle, }, tokio_tungstenite::{ @@ -80,7 +77,7 @@ impl RelayerWsSession { &mut self, signed_lazer_transaction: &SignedLazerTransaction, ) -> Result<()> { - tracing::debug!("price_update: {:?}", signed_lazer_transaction); + tracing::debug!("signed_lazer_transaction: {:?}", signed_lazer_transaction); let buf = signed_lazer_transaction.write_to_bytes()?; self.ws_sender .send(TungsteniteMessage::from(buf.clone())) @@ -113,7 +110,7 @@ struct RelayerSessionTask { // connection state url: Url, token: String, - receiver: Receiver, + receiver: broadcast::Receiver, } impl RelayerSessionTask { @@ -168,11 +165,25 @@ impl RelayerSessionTask { loop { select! { - Some(transaction) = self.receiver.recv() => { - if let Err(e) = relayer_ws_session.send_transaction(&transaction).await - { - tracing::error!("Error publishing transaction to Lazer relayer: {e:?}"); - bail!("Failed to publish transaction to Lazer relayer: {e:?}"); + recv_result = self.receiver.recv() => { + match recv_result { + Ok(transaction) => { + if let Err(e) = relayer_ws_session.send_transaction(&transaction).await { + tracing::error!("Error publishing transaction to Lazer relayer: {e:?}"); + bail!("Failed to publish transaction to Lazer relayer: {e:?}"); + } + }, + Err(e) => { + match e { + broadcast::error::RecvError::Closed => { + tracing::error!("transaction broadcast channel closed"); + bail!("transaction broadcast channel closed"); + } + broadcast::error::RecvError::Lagged(skipped_count) => { + tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages"); + } + } + } } } // Handle messages from the relayers, such as errors if we send a bad update @@ -237,23 +248,23 @@ async fn fetch_symbols(history_url: &Url) -> Result> { #[instrument(skip(config, state))] pub fn lazer_exporter(config: Config, state: Arc) -> Vec> { let mut handles = vec![]; - let mut relayer_senders = vec![]; + + // can safely drop first receiver for ease of iteration + let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); for url in config.relayer_urls.iter() { - let (sender, receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY); let mut task = RelayerSessionTask { - url: url.clone(), - token: config.authorization_token.to_owned(), - receiver, + url: url.clone(), + token: config.authorization_token.to_owned(), + receiver: relayer_sender.subscribe(), }; handles.push(tokio::spawn(async move { task.run().await })); - relayer_senders.push(sender); } handles.push(tokio::spawn(lazer_exporter::lazer_exporter( config.clone(), state, - relayer_senders, + relayer_sender, ))); handles @@ -305,7 +316,7 @@ mod lazer_exporter { collections::HashMap, sync::Arc, }, - tokio::sync::mpsc::Sender, + tokio::sync::broadcast::Sender, }; fn get_signing_key(config: &Config) -> Result { @@ -329,7 +340,7 @@ mod lazer_exporter { pub async fn lazer_exporter( config: Config, state: Arc, - relayer_senders: Vec>, + relayer_sender: Sender, ) where S: LocalStore, S: Send + Sync + 'static, @@ -428,9 +439,12 @@ mod lazer_exporter { payload: Some(buf), special_fields: Default::default(), }; - futures::future::join_all(relayer_senders.iter().map(|relayer_sender| - relayer_sender.send(signed_lazer_transaction.clone())) - ).await; + match relayer_sender.send(signed_lazer_transaction.clone()) { + Ok(_) => (), + Err(e) => { + tracing::error!("Error sending transaction to relayer receivers: {e}"); + } + } } } } From 02b143c49a4e58e30a4502615a9c9012ba52a293 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 11 Jun 2025 16:00:42 -0500 Subject: [PATCH 7/7] patch version bump --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd3509c..5c8a80d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3705,7 +3705,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "3.0.0" +version = "3.0.1" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 1fa3634..6fc1626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "3.0.0" +version = "3.0.1" edition = "2024" [[bin]]