From 6fd4328f82647b2a88d9449192786f11f782c533 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Mon, 16 Jun 2025 04:30:46 -0500 Subject: [PATCH 1/4] pyth-lazer-agent fixes, tests, ci --- .github/workflows/ci-pyth-lazer-agent.yml | 37 +++ .github/workflows/docker-pyth-lazer-agent.yml | 55 ++++ .../pyth-lazer-agent}/.dockerignore | 0 .../pyth-lazer-agent}/.gitignore | 0 .../pyth-lazer-agent}/Cargo.lock | 2 + .../pyth-lazer-agent}/Cargo.toml | 4 + .../pyth-lazer-agent}/Dockerfile | 0 .../pyth-lazer-agent}/config/config.toml | 0 .../pyth-lazer-agent}/rust-toolchain.toml | 0 .../pyth-lazer-agent}/src/config.rs | 2 - .../pyth-lazer-agent}/src/http_server.rs | 0 apps/pyth-lazer-agent/src/lazer_publisher.rs | 258 ++++++++++++++++++ .../pyth-lazer-agent}/src/main.rs | 0 .../pyth-lazer-agent}/src/publisher_handle.rs | 0 .../pyth-lazer-agent}/src/relayer_session.rs | 128 +++++++++ .../pyth-lazer-agent}/src/websocket_utils.rs | 0 pyth-lazer-agent/src/lazer_publisher.rs | 152 ----------- 17 files changed, 484 insertions(+), 154 deletions(-) create mode 100644 .github/workflows/ci-pyth-lazer-agent.yml create mode 100644 .github/workflows/docker-pyth-lazer-agent.yml rename {pyth-lazer-agent => apps/pyth-lazer-agent}/.dockerignore (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/.gitignore (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/Cargo.lock (99%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/Cargo.toml (95%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/Dockerfile (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/config/config.toml (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/rust-toolchain.toml (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/src/config.rs (93%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/src/http_server.rs (100%) create mode 100644 apps/pyth-lazer-agent/src/lazer_publisher.rs rename {pyth-lazer-agent => apps/pyth-lazer-agent}/src/main.rs (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/src/publisher_handle.rs (100%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/src/relayer_session.rs (52%) rename {pyth-lazer-agent => apps/pyth-lazer-agent}/src/websocket_utils.rs (100%) delete mode 100644 pyth-lazer-agent/src/lazer_publisher.rs diff --git a/.github/workflows/ci-pyth-lazer-agent.yml b/.github/workflows/ci-pyth-lazer-agent.yml new file mode 100644 index 0000000000..722e55b52f --- /dev/null +++ b/.github/workflows/ci-pyth-lazer-agent.yml @@ -0,0 +1,37 @@ +name: "pyth-lazer-agent Rust Test Suite" +on: + push: + branches: + - main + pull_request: + paths: + - .github/workflows/ci-pyth-lazer-agent.yml + - apps/pyth-lazer-agent/** + +jobs: + pyth-lazer-agent-rust-test-suite: + name: pyth-lazer-agent Rust Test Suite + runs-on: ubuntu-22.04 + defaults: + run: + working-directory: apps/pyth-lazer-agent + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + - uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: 1.87.0 + components: clippy,rustfmt + - uses: Swatinem/rust-cache@v2 + with: + workspaces: "apps/pyth-lazer-agent -> target" + - name: Format check + run: cargo fmt --all -- --check + if: success() || failure() + - name: Clippy check + run: cargo clippy --all-targets -- --deny warnings + if: success() || failure() + - name: test + run: cargo test + if: success() || failure() diff --git a/.github/workflows/docker-pyth-lazer-agent.yml b/.github/workflows/docker-pyth-lazer-agent.yml new file mode 100644 index 0000000000..46a78051f0 --- /dev/null +++ b/.github/workflows/docker-pyth-lazer-agent.yml @@ -0,0 +1,55 @@ +name: Build and Push pyth-lazer-agent Image +on: + push: + tags: + - pyth-lazer-agent-v* + pull_request: + paths: + - "apps/pyth-lazer-agent/**" + workflow_dispatch: + inputs: + dispatch_description: + description: "Dispatch description" + required: true + type: string +permissions: + contents: read + id-token: write + packages: write +env: + REGISTRY: ghcr.io + IMAGE_NAME: pyth-network/pyth-lazer-agent +jobs: + pyth-lazer-agent-image: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set image tag to version of the git tag + if: ${{ startsWith(github.ref, 'refs/tags/pyth-lazer-agent-v') }} + run: | + PREFIX="refs/tags/pyth-lazer-agent-" + VERSION="${GITHUB_REF:${#PREFIX}}" + echo "IMAGE_TAG=${VERSION}" >> "${GITHUB_ENV}" + - name: Set image tag to the git commit hash + if: ${{ !startsWith(github.ref, 'refs/tags/pyth-lazer-agent-v') }} + run: | + echo "IMAGE_TAG=${{ github.sha }}" >> "${GITHUB_ENV}" + - name: Log in to the Container registry + uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + - name: Extract metadata (tags, labels) for Docker + id: metadata_pyth_lazer_agent + uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + - name: Build and push server docker image + uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 + with: + context: . + file: "./apps/pyth-lazer-agent/Dockerfile" + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.metadata_pyth_lazer_agent.outputs.tags }} + labels: ${{ steps.metadata_pyth_lazer_agent.outputs.labels }} diff --git a/pyth-lazer-agent/.dockerignore b/apps/pyth-lazer-agent/.dockerignore similarity index 100% rename from pyth-lazer-agent/.dockerignore rename to apps/pyth-lazer-agent/.dockerignore diff --git a/pyth-lazer-agent/.gitignore b/apps/pyth-lazer-agent/.gitignore similarity index 100% rename from pyth-lazer-agent/.gitignore rename to apps/pyth-lazer-agent/.gitignore diff --git a/pyth-lazer-agent/Cargo.lock b/apps/pyth-lazer-agent/Cargo.lock similarity index 99% rename from pyth-lazer-agent/Cargo.lock rename to apps/pyth-lazer-agent/Cargo.lock index 838fcb5c4a..fe69988790 100644 --- a/pyth-lazer-agent/Cargo.lock +++ b/apps/pyth-lazer-agent/Cargo.lock @@ -1660,6 +1660,7 @@ version = "0.1.0" dependencies = [ "anyhow", "backoff", + "base64 0.22.1", "bincode", "clap", "config", @@ -1679,6 +1680,7 @@ dependencies = [ "serde_json", "soketto", "solana-keypair", + "tempfile", "tokio", "tokio-tungstenite", "tokio-util", diff --git a/pyth-lazer-agent/Cargo.toml b/apps/pyth-lazer-agent/Cargo.toml similarity index 95% rename from pyth-lazer-agent/Cargo.toml rename to apps/pyth-lazer-agent/Cargo.toml index badb901217..f6a5b713a8 100644 --- a/pyth-lazer-agent/Cargo.toml +++ b/apps/pyth-lazer-agent/Cargo.toml @@ -9,6 +9,7 @@ pyth-lazer-protocol = "0.7.2" anyhow = "1.0.98" backoff = "0.4.0" +base64 = "0.22.1" bincode = { version = "2.0.1", features = ["serde"] } clap = { version = "4.5.32", features = ["derive"] } config = "0.15.11" @@ -32,3 +33,6 @@ tokio-util = { version = "0.7.14", features = ["compat"] } tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] } url = { version = "2.5.4", features = ["serde"] } + +[dev-dependencies] +tempfile = "3.20.0" diff --git a/pyth-lazer-agent/Dockerfile b/apps/pyth-lazer-agent/Dockerfile similarity index 100% rename from pyth-lazer-agent/Dockerfile rename to apps/pyth-lazer-agent/Dockerfile diff --git a/pyth-lazer-agent/config/config.toml b/apps/pyth-lazer-agent/config/config.toml similarity index 100% rename from pyth-lazer-agent/config/config.toml rename to apps/pyth-lazer-agent/config/config.toml diff --git a/pyth-lazer-agent/rust-toolchain.toml b/apps/pyth-lazer-agent/rust-toolchain.toml similarity index 100% rename from pyth-lazer-agent/rust-toolchain.toml rename to apps/pyth-lazer-agent/rust-toolchain.toml diff --git a/pyth-lazer-agent/src/config.rs b/apps/pyth-lazer-agent/src/config.rs similarity index 93% rename from pyth-lazer-agent/src/config.rs rename to apps/pyth-lazer-agent/src/config.rs index aed33dc4bf..3453859b73 100644 --- a/pyth-lazer-agent/src/config.rs +++ b/apps/pyth-lazer-agent/src/config.rs @@ -13,8 +13,6 @@ pub struct Config { pub listen_address: SocketAddr, pub relayer_urls: Vec, #[derivative(Debug = "ignore")] - pub authorization_token: String, - #[derivative(Debug = "ignore")] pub publish_keypair_path: PathBuf, #[serde(with = "humantime_serde", default = "default_publish_interval")] pub publish_interval_duration: Duration, diff --git a/pyth-lazer-agent/src/http_server.rs b/apps/pyth-lazer-agent/src/http_server.rs similarity index 100% rename from pyth-lazer-agent/src/http_server.rs rename to apps/pyth-lazer-agent/src/http_server.rs diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs new file mode 100644 index 0000000000..4523f8e007 --- /dev/null +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -0,0 +1,258 @@ +use crate::config::{CHANNEL_CAPACITY, Config}; +use crate::relayer_session::RelayerSessionTask; +use anyhow::{Context, Result, bail}; +use base64::Engine; +use base64::prelude::BASE64_STANDARD; +use ed25519_dalek::{Signer, SigningKey}; +use protobuf::well_known_types::timestamp::Timestamp; +use protobuf::{Message, MessageField}; +use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PublisherUpdate}; +use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload; +use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519; +use pyth_lazer_publisher_sdk::transaction::{ + Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction, +}; +use solana_keypair::read_keypair_file; +use std::path::PathBuf; +use tokio::sync::broadcast; +use tokio::{ + select, + sync::mpsc::{self, Receiver, Sender}, + time::interval, +}; +use tracing::error; + +#[derive(Clone)] +pub struct LazerPublisher { + sender: Sender, +} + +impl LazerPublisher { + fn load_signing_key(publish_keypair_path: &PathBuf) -> Result { + // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher + let publish_keypair = match read_keypair_file(publish_keypair_path) { + Ok(k) => k, + Err(e) => { + tracing::error!( + error = ?e, + publish_keypair_path = publish_keypair_path.display().to_string(), + "Reading publish keypair returned an error. ", + ); + bail!("Reading publish keypair returned an error."); + } + }; + + SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) + .context("Failed to create signing key from keypair") + } + + pub async fn new(config: &Config) -> Self { + let signing_key = match Self::load_signing_key(&config.publish_keypair_path) { + Ok(signing_key) => signing_key, + Err(e) => { + tracing::error!("Failed to load signing key: {e:?}"); + // Can't proceed on key failure + panic!("Failed to load signing key: {e:?}"); + } + }; + + let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY); + for url in config.relayer_urls.iter() { + let mut task = RelayerSessionTask { + url: url.clone(), + token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()), + receiver: relayer_sender.subscribe(), + }; + tokio::spawn(async move { task.run().await }); + } + + let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY); + let mut task = LazerPublisherTask { + config: config.clone(), + receiver, + pending_updates: Vec::new(), + relayer_sender, + signing_key, + }; + tokio::spawn(async move { task.run().await }); + Self { sender } + } + + pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> { + self.sender.send(feed_update).await?; + Ok(()) + } +} + +struct LazerPublisherTask { + // connection state + config: Config, + receiver: Receiver, + pending_updates: Vec, + relayer_sender: broadcast::Sender, + signing_key: SigningKey, +} + +impl LazerPublisherTask { + pub async fn run(&mut self) { + let mut publish_interval = interval(self.config.publish_interval_duration); + loop { + select! { + Some(feed_update) = self.receiver.recv() => { + self.pending_updates.push(feed_update); + } + _ = publish_interval.tick() => { + if let Err(err) = self.batch_transaction().await { + error!("Failed to publish updates: {}", err); + } + } + } + } + } + + async fn batch_transaction(&mut self) -> Result<()> { + if self.pending_updates.is_empty() { + return Ok(()); + } + + let publisher_update = PublisherUpdate { + updates: self.pending_updates.drain(..).collect(), + publisher_timestamp: MessageField::some(Timestamp::now()), + special_fields: Default::default(), + }; + let lazer_transaction = LazerTransaction { + payload: Some(Payload::PublisherUpdate(publisher_update)), + special_fields: Default::default(), + }; + let buf = match lazer_transaction.write_to_bytes() { + Ok(buf) => buf, + Err(e) => { + tracing::warn!("Failed to encode Lazer transaction to bytes: {:?}", e); + bail!("Failed to encode Lazer transaction") + } + }; + let signature = self.signing_key.sign(&buf); + let signature_data = SignatureData { + data: Some(Ed25519(Ed25519SignatureData { + signature: Some(signature.to_bytes().into()), + public_key: Some(self.signing_key.verifying_key().to_bytes().into()), + special_fields: Default::default(), + })), + special_fields: Default::default(), + }; + let signed_lazer_transaction = SignedLazerTransaction { + signature_data: MessageField::some(signature_data), + payload: Some(buf), + special_fields: Default::default(), + }; + match self.relayer_sender.send(signed_lazer_transaction.clone()) { + Ok(_) => (), + Err(e) => { + tracing::error!("Error sending transaction to relayer receivers: {e}"); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::config::{CHANNEL_CAPACITY, Config}; + use crate::lazer_publisher::LazerPublisherTask; + use ed25519_dalek::SigningKey; + use protobuf::well_known_types::timestamp::Timestamp; + use protobuf::{Message, MessageField}; + use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update; + use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate}; + use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction}; + use std::io::Write; + use std::path::PathBuf; + use std::time::Duration; + use tempfile::NamedTempFile; + use tokio::sync::broadcast::error::TryRecvError; + use tokio::sync::{broadcast, mpsc}; + use url::Url; + + fn get_private_key() -> SigningKey { + SigningKey::from_keypair_bytes(&[ + 105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170, + 238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170, + 8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244, + 39, 179, 211, 177, 70, 252, 31, + ]) + .unwrap() + } + + fn get_private_key_file() -> NamedTempFile { + let private_key_string = "[105,175,146,91,32,145,164,199,37,111,139,255,44,225,5,247,154,170,238,70,47,15,9,48,102,87,180,50,50,38,148,243,62,148,219,72,222,170,8,246,176,33,205,29,118,11,220,163,214,204,46,49,132,94,170,173,244,39,179,211,177,70,252,31]"; + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file + .as_file_mut() + .write(private_key_string.as_bytes()) + .unwrap(); + temp_file.flush().unwrap(); + temp_file + } + + #[tokio::test] + async fn test_lazer_exporter_task() { + let signing_key_file = get_private_key_file(); + let signing_key = get_private_key(); + + let config = Config { + listen_address: "0.0.0.0:12345".parse().unwrap(), + relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], + publish_keypair_path: PathBuf::from(signing_key_file.path()), + publish_interval_duration: Duration::from_millis(25), + }; + + let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY); + let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY); + let mut task = LazerPublisherTask { + config: config.clone(), + receiver, + pending_updates: Vec::new(), + relayer_sender, + signing_key, + }; + tokio::spawn(async move { task.run().await }); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + match relayer_receiver.try_recv() { + Err(TryRecvError::Empty) => (), + _ => panic!("channel should be empty"), + } + + let feed_update = FeedUpdate { + feed_id: Some(1), + source_timestamp: MessageField::some(Timestamp::now()), + update: Some(Update::PriceUpdate(PriceUpdate { + price: Some(100_000_00000000), + ..PriceUpdate::default() + })), + special_fields: Default::default(), + }; + sender.send(feed_update.clone()).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + match relayer_receiver.try_recv() { + Ok(transaction) => { + let lazer_transaction = + LazerTransaction::parse_from_bytes(transaction.payload.unwrap().as_slice()) + .unwrap(); + let publisher_update = + if let lazer_transaction::Payload::PublisherUpdate(publisher_update) = + lazer_transaction.payload.unwrap() + { + publisher_update + } else { + panic!("expected publisher_update") + }; + assert_eq!(publisher_update.updates.len(), 1); + assert_eq!(publisher_update.updates[0], feed_update); + } + _ => panic!("channel should have a transaction waiting"), + } + } +} diff --git a/pyth-lazer-agent/src/main.rs b/apps/pyth-lazer-agent/src/main.rs similarity index 100% rename from pyth-lazer-agent/src/main.rs rename to apps/pyth-lazer-agent/src/main.rs diff --git a/pyth-lazer-agent/src/publisher_handle.rs b/apps/pyth-lazer-agent/src/publisher_handle.rs similarity index 100% rename from pyth-lazer-agent/src/publisher_handle.rs rename to apps/pyth-lazer-agent/src/publisher_handle.rs diff --git a/pyth-lazer-agent/src/relayer_session.rs b/apps/pyth-lazer-agent/src/relayer_session.rs similarity index 52% rename from pyth-lazer-agent/src/relayer_session.rs rename to apps/pyth-lazer-agent/src/relayer_session.rs index 1b1339442f..3f09b04279 100644 --- a/pyth-lazer-agent/src/relayer_session.rs +++ b/apps/pyth-lazer-agent/src/relayer_session.rs @@ -157,3 +157,131 @@ impl RelayerSessionTask { } } } + +#[cfg(test)] +mod tests { + use crate::relayer_session::RelayerSessionTask; + use ed25519_dalek::{Signer, SigningKey}; + use futures_util::StreamExt; + use protobuf::well_known_types::timestamp::Timestamp; + use protobuf::{Message, MessageField}; + use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update; + use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate, PublisherUpdate}; + use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload; + use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519; + use pyth_lazer_publisher_sdk::transaction::{ + Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction, + }; + use std::net::SocketAddr; + use tokio::net::TcpListener; + use tokio::sync::{broadcast, mpsc}; + use url::Url; + + pub const RELAYER_CHANNEL_CAPACITY: usize = 1000; + + fn get_private_key() -> SigningKey { + SigningKey::from_keypair_bytes(&[ + 105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170, + 238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170, + 8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244, + 39, 179, 211, 177, 70, 252, 31, + ]) + .unwrap() + } + + pub async fn run_mock_relayer( + addr: SocketAddr, + back_sender: mpsc::Sender, + ) { + let listener = TcpListener::bind(addr).await.unwrap(); + + tokio::spawn(async move { + let Ok((stream, _)) = listener.accept().await else { + panic!("failed to accept mock relayer websocket connection"); + }; + let ws_stream = tokio_tungstenite::accept_async(stream) + .await + .expect("handshake failed"); + let (_, mut read) = ws_stream.split(); + while let Some(msg) = read.next().await { + if let Ok(msg) = msg { + if msg.is_binary() { + tracing::info!("Received binary message: {msg:?}"); + let transaction = + SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref()) + .unwrap(); + back_sender.clone().send(transaction).await.unwrap(); + } + } else { + tracing::error!("Received a malformed message: {msg:?}"); + } + } + }); + } + + #[tokio::test] + async fn test_relayer_session() { + let (back_sender, mut back_receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY); + let relayer_addr = "127.0.0.1:12346".parse().unwrap(); + run_mock_relayer(relayer_addr, back_sender).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); + + let mut relayer_session_task = RelayerSessionTask { + // connection state + url: Url::parse("ws://127.0.0.1:12346").unwrap(), + token: "token1".to_string(), + receiver: relayer_receiver, + }; + tokio::spawn(async move { relayer_session_task.run().await }); + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + let transaction = get_signed_lazer_transaction(); + relayer_sender + .send(transaction.clone()) + .expect("relayer_sender.send failed"); + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + let received_transaction = back_receiver + .recv() + .await + .expect("back_receiver.recv failed"); + assert_eq!(transaction, received_transaction); + } + + fn get_signed_lazer_transaction() -> SignedLazerTransaction { + let publisher_update = PublisherUpdate { + updates: vec![FeedUpdate { + feed_id: Some(1), + source_timestamp: MessageField::some(Timestamp::now()), + update: Some(Update::PriceUpdate(PriceUpdate { + price: Some(1_000_000_000i64), + ..PriceUpdate::default() + })), + special_fields: Default::default(), + }], + publisher_timestamp: MessageField::some(Timestamp::now()), + special_fields: Default::default(), + }; + let lazer_transaction = LazerTransaction { + payload: Some(Payload::PublisherUpdate(publisher_update)), + special_fields: Default::default(), + }; + let buf = lazer_transaction.write_to_bytes().unwrap(); + let signing_key = get_private_key(); + let signature = signing_key.sign(&buf); + let signature_data = SignatureData { + data: Some(Ed25519(Ed25519SignatureData { + signature: Some(signature.to_bytes().into()), + public_key: Some(signing_key.verifying_key().to_bytes().into()), + special_fields: Default::default(), + })), + special_fields: Default::default(), + }; + SignedLazerTransaction { + signature_data: MessageField::some(signature_data), + payload: Some(buf), + special_fields: Default::default(), + } + } +} diff --git a/pyth-lazer-agent/src/websocket_utils.rs b/apps/pyth-lazer-agent/src/websocket_utils.rs similarity index 100% rename from pyth-lazer-agent/src/websocket_utils.rs rename to apps/pyth-lazer-agent/src/websocket_utils.rs diff --git a/pyth-lazer-agent/src/lazer_publisher.rs b/pyth-lazer-agent/src/lazer_publisher.rs deleted file mode 100644 index 1e95145feb..0000000000 --- a/pyth-lazer-agent/src/lazer_publisher.rs +++ /dev/null @@ -1,152 +0,0 @@ -use crate::config::{CHANNEL_CAPACITY, Config}; -use crate::relayer_session::RelayerSessionTask; -use anyhow::{Context, Result, bail}; -use ed25519_dalek::{Signer, SigningKey}; -use protobuf::well_known_types::timestamp::Timestamp; -use protobuf::{Message, MessageField}; -use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PublisherUpdate}; -use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload; -use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519; -use pyth_lazer_publisher_sdk::transaction::{ - Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction, -}; -use solana_keypair::read_keypair_file; -use tokio::sync::broadcast; -use tokio::{ - select, - sync::mpsc::{self, Receiver, Sender}, - time::interval, -}; -use tracing::error; - -#[derive(Clone)] -pub struct LazerPublisher { - sender: Sender, -} - -impl LazerPublisher { - pub async fn new(config: &Config) -> Self { - let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY); - for url in config.relayer_urls.iter() { - let mut task = RelayerSessionTask { - url: url.clone(), - token: config.authorization_token.to_owned(), - receiver: relayer_sender.subscribe(), - }; - tokio::spawn(async move { task.run().await }); - } - - let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY); - let mut task = LazerPublisherTask { - config: config.clone(), - receiver, - pending_updates: Vec::new(), - relayer_sender, - }; - tokio::spawn(async move { task.run().await }); - Self { sender } - } - - pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> { - self.sender.send(feed_update).await?; - Ok(()) - } -} - -struct LazerPublisherTask { - // connection state - config: Config, - receiver: Receiver, - pending_updates: Vec, - relayer_sender: broadcast::Sender, -} - -impl LazerPublisherTask { - fn load_signing_key(&self) -> Result { - // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher - let publish_keypair = match read_keypair_file(&self.config.publish_keypair_path) { - Ok(k) => k, - Err(e) => { - tracing::error!( - error = ?e, - publish_keypair_path = self.config.publish_keypair_path.display().to_string(), - "Reading publish keypair returned an error. ", - ); - bail!("Reading publish keypair returned an error."); - } - }; - - SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) - .context("Failed to create signing key from keypair") - } - - pub async fn run(&mut self) { - let signing_key = match self.load_signing_key() { - Ok(signing_key) => signing_key, - Err(e) => { - tracing::error!("Failed to load signing key: {e:?}"); - // Can't proceed on key failure - panic!("Failed to load signing key: {e:?}"); - } - }; - - let mut publish_interval = interval(self.config.publish_interval_duration); - loop { - select! { - Some(feed_update) = self.receiver.recv() => { - self.pending_updates.push(feed_update); - } - _ = publish_interval.tick() => { - if let Err(err) = self.batch_transaction(&signing_key).await { - error!("Failed to publish updates: {}", err); - } - } - } - } - } - - async fn batch_transaction(&mut self, signing_key: &SigningKey) -> Result<()> { - if self.pending_updates.is_empty() { - return Ok(()); - } - - let publisher_update = PublisherUpdate { - updates: self.pending_updates.drain(..).collect(), - publisher_timestamp: MessageField::some(Timestamp::now()), - special_fields: Default::default(), - }; - let lazer_transaction = LazerTransaction { - payload: Some(Payload::PublisherUpdate(publisher_update)), - special_fields: Default::default(), - }; - let buf = match lazer_transaction.write_to_bytes() { - Ok(buf) => buf, - Err(e) => { - tracing::warn!("Failed to encode Lazer transaction to bytes: {:?}", e); - bail!("Failed to encode Lazer transaction") - } - }; - let signature = signing_key.sign(&buf); - let signature_data = SignatureData { - data: Some(Ed25519(Ed25519SignatureData { - signature: Some(signature.to_bytes().into()), - public_key: Some(signing_key.verifying_key().to_bytes().into()), - special_fields: Default::default(), - })), - special_fields: Default::default(), - }; - let signed_lazer_transaction = SignedLazerTransaction { - signature_data: MessageField::some(signature_data), - payload: Some(buf), - special_fields: Default::default(), - }; - match self.relayer_sender.send(signed_lazer_transaction.clone()) { - Ok(_) => (), - Err(e) => { - tracing::error!("Error sending transaction to relayer receivers: {e}"); - } - } - - Ok(()) - } -} From 571703956bc83fbc64c7ef7d31d9ed8582eebe8e Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Mon, 16 Jun 2025 12:43:05 -0500 Subject: [PATCH 2/4] clippy fixes --- apps/pyth-lazer-agent/src/lazer_publisher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/pyth-lazer-agent/src/lazer_publisher.rs b/apps/pyth-lazer-agent/src/lazer_publisher.rs index 4523f8e007..9edf10b76b 100644 --- a/apps/pyth-lazer-agent/src/lazer_publisher.rs +++ b/apps/pyth-lazer-agent/src/lazer_publisher.rs @@ -189,7 +189,7 @@ mod tests { let mut temp_file = NamedTempFile::new().unwrap(); temp_file .as_file_mut() - .write(private_key_string.as_bytes()) + .write_all(private_key_string.as_bytes()) .unwrap(); temp_file.flush().unwrap(); temp_file @@ -228,7 +228,7 @@ mod tests { feed_id: Some(1), source_timestamp: MessageField::some(Timestamp::now()), update: Some(Update::PriceUpdate(PriceUpdate { - price: Some(100_000_00000000), + price: Some(100_000 * 100_000_000), ..PriceUpdate::default() })), special_fields: Default::default(), From 810da2447c0a73a4a603ef661b160358422f015d Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Mon, 16 Jun 2025 12:55:07 -0500 Subject: [PATCH 3/4] relative path fix in Dockerfile --- apps/pyth-lazer-agent/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/pyth-lazer-agent/Dockerfile b/apps/pyth-lazer-agent/Dockerfile index 47ef128813..f1260bb801 100644 --- a/apps/pyth-lazer-agent/Dockerfile +++ b/apps/pyth-lazer-agent/Dockerfile @@ -2,7 +2,7 @@ FROM rust:slim-bookworm AS builder RUN apt update && apt install -y curl libssl-dev pkg-config build-essential && apt clean all -ADD . /pyth-lazer-agent +ADD apps/pyth-lazer-agent /pyth-lazer-agent WORKDIR /pyth-lazer-agent RUN cargo build --release From d6e930a9cb912ec998976628cd2b9bdb9ff14aea Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Mon, 16 Jun 2025 13:02:27 -0500 Subject: [PATCH 4/4] fix config --- apps/pyth-lazer-agent/config/config.toml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/pyth-lazer-agent/config/config.toml b/apps/pyth-lazer-agent/config/config.toml index 18d8481d12..d75a708880 100644 --- a/apps/pyth-lazer-agent/config/config.toml +++ b/apps/pyth-lazer-agent/config/config.toml @@ -1,5 +1,4 @@ relayer_urls = ["ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction"] -authorization_token = "token1" publish_keypair_path = "/path/to/solana/id.json" -listen_address = "0.0.0.0:1776" -publish_interval_duration = "0.5ms" +listen_address = "0.0.0.0:8910" +publish_interval_duration = "25ms"