-
Notifications
You must be signed in to change notification settings - Fork 19
feat(lazer_exporter): Add lazer exporter #155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
b89ffc3
8c8a20c
cdd2caf
9742119
944551e
b0b9134
c4fad12
def7899
cf19a7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
[toolchain] | ||
channel = "1.79.0" | ||
channel = "1.86.0" | ||
profile = "minimal" | ||
components = ["rustfmt", "clippy"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
use anyhow::Result; | ||
use futures_util::SinkExt; | ||
use futures_util::stream::{SplitSink, SplitStream, StreamExt}; | ||
use http::HeaderValue; | ||
use pyth_lazer_protocol::publisher::PriceFeedDataV1; | ||
use reqwest::Client; | ||
use serde::Deserialize; | ||
use tokio::net::TcpStream; | ||
use tokio::task::JoinHandle; | ||
use tracing::instrument; | ||
use tracing; | ||
use tokio_tungstenite::WebSocketStream; | ||
use tokio_tungstenite::{ | ||
connect_async_with_config, | ||
tungstenite::{client::IntoClientRequest, Message}, | ||
MaybeTlsStream, | ||
}; | ||
use tokio_util::bytes::{BufMut, BytesMut}; | ||
use url::Url; | ||
use crate::agent::state; | ||
|
||
#[derive(Clone, Debug, Deserialize)] | ||
pub struct Config { | ||
pub history_url: Url, | ||
pub relayer_urls: Vec<Url>, | ||
pub authorization_token: String, | ||
#[serde(with = "humantime_serde")] | ||
pub publish_interval_duration: Duration, | ||
} | ||
|
||
struct RelayerSender { | ||
ws_senders: Vec<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>, | ||
} | ||
|
||
impl RelayerSender { | ||
async fn send_price_update(&mut self, price_feed_data: &PriceFeedDataV1) -> Result<()> { | ||
tracing::debug!("price_update: {:?}", price_feed_data); | ||
let mut buf = BytesMut::new().writer(); | ||
bincode::serde::encode_into_std_write( | ||
price_feed_data, | ||
&mut buf, | ||
bincode::config::legacy(), | ||
)?; | ||
let buf = Message::Binary(buf.into_inner().freeze()); | ||
for sender in self.ws_senders.iter_mut() { | ||
sender.send(buf.clone()).await?; | ||
sender.flush().await?; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
async fn connect_to_relayer( | ||
url: &Url, | ||
token: &str, | ||
) -> Result<( | ||
SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, | ||
SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, | ||
)> { | ||
tracing::info!("connecting to the relayer at {}", url); | ||
let mut req = url.clone().into_client_request()?; | ||
let headers = req.headers_mut(); | ||
headers.insert( | ||
"Authorization", | ||
HeaderValue::from_str(&format!("Bearer {}", token))?, | ||
); | ||
let (ws_stream, _) = connect_async_with_config(req, None, true).await?; | ||
Ok(ws_stream.split()) | ||
} | ||
|
||
async fn connect_to_relayers( | ||
config: &Config, | ||
) -> Result<( | ||
RelayerSender, | ||
Vec<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>, | ||
)> { | ||
let mut relayer_senders = Vec::new(); | ||
let mut relayer_receivers = Vec::new(); | ||
for url in config.relayer_urls.clone() { | ||
let (relayer_sender, relayer_receiver) = | ||
connect_to_relayer(&url, &config.authorization_token).await?; | ||
relayer_senders.push(relayer_sender); | ||
relayer_receivers.push(relayer_receiver); | ||
} | ||
let sender = RelayerSender { ws_senders: relayer_senders }; | ||
tracing::info!("connected to relayers: {:?}", config.relayer_urls); | ||
Ok((sender, relayer_receivers)) | ||
} | ||
|
||
#[derive(Deserialize)] | ||
struct SymbolResponse { | ||
pub pyth_lazer_id: u32, | ||
pub name: String, | ||
pub symbol: String, | ||
pub description: String, | ||
pub asset_type: String, | ||
pub exponent: i32, | ||
pub cmc_id: Option<u32>, | ||
pub interval: Option<String>, | ||
pub min_publishers: u16, | ||
pub min_channel: String, | ||
pub state: String, | ||
pub hermes_id: Option<String>, | ||
ali-behjati marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe a leave a todo to move it to Lazer's protocol SDK? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the types inside are also not great. we have String for many things but they are more explicit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I copied from https://github.com/pyth-network/pyth-lazer/blob/main/history-service/src/api.rs#L91 ; sorry, not clear what they should be. |
||
|
||
async fn fetch_symbols( | ||
history_url: &Url, | ||
token: &str, | ||
) -> Result<Vec<SymbolResponse>> { | ||
let mut url = history_url.clone(); | ||
url.set_scheme("http").unwrap(); | ||
url.set_path("/history/v1/symbols"); | ||
let client = Client::new(); | ||
let response = client | ||
.get(url) | ||
.header( | ||
"Authorization", | ||
HeaderValue::from_str(&format!("Bearer {}", token))?, | ||
) | ||
merolish marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.send() | ||
.await? | ||
.text() | ||
ali-behjati marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.await?; | ||
Ok(serde_json::from_str(&response)?) | ||
} | ||
|
||
#[instrument(skip(config, state))] | ||
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> | ||
{ | ||
let mut handles = Vec::new(); | ||
handles.push(tokio::spawn(lazer_exporter::lazer_exporter(config.clone(), state))); | ||
handles | ||
} | ||
|
||
mod lazer_exporter { | ||
use std::collections::HashMap; | ||
use std::num::NonZeroI64; | ||
use std::sync::Arc; | ||
use futures_util::StreamExt; | ||
use pyth_lazer_protocol::publisher::PriceFeedDataV1; | ||
use pyth_lazer_protocol::router::{Price, PriceFeedId, TimestampUs}; | ||
use tokio_stream::StreamMap; | ||
use crate::agent::services::lazer_exporter::{Config, connect_to_relayers, fetch_symbols, SymbolResponse}; | ||
use crate::agent::state::local::LocalStore; | ||
|
||
pub async fn lazer_exporter<S>(config: Config, state: Arc<S>) | ||
where | ||
S: LocalStore, | ||
S: Send + Sync + 'static, | ||
{ | ||
// TODO: Re-fetch on an interval? | ||
let lazer_symbols: HashMap<String, SymbolResponse> = match fetch_symbols(&config.history_url, &config.authorization_token).await { | ||
Ok(symbols) => symbols.into_iter().filter_map(|symbol| { | ||
symbol.hermes_id.clone().map(|id| (id, symbol)) | ||
}).collect(), | ||
Err(e) => { | ||
tracing::error!("Failed to fetch Lazer symbols: {e:?}"); | ||
return; | ||
} | ||
}; | ||
|
||
// Establish relayer connections | ||
// Relayer will drop the connection if no data received in 5s | ||
let (mut relayer_sender, relayer_receivers) = connect_to_relayers(&config) | ||
.await.expect("failed to connect to relayers"); | ||
ali-behjati marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut stream_map = StreamMap::new(); | ||
for (i, receiver) in relayer_receivers.into_iter().enumerate() { | ||
stream_map.insert(config.relayer_urls[i].clone(), receiver); | ||
} | ||
|
||
let mut publish_interval = tokio::time::interval(config.publish_interval_duration); | ||
|
||
loop { | ||
tokio::select! { | ||
_ = publish_interval.tick() => { | ||
for (identifier, price_info) in state.get_all_price_infos().await { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we want to run this in high frequency i want to make sure this doesn't become expensive as it might copy the data. |
||
if let Some(symbol) = lazer_symbols.get(&identifier.to_string()) { | ||
if let Err(e) = relayer_sender.send_price_update(&PriceFeedDataV1 { | ||
price: Some(Price(NonZeroI64::try_from(price_info.price).unwrap())), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if the price is zero? are we sure it won't happen? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Excellent question, no idea, and I was curious on this enforcement in the Lazer messages. In the general case there are instruments out there (e.g. futures spreads) that can have zero or negative prices, but in terms of anything that anyone will reasonably send to agent, perhaps just warning log and drop a zero price message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we support negative but just not zero. This came from Pythnet. What i like you to do is at least not crash the whole process when it happens and gracefully handle it instead. |
||
best_ask_price: None, | ||
best_bid_price: None, | ||
Comment on lines
+250
to
+251
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use conf as bid and ask? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know. Typically conf (although we ask for it) is wider than bid-ask by publishers and it makes me a bit worried. let me ask internally. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i still prefer None here just because bid ask might not be symmetric. we can do it on a newer update protocol. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are calculating conf with bid/ask, this will be exactly what we are calculating in pythnet. But the bid and ask themselves might get skewed. |
||
price_feed_id: PriceFeedId(symbol.pyth_lazer_id), | ||
publisher_timestamp_us: TimestampUs::now(), | ||
source_timestamp_us: TimestampUs(price_info.timestamp.and_utc().timestamp_micros() as u64), | ||
}).await { | ||
tracing::error!("Error sending price update to relayer: {e:?}"); | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
// Handle messages from the relayers, such as errors if we send a bad update | ||
mapped_msg = stream_map.next() => { | ||
match mapped_msg { | ||
Some((relayer_url, Ok(msg))) => { | ||
tracing::debug!("Received message from relayer at {relayer_url}: {msg:?}"); | ||
} | ||
Some((relayer_url, Err(e))) => { | ||
tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}"); | ||
} | ||
None => { | ||
tracing::error!("relayer connection closed"); | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does the thread halt when we get an error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, was wondering if that should crash the agent altogether or have retries in-process. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's have a few retries before crashing altogether. Also I'm not sure if we should crash at all because for example if lazer goes down the publishers will stop publishing to pythnet too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc: @ali-bahjati There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the behaviour on agent is to aggressively reconnect upon failures for Pythnet without crashing at all and i think it should be the same for Lazer. It's like a software you run once and it works all the time and should have lower operational overhead. |
||
} | ||
} | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.