Skip to content

pyth-lazer-agent fixes, tests, ci #2787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .github/workflows/ci-pyth-lazer-agent.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: "pyth-lazer-agent Rust Test Suite"
on:
push:
branches:
- main
pull_request:
paths:
- .github/workflows/ci-pyth-lazer-agent.yml
- apps/pyth-lazer-agent/**

jobs:
pyth-lazer-agent-rust-test-suite:
name: pyth-lazer-agent Rust Test Suite
runs-on: ubuntu-22.04
defaults:
run:
working-directory: apps/pyth-lazer-agent
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: 1.87.0
components: clippy,rustfmt
- uses: Swatinem/rust-cache@v2
with:
workspaces: "apps/pyth-lazer-agent -> target"
- name: Format check
run: cargo fmt --all -- --check
if: success() || failure()
- name: Clippy check
run: cargo clippy --all-targets -- --deny warnings
if: success() || failure()
- name: test
run: cargo test
if: success() || failure()
55 changes: 55 additions & 0 deletions .github/workflows/docker-pyth-lazer-agent.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: Build and Push pyth-lazer-agent Image
on:
push:
tags:
- pyth-lazer-agent-v*
pull_request:
paths:
- "apps/pyth-lazer-agent/**"
workflow_dispatch:
inputs:
dispatch_description:
description: "Dispatch description"
required: true
type: string
permissions:
contents: read
id-token: write
packages: write
env:
REGISTRY: ghcr.io
IMAGE_NAME: pyth-network/pyth-lazer-agent
jobs:
pyth-lazer-agent-image:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set image tag to version of the git tag
if: ${{ startsWith(github.ref, 'refs/tags/pyth-lazer-agent-v') }}
run: |
PREFIX="refs/tags/pyth-lazer-agent-"
VERSION="${GITHUB_REF:${#PREFIX}}"
echo "IMAGE_TAG=${VERSION}" >> "${GITHUB_ENV}"
- name: Set image tag to the git commit hash
if: ${{ !startsWith(github.ref, 'refs/tags/pyth-lazer-agent-v') }}
run: |
echo "IMAGE_TAG=${{ github.sha }}" >> "${GITHUB_ENV}"
- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
id: metadata_pyth_lazer_agent
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Build and push server docker image
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
file: "./apps/pyth-lazer-agent/Dockerfile"
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.metadata_pyth_lazer_agent.outputs.tags }}
labels: ${{ steps.metadata_pyth_lazer_agent.outputs.labels }}
File renamed without changes.
File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pyth-lazer-protocol = "0.7.2"

anyhow = "1.0.98"
backoff = "0.4.0"
base64 = "0.22.1"
bincode = { version = "2.0.1", features = ["serde"] }
clap = { version = "4.5.32", features = ["derive"] }
config = "0.15.11"
Expand All @@ -32,3 +33,6 @@ tokio-util = { version = "0.7.14", features = ["compat"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
url = { version = "2.5.4", features = ["serde"] }

[dev-dependencies]
tempfile = "3.20.0"
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ pub struct Config {
pub listen_address: SocketAddr,
pub relayer_urls: Vec<Url>,
#[derivative(Debug = "ignore")]
pub authorization_token: String,
#[derivative(Debug = "ignore")]
pub publish_keypair_path: PathBuf,
#[serde(with = "humantime_serde", default = "default_publish_interval")]
pub publish_interval_duration: Duration,
Expand Down
258 changes: 258 additions & 0 deletions apps/pyth-lazer-agent/src/lazer_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
use crate::config::{CHANNEL_CAPACITY, Config};
use crate::relayer_session::RelayerSessionTask;
use anyhow::{Context, Result, bail};
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use ed25519_dalek::{Signer, SigningKey};
use protobuf::well_known_types::timestamp::Timestamp;
use protobuf::{Message, MessageField};
use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PublisherUpdate};
use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload;
use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519;
use pyth_lazer_publisher_sdk::transaction::{
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
};
use solana_keypair::read_keypair_file;
use std::path::PathBuf;
use tokio::sync::broadcast;
use tokio::{
select,
sync::mpsc::{self, Receiver, Sender},
time::interval,
};
use tracing::error;

#[derive(Clone)]
pub struct LazerPublisher {
sender: Sender<FeedUpdate>,
}

impl LazerPublisher {
fn load_signing_key(publish_keypair_path: &PathBuf) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match read_keypair_file(publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error.");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}

pub async fn new(config: &Config) -> Self {
let signing_key = match Self::load_signing_key(&config.publish_keypair_path) {
Ok(signing_key) => signing_key,
Err(e) => {
tracing::error!("Failed to load signing key: {e:?}");
// Can't proceed on key failure
panic!("Failed to load signing key: {e:?}");
}
};

let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
for url in config.relayer_urls.iter() {
let mut task = RelayerSessionTask {
url: url.clone(),
token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()),
receiver: relayer_sender.subscribe(),
};
tokio::spawn(async move { task.run().await });
}

let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
let mut task = LazerPublisherTask {
config: config.clone(),
receiver,
pending_updates: Vec::new(),
relayer_sender,
signing_key,
};
tokio::spawn(async move { task.run().await });
Self { sender }
}

pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> {
self.sender.send(feed_update).await?;
Ok(())
}
}

struct LazerPublisherTask {
// connection state
config: Config,
receiver: Receiver<FeedUpdate>,
pending_updates: Vec<FeedUpdate>,
relayer_sender: broadcast::Sender<SignedLazerTransaction>,
signing_key: SigningKey,
}

impl LazerPublisherTask {
pub async fn run(&mut self) {
let mut publish_interval = interval(self.config.publish_interval_duration);
loop {
select! {
Some(feed_update) = self.receiver.recv() => {
self.pending_updates.push(feed_update);
}
_ = publish_interval.tick() => {
if let Err(err) = self.batch_transaction().await {
error!("Failed to publish updates: {}", err);
}
}
}
}
}

async fn batch_transaction(&mut self) -> Result<()> {
if self.pending_updates.is_empty() {
return Ok(());
}

let publisher_update = PublisherUpdate {
updates: self.pending_updates.drain(..).collect(),
publisher_timestamp: MessageField::some(Timestamp::now()),
special_fields: Default::default(),
};
let lazer_transaction = LazerTransaction {
payload: Some(Payload::PublisherUpdate(publisher_update)),
special_fields: Default::default(),
};
let buf = match lazer_transaction.write_to_bytes() {
Ok(buf) => buf,
Err(e) => {
tracing::warn!("Failed to encode Lazer transaction to bytes: {:?}", e);
bail!("Failed to encode Lazer transaction")
}
};
let signature = self.signing_key.sign(&buf);
let signature_data = SignatureData {
data: Some(Ed25519(Ed25519SignatureData {
signature: Some(signature.to_bytes().into()),
public_key: Some(self.signing_key.verifying_key().to_bytes().into()),
special_fields: Default::default(),
})),
special_fields: Default::default(),
};
let signed_lazer_transaction = SignedLazerTransaction {
signature_data: MessageField::some(signature_data),
payload: Some(buf),
special_fields: Default::default(),
};
match self.relayer_sender.send(signed_lazer_transaction.clone()) {
Ok(_) => (),
Err(e) => {
tracing::error!("Error sending transaction to relayer receivers: {e}");
}
}

Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::config::{CHANNEL_CAPACITY, Config};
use crate::lazer_publisher::LazerPublisherTask;
use ed25519_dalek::SigningKey;
use protobuf::well_known_types::timestamp::Timestamp;
use protobuf::{Message, MessageField};
use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate};
use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction};
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;
use tempfile::NamedTempFile;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, mpsc};
use url::Url;

fn get_private_key() -> SigningKey {
SigningKey::from_keypair_bytes(&[
105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170,
238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170,
8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244,
39, 179, 211, 177, 70, 252, 31,
])
.unwrap()
}

fn get_private_key_file() -> NamedTempFile {
let private_key_string = "[105,175,146,91,32,145,164,199,37,111,139,255,44,225,5,247,154,170,238,70,47,15,9,48,102,87,180,50,50,38,148,243,62,148,219,72,222,170,8,246,176,33,205,29,118,11,220,163,214,204,46,49,132,94,170,173,244,39,179,211,177,70,252,31]";
let mut temp_file = NamedTempFile::new().unwrap();
temp_file
.as_file_mut()
.write(private_key_string.as_bytes())
.unwrap();
temp_file.flush().unwrap();
temp_file
}

#[tokio::test]
async fn test_lazer_exporter_task() {
let signing_key_file = get_private_key_file();
let signing_key = get_private_key();

let config = Config {
listen_address: "0.0.0.0:12345".parse().unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
publish_keypair_path: PathBuf::from(signing_key_file.path()),
publish_interval_duration: Duration::from_millis(25),
};

let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
let mut task = LazerPublisherTask {
config: config.clone(),
receiver,
pending_updates: Vec::new(),
relayer_sender,
signing_key,
};
tokio::spawn(async move { task.run().await });

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
match relayer_receiver.try_recv() {
Err(TryRecvError::Empty) => (),
_ => panic!("channel should be empty"),
}

let feed_update = FeedUpdate {
feed_id: Some(1),
source_timestamp: MessageField::some(Timestamp::now()),
update: Some(Update::PriceUpdate(PriceUpdate {
price: Some(100_000_00000000),
..PriceUpdate::default()
})),
special_fields: Default::default(),
};
sender.send(feed_update.clone()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

match relayer_receiver.try_recv() {
Ok(transaction) => {
let lazer_transaction =
LazerTransaction::parse_from_bytes(transaction.payload.unwrap().as_slice())
.unwrap();
let publisher_update =
if let lazer_transaction::Payload::PublisherUpdate(publisher_update) =
lazer_transaction.payload.unwrap()
{
publisher_update
} else {
panic!("expected publisher_update")
};
assert_eq!(publisher_update.updates.len(), 1);
assert_eq!(publisher_update.updates[0], feed_update);
}
_ => panic!("channel should have a transaction waiting"),
}
}
}
File renamed without changes.
Loading
Loading