
feat(lazer_exporter): Add lazer exporter #155


Merged: 9 commits, Apr 15, 2025
Changes from 3 commits
4,008 changes: 2,657 additions & 1,351 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 8 additions & 7 deletions Cargo.toml
@@ -27,25 +27,22 @@ pyth-sdk = "0.8.0"
pyth-sdk-solana = "0.10.4"
solana-account-decoder = "1.18.8"
solana-client = "1.18.8"
solana-pubkey = "2.3.0"
solana-sdk = "1.18.8"
bincode = "1.3.3"
bincode = { version = "2.0.1", features = ["serde"] }
rand = "0.8.5"
config = "0.14.0"
thiserror = "1.0.58"
clap = { version = "4.5.4", features = ["derive"] }
humantime-serde = "1.1.1"
serde-this-or-that = "0.4.2"
# The public typed-html 0.2.2 release is causing a recursion limit
# error that cannot be fixed from outside the crate.
#
# Rationale, 2023-03-21: https://stackoverflow.com/questions/74462753
typed-html = { git = "https://github.com/bodil/typed-html", rev = "4c13ecca" }
humantime = "2.1.0"
prometheus-client = "0.22.2"
lazy_static = "1.4.0"
toml_edit = "0.22.9"
winnow = "0.6.5"
proptest = "1.4.0"
reqwest = { version = "0.12.0", features = ["json"] }
smol_str = {version="0.3.2", features=["serde"]}
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
@@ -55,9 +52,13 @@ opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"]}
opentelemetry-otlp = { version = "0.16.0" }
pyth-price-store = "0.1.0"
bytemuck = "1.13.0"
tokio-tungstenite = { version = "0.26.2", features = ["native-tls", "url"] }
http = "1.3.1"
url = { version = "2.5.4", features = ["serde"] }
pyth-lazer-protocol = "0.7.0"
tokio-util = { version = "0.7.14", features = ["full"] }

[dev-dependencies]
tokio-util = { version = "0.7.10", features = ["full"] }
soketto = "0.8.0"
portpicker = "0.1.1"
rand = "0.8.5"
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,4 +1,4 @@
[toolchain]
channel = "1.79.0"
channel = "1.86.0"
profile = "minimal"
components = ["rustfmt", "clippy"]
8 changes: 8 additions & 0 deletions src/agent.rs
@@ -158,6 +158,14 @@ impl Agent {
));
}

// Spawn the Lazer exporter
if let Some(lazer_config) = &self.config.pyth_lazer {
handles.extend(services::lazer_exporter(
lazer_config.clone(),
state.clone(),
));
}

// Create the Notifier task for the Pythd RPC.
handles.push(tokio::spawn(services::notifier(state.clone())));

1 change: 1 addition & 0 deletions src/agent/config.rs
@@ -36,6 +36,7 @@ pub struct Config {
#[serde(default)]
pub remote_keypair_loader: services::keypairs::Config,
pub opentelemetry: Option<OpenTelemetryConfig>,
pub pyth_lazer: Option<services::lazer_exporter::Config>,
}

impl Config {
14 changes: 7 additions & 7 deletions src/agent/market_schedule.rs
@@ -34,7 +34,7 @@ use {
take,
take_till,
},
PResult,
ModalResult,
Parser,
},
};
@@ -109,7 +109,7 @@ impl MarketSchedule {
}
}

fn market_schedule_parser<'s>(input: &mut &'s str) -> PResult<MarketSchedule> {
fn market_schedule_parser<'s>(input: &mut &'s str) -> ModalResult<MarketSchedule> {
seq!(
MarketSchedule {
timezone: take_till(0.., ';').verify_map(|s| Tz::from_str(s).ok()),
@@ -157,13 +157,13 @@ pub struct HolidayDaySchedule {
pub kind: ScheduleDayKind,
}

fn two_digit_parser<'s>(input: &mut &'s str) -> PResult<u32> {
fn two_digit_parser<'s>(input: &mut &'s str) -> ModalResult<u32> {
take(2usize)
.verify_map(|s| u32::from_str(s).ok())
.parse_next(input)
}

fn holiday_day_schedule_parser<'s>(input: &mut &'s str) -> PResult<HolidayDaySchedule> {
fn holiday_day_schedule_parser<'s>(input: &mut &'s str) -> ModalResult<HolidayDaySchedule> {
// day and month are not validated to be correct dates
// if they are invalid, it will be ignored since there
// are no real dates that match the invalid input
@@ -235,7 +235,7 @@ impl Display for ScheduleDayKind {
}
}

fn time_parser(input: &mut &str) -> PResult<NaiveTime> {
fn time_parser(input: &mut &str) -> ModalResult<NaiveTime> {
alt(("2400", take(4usize)))
.verify_map(|time_str| match time_str {
"2400" => Some(MAX_TIME_INSTANT),
@@ -244,7 +244,7 @@ fn time_parser(input: &mut &str) -> PResult<NaiveTime> {
.parse_next(input)
}

fn time_range_parser(input: &mut &str) -> PResult<RangeInclusive<NaiveTime>> {
fn time_range_parser(input: &mut &str) -> ModalResult<RangeInclusive<NaiveTime>> {
seq!(
time_parser,
_: "-",
@@ -254,7 +254,7 @@ fn time_range_parser(input: &mut &str) -> PResult<RangeInclusive<NaiveTime>> {
.parse_next(input)
}

fn schedule_day_kind_parser(input: &mut &str) -> PResult<ScheduleDayKind> {
fn schedule_day_kind_parser(input: &mut &str) -> ModalResult<ScheduleDayKind> {
alt((
"C".map(|_| ScheduleDayKind::Closed),
"O".map(|_| ScheduleDayKind::Open),
2 changes: 2 additions & 0 deletions src/agent/services.rs
@@ -2,9 +2,11 @@ pub mod exporter;
pub mod keypairs;
pub mod notifier;
pub mod oracle;
pub mod lazer_exporter;

pub use {
exporter::exporter,
lazer_exporter::lazer_exporter,
keypairs::keypairs,
notifier::notifier,
oracle::oracle,
212 changes: 212 additions & 0 deletions src/agent/services/lazer_exporter.rs
@@ -0,0 +1,212 @@
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use futures_util::SinkExt;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use http::HeaderValue;
use pyth_lazer_protocol::publisher::PriceFeedDataV1;
use reqwest::Client;
use serde::Deserialize;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tracing::instrument;
use tracing;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{
connect_async_with_config,
tungstenite::{client::IntoClientRequest, Message},
MaybeTlsStream,
};
use tokio_util::bytes::{BufMut, BytesMut};
use url::Url;
use crate::agent::state;

#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub authorization_token: String,
#[serde(with = "humantime_serde")]
pub publish_interval_duration: Duration,
}

struct RelayerSender {
ws_senders: Vec<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
}

impl RelayerSender {
async fn send_price_update(&mut self, price_feed_data: &PriceFeedDataV1) -> Result<()> {
tracing::debug!("price_update: {:?}", price_feed_data);
let mut buf = BytesMut::new().writer();
bincode::serde::encode_into_std_write(
price_feed_data,
&mut buf,
bincode::config::legacy(),
)?;
let buf = Message::Binary(buf.into_inner().freeze());
for sender in self.ws_senders.iter_mut() {
sender.send(buf.clone()).await?;
sender.flush().await?;
}
Ok(())
}
}

async fn connect_to_relayer(
url: &Url,
token: &str,
) -> Result<(
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
)> {
tracing::info!("connecting to the relayer at {}", url);
let mut req = url.clone().into_client_request()?;
let headers = req.headers_mut();
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bearer {}", token))?,
);
let (ws_stream, _) = connect_async_with_config(req, None, true).await?;
Ok(ws_stream.split())
}

async fn connect_to_relayers(
config: &Config,
) -> Result<(
RelayerSender,
Vec<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
)> {
let mut relayer_senders = Vec::new();
let mut relayer_receivers = Vec::new();
for url in config.relayer_urls.clone() {
let (relayer_sender, relayer_receiver) =
connect_to_relayer(&url, &config.authorization_token).await?;
relayer_senders.push(relayer_sender);
relayer_receivers.push(relayer_receiver);
}
let sender = RelayerSender { ws_senders: relayer_senders };
tracing::info!("connected to relayers: {:?}", config.relayer_urls);
Ok((sender, relayer_receivers))
}

#[derive(Deserialize)]
struct SymbolResponse {
pub pyth_lazer_id: u32,
pub name: String,
pub symbol: String,
pub description: String,
pub asset_type: String,
pub exponent: i32,
pub cmc_id: Option<u32>,
pub interval: Option<String>,
pub min_publishers: u16,
pub min_channel: String,
pub state: String,
pub hermes_id: Option<String>,
}
Collaborator: Maybe leave a TODO to move this to Lazer's protocol SDK?

Collaborator: The types inside are also not great; we use String for many fields that are really more specific than that.

Contributor Author: I copied this from https://github.com/pyth-network/pyth-lazer/blob/main/history-service/src/api.rs#L91; sorry, it's not clear to me what they should be.
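For illustration, a minimal sketch of the stronger typing being suggested, using enums for fields that have a small, known set of values. The variant names and the TypedSymbolResponse name are hypothetical; the real values would have to match what the history service actually returns.

use serde::Deserialize;

// Hypothetical sketch: reject invalid values at deserialization time instead
// of carrying them around as String.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum AssetType {
    Crypto,
    Equity,
    Fx,
    Metal,
    Rates,
}

#[derive(Debug, Deserialize)]
struct TypedSymbolResponse {
    pub pyth_lazer_id: u32,
    pub name: String,
    pub asset_type: AssetType, // was String
    pub exponent: i32,
    pub min_publishers: u16,
    pub hermes_id: Option<String>,
    // ...remaining fields as in SymbolResponse above
}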


async fn fetch_symbols(
history_url: &Url,
token: &str,
) -> Result<Vec<SymbolResponse>> {
let mut url = history_url.clone();
url.set_scheme("http").unwrap();
url.set_path("/history/v1/symbols");
let client = Client::new();
let response = client
.get(url)
.header(
"Authorization",
HeaderValue::from_str(&format!("Bearer {}", token))?,
)
.send()
.await?
.text()
.await?;
Ok(serde_json::from_str(&response)?)
}

#[instrument(skip(config, state))]
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>>
{
let mut handles = Vec::new();
handles.push(tokio::spawn(lazer_exporter::lazer_exporter(config.clone(), state)));
handles
}

mod lazer_exporter {
use std::collections::HashMap;
use std::num::NonZeroI64;
use std::sync::Arc;
use futures_util::StreamExt;
use pyth_lazer_protocol::publisher::PriceFeedDataV1;
use pyth_lazer_protocol::router::{Price, PriceFeedId, TimestampUs};
use tokio_stream::StreamMap;
use crate::agent::services::lazer_exporter::{Config, connect_to_relayers, fetch_symbols, SymbolResponse};
use crate::agent::state::local::LocalStore;

pub async fn lazer_exporter<S>(config: Config, state: Arc<S>)
where
S: LocalStore,
S: Send + Sync + 'static,
{
// TODO: Re-fetch on an interval?
let lazer_symbols: HashMap<String, SymbolResponse> = match fetch_symbols(&config.history_url, &config.authorization_token).await {
Ok(symbols) => symbols.into_iter().filter_map(|symbol| {
symbol.hermes_id.clone().map(|id| (id, symbol))
}).collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
return;
}
};

// Establish relayer connections
// Relayer will drop the connection if no data received in 5s
let (mut relayer_sender, relayer_receivers) = connect_to_relayers(&config)
.await.expect("failed to connect to relayers");
let mut stream_map = StreamMap::new();
for (i, receiver) in relayer_receivers.into_iter().enumerate() {
stream_map.insert(config.relayer_urls[i].clone(), receiver);
}

let mut publish_interval = tokio::time::interval(config.publish_interval_duration);

loop {
tokio::select! {
_ = publish_interval.tick() => {
for (identifier, price_info) in state.get_all_price_infos().await {
Collaborator: Since we want to run this at high frequency, I want to make sure this doesn't become expensive, as it might copy the data.

if let Some(symbol) = lazer_symbols.get(&identifier.to_string()) {
if let Err(e) = relayer_sender.send_price_update(&PriceFeedDataV1 {
price: Some(Price(NonZeroI64::try_from(price_info.price).unwrap())),
Collaborator: What if the price is zero? Are we sure it won't happen?

Contributor Author: Excellent question; no idea, and I was curious about this constraint in the Lazer messages too. In the general case there are instruments out there (e.g. futures spreads) that can have zero or negative prices, but for anything anyone will reasonably send to the agent, perhaps we should just log a warning and drop a zero-price message?

Collaborator: I think we support negative but just not zero; this came from Pythnet. What I'd like you to do is at least not crash the whole process when it happens and handle it gracefully instead.
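For illustration, a minimal sketch of that graceful handling, assuming the desired behaviour is to warn and skip the update when the price is zero. It reuses the NonZeroI64 and Price types already imported in this module and the identifier/price_info bindings from the loop above.

// Sketch only: skip a zero price instead of unwrapping, so one bad update
// cannot take down the exporter task.
let price = match NonZeroI64::new(price_info.price) {
    Some(price) => Price(price),
    None => {
        tracing::warn!("dropping zero price update for {}", identifier.to_string());
        continue;
    }
};
// ...then build PriceFeedDataV1 { price: Some(price), .. } as before.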

best_ask_price: None,
best_bid_price: None,
Comment on lines +250 to +251
Contributor: Can we use conf as bid and ask? cc: @ali-bahjati

Collaborator: I don't know. Typically conf (although we ask for it) is wider than the bid-ask published by publishers, and it makes me a bit worried. Let me ask internally.

Collaborator: I still prefer None here, just because bid/ask might not be symmetric. We can do it in a newer update protocol.

Contributor: Since we are calculating conf from bid/ask, this would be exactly what we are calculating on Pythnet. But the bid and ask themselves might get skewed.

price_feed_id: PriceFeedId(symbol.pyth_lazer_id),
publisher_timestamp_us: TimestampUs::now(),
source_timestamp_us: TimestampUs(price_info.timestamp.and_utc().timestamp_micros() as u64),
}).await {
tracing::error!("Error sending price update to relayer: {e:?}");
return;
}
}
}
}
// Handle messages from the relayers, such as errors if we send a bad update
mapped_msg = stream_map.next() => {
match mapped_msg {
Some((relayer_url, Ok(msg))) => {
tracing::debug!("Received message from relayer at {relayer_url}: {msg:?}");
}
Some((relayer_url, Err(e))) => {
tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}");
}
None => {
tracing::error!("relayer connection closed");
return;
}
Contributor: Does the thread halt when we get an error?

Contributor Author: Yeah, I was wondering whether that should crash the agent altogether or have retries in-process.

Contributor: Let's have a few retries before crashing altogether. Also, I'm not sure we should crash at all, because if Lazer goes down, for example, the publishers would stop publishing to Pythnet too.

Contributor: cc: @ali-bahjati

Collaborator (@ali-behjati, Apr 14, 2025): The behaviour of the agent is to aggressively reconnect upon failures for Pythnet, without crashing at all, and I think it should be the same for Lazer. It's the kind of software you run once and it works all the time, with low operational overhead.

}
}
}
}
}
}
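For illustration, a minimal sketch of the reconnect behaviour requested above: a supervisor loop that re-runs the exporter body after a short delay instead of letting the task die. The wrapper name and the fixed one-second delay are assumptions, not part of this PR, and the sketch assumes the module's existing imports (Config, LocalStore) plus the inner lazer_exporter function are in scope.

// Illustrative only: restart the inner exporter after a delay instead of
// letting a relayer failure end the task for good.
pub async fn lazer_exporter_with_reconnect<S>(config: Config, state: std::sync::Arc<S>)
where
    S: LocalStore + Send + Sync + 'static,
{
    loop {
        // lazer_exporter (above) returns when a relayer connection fails or closes.
        lazer_exporter(config.clone(), state.clone()).await;
        tracing::warn!("lazer exporter stopped, reconnecting in 1s");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}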