From 76295b268c6db715b1cbeeaba9258223a3f164d7 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 12 Jun 2025 06:22:42 -0500 Subject: [PATCH] Some lazer exporter unit tests --- Cargo.lock | 5 +- Cargo.toml | 3 + src/agent/services/lazer_exporter.rs | 313 ++++++++++++++++++++++++++- 3 files changed, 317 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c8a80d..56eb557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3743,6 +3743,7 @@ dependencies = [ "solana-pubkey", "solana-sdk", "solana-transaction-status", + "tempfile", "thiserror 2.0.12", "tokio", "tokio-stream", @@ -7627,9 +7628,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.19.1" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ "fastrand", "getrandom 0.3.2", diff --git a/Cargo.toml b/Cargo.toml index 6fc1626..12f93d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,9 @@ http = "1.3.1" url = { version = "2.5.4", features = ["serde"] } pyth-lazer-publisher-sdk = "0.1.5" +[dev-dependencies] +tempfile = "3.20.0" + [profile.release] panic = 'abort' diff --git a/src/agent/services/lazer_exporter.rs b/src/agent/services/lazer_exporter.rs index 5dcc4b7..ab06572 100644 --- a/src/agent/services/lazer_exporter.rs +++ b/src/agent/services/lazer_exporter.rs @@ -21,7 +21,10 @@ use { protobuf::Message as ProtobufMessage, pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction, reqwest::Client, - serde::Deserialize, + serde::{ + Deserialize, + Serialize, + }, std::{ path::PathBuf, sync::Arc, @@ -207,7 +210,7 @@ impl RelayerSessionTask { } // TODO: This is copied from history-service; move to Lazer protocol sdk. -#[derive(Deserialize)] +#[derive(Debug, Serialize, Deserialize)] struct SymbolResponse { pub pyth_lazer_id: u32, #[serde(rename = "name")] @@ -450,3 +453,309 @@ mod lazer_exporter { } } } + +#[cfg(test)] +mod tests { + use { + crate::agent::{ + services::lazer_exporter::{ + Config, + RELAYER_CHANNEL_CAPACITY, + RelayerSessionTask, + SymbolResponse, + lazer_exporter::lazer_exporter, + }, + state::{ + local, + local::{ + LocalStore, + PriceInfo, + }, + }, + }, + ed25519_dalek::{ + Signer, + SigningKey, + }, + futures_util::StreamExt, + prometheus_client::registry::Registry, + protobuf::{ + Message, + MessageField, + well_known_types::timestamp::Timestamp, + }, + pyth_lazer_publisher_sdk::{ + publisher_update::{ + FeedUpdate, + PriceUpdate, + PublisherUpdate, + feed_update::{ + self, + Update, + }, + }, + transaction::{ + Ed25519SignatureData, + LazerTransaction, + SignatureData, + SignedLazerTransaction, + lazer_transaction::{ + self, + Payload, + }, + signature_data::Data::Ed25519, + }, + }, + pyth_sdk_solana::state::PriceStatus, + std::{ + io::Write, + net::SocketAddr, + path::PathBuf, + sync::{ + Arc, + Once, + }, + time::Duration, + }, + tempfile::NamedTempFile, + tokio::{ + net::TcpListener, + sync::{ + broadcast::{ + self, + error::TryRecvError, + }, + mpsc, + }, + }, + url::Url, + warp::Filter, + }; + + static INIT: Once = Once::new(); + + fn init_tracing() { + INIT.call_once(|| { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .with_test_writer() // Send output to test output + .init(); + }); + } + + pub async fn run_mock_history_server(addr: SocketAddr) { + let route = warp::path!("history" / "v1" / "symbols") + .and(warp::get()) + .map(|| { + let response = vec![SymbolResponse { + pyth_lazer_id: 1, + _name: "BTCUSD".to_string(), + _symbol: "Crypto.BTC/USD".to_string(), + _description: "BITCOIN / US DOLLAR".to_string(), + _asset_type: "crypto".to_string(), + _exponent: -8, + _cmc_id: Some(1), + _interval: None, + _min_publishers: 1, + _min_channel: "real_time".to_string(), + _state: "stable".to_string(), + _schedule: "America/New_York;O,O,O,O,O,O,O;".to_string(), + hermes_id: Some( + "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" + .to_string(), + ), + }]; + warp::reply::json(&response) + }); + warp::serve(route).run(addr).await; + } + + fn get_private_key() -> SigningKey { + SigningKey::from_keypair_bytes(&[ + 105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170, + 238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170, + 8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244, + 39, 179, 211, 177, 70, 252, 31, + ]) + .unwrap() + } + + fn get_private_key_file() -> NamedTempFile { + let private_key_string = "[105,175,146,91,32,145,164,199,37,111,139,255,44,225,5,247,154,170,238,70,47,15,9,48,102,87,180,50,50,38,148,243,62,148,219,72,222,170,8,246,176,33,205,29,118,11,220,163,214,204,46,49,132,94,170,173,244,39,179,211,177,70,252,31]"; + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file + .as_file_mut() + .write(private_key_string.as_bytes()) + .unwrap(); + temp_file.flush().unwrap(); + temp_file + } + + #[tokio::test] + async fn test_lazer_exporter() { + init_tracing(); + + let history_addr = "127.0.0.1:12345".parse().unwrap(); + tokio::spawn(async move { + run_mock_history_server(history_addr).await; + }); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let state = Arc::new(local::Store::new(&mut Registry::default())); + let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); + let private_key_file = get_private_key_file(); + + let config = Config { + history_url: Url::parse("http://127.0.0.1:12345").unwrap(), + relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], + authorization_token: "token1".to_string(), + publish_keypair_path: PathBuf::from(private_key_file.path()), + publish_interval_duration: Duration::from_secs(1), + }; + tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender)); + + tokio::time::sleep(std::time::Duration::from_millis(2000)).await; + match relayer_receiver.try_recv() { + Err(TryRecvError::Empty) => (), + _ => panic!("channel should be empty"), + } + + let btc_id = pyth_sdk::Identifier::from_hex( + "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43", + ) + .unwrap(); + let price = PriceInfo { + status: PriceStatus::Trading, + price: 100_000_00000000i64, + conf: 1_00000000u64, + timestamp: Default::default(), + }; + state.update(btc_id, price).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(2000)).await; + match relayer_receiver.try_recv() { + Ok(transaction) => { + let lazer_transaction = + LazerTransaction::parse_from_bytes(transaction.payload.unwrap().as_slice()) + .unwrap(); + let publisher_update = + if let lazer_transaction::Payload::PublisherUpdate(publisher_update) = + lazer_transaction.payload.unwrap() + { + publisher_update + } else { + panic!("expected publisher_update") + }; + assert_eq!(publisher_update.updates.len(), 1); + let feed_update = &publisher_update.updates[0]; + assert_eq!(feed_update.feed_id, Some(1u32)); + let price_update = if let feed_update::Update::PriceUpdate(price_update) = + feed_update.clone().update.unwrap() + { + price_update + } else { + panic!("expected price_update") + }; + assert_eq!(price_update.price, Some(100_000_00000000i64)); + } + _ => panic!("channel should have a transaction waiting"), + } + } + + pub async fn run_mock_relayer( + addr: SocketAddr, + back_sender: mpsc::Sender, + ) { + let listener = TcpListener::bind(addr).await.unwrap(); + + tokio::spawn(async move { + let Ok((stream, _)) = listener.accept().await else { + panic!("failed to accept mock relayer websocket connection"); + }; + let ws_stream = tokio_tungstenite::accept_async(stream) + .await + .expect("handshake failed"); + let (_, mut read) = ws_stream.split(); + while let Some(msg) = read.next().await { + if let Ok(msg) = msg { + if msg.is_binary() { + tracing::info!("Received binary message: {msg:?}"); + let transaction = + SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref()) + .unwrap(); + back_sender.clone().send(transaction).await.unwrap(); + } + } else { + tracing::error!("Received a malformed message: {msg:?}"); + } + } + }); + } + + #[tokio::test] + async fn test_relayer_session() { + init_tracing(); + + let (back_sender, mut back_receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY); + let relayer_addr = "127.0.0.1:12346".parse().unwrap(); + run_mock_relayer(relayer_addr, back_sender).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); + + let mut relayer_session_task = RelayerSessionTask { + // connection state + url: Url::parse("ws://127.0.0.1:12346").unwrap(), + token: "token1".to_string(), + receiver: relayer_receiver, + }; + tokio::spawn(async move { relayer_session_task.run().await }); + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + let transaction = get_signed_lazer_transaction(); + relayer_sender + .send(transaction.clone()) + .expect("relayer_sender.send failed"); + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + let received_transaction = back_receiver + .recv() + .await + .expect("back_receiver.recv failed"); + assert_eq!(transaction, received_transaction); + } + + fn get_signed_lazer_transaction() -> SignedLazerTransaction { + let publisher_update = PublisherUpdate { + updates: vec![FeedUpdate { + feed_id: Some(1), + source_timestamp: MessageField::some(Timestamp::now()), + update: Some(Update::PriceUpdate(PriceUpdate { + price: Some(1_000_000_000i64), + ..PriceUpdate::default() + })), + special_fields: Default::default(), + }], + publisher_timestamp: MessageField::some(Timestamp::now()), + special_fields: Default::default(), + }; + let lazer_transaction = LazerTransaction { + payload: Some(Payload::PublisherUpdate(publisher_update)), + special_fields: Default::default(), + }; + let buf = lazer_transaction.write_to_bytes().unwrap(); + let signing_key = get_private_key(); + let signature = signing_key.sign(&buf); + let signature_data = SignatureData { + data: Some(Ed25519(Ed25519SignatureData { + signature: Some(signature.to_bytes().into()), + public_key: Some(signing_key.verifying_key().to_bytes().into()), + special_fields: Default::default(), + })), + special_fields: Default::default(), + }; + SignedLazerTransaction { + signature_data: MessageField::some(signature_data), + payload: Some(buf), + special_fields: Default::default(), + } + } +}