Skip to content

Commit 49b801a

Browse files
committed
Further fixes: backoff reset, use broadcast::channel, default config, drain pending updates
1 parent 435b8aa commit 49b801a

File tree

3 files changed

+55
-51
lines changed

3 files changed

+55
-51
lines changed

pyth-lazer-agent/src/lazer_publisher.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config::{CHANNEL_CAPACITY, Config};
2-
use crate::relayer_session::RelayerSender;
2+
use crate::relayer_session::RelayerSessionTask;
33
use anyhow::{Context, Result, bail};
44
use ed25519_dalek::{Signer, SigningKey};
55
use protobuf::well_known_types::timestamp::Timestamp;
@@ -11,6 +11,7 @@ use pyth_lazer_publisher_sdk::transaction::{
1111
Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
1212
};
1313
use solana_keypair::read_keypair_file;
14+
use tokio::sync::broadcast;
1415
use tokio::{
1516
select,
1617
sync::mpsc::{self, Receiver, Sender},
@@ -25,20 +26,22 @@ pub struct LazerPublisher {
2526

2627
impl LazerPublisher {
2728
pub async fn new(config: &Config) -> Self {
28-
let relayer_senders = futures::future::join_all(
29-
config
30-
.relayer_urls
31-
.iter()
32-
.map(async |url| RelayerSender::new(url, &config.authorization_token).await),
33-
)
34-
.await;
29+
let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
30+
for url in config.relayer_urls.iter() {
31+
let mut task = RelayerSessionTask {
32+
url: url.clone(),
33+
token: config.authorization_token.to_owned(),
34+
receiver: relayer_sender.subscribe(),
35+
};
36+
tokio::spawn(async move { task.run().await });
37+
}
3538

3639
let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
3740
let mut task = LazerPublisherTask {
3841
config: config.clone(),
3942
receiver,
4043
pending_updates: Vec::new(),
41-
relayer_senders,
44+
relayer_sender,
4245
};
4346
tokio::spawn(async move { task.run().await });
4447
Self { sender }
@@ -55,7 +58,7 @@ struct LazerPublisherTask {
5558
config: Config,
5659
receiver: Receiver<FeedUpdate>,
5760
pending_updates: Vec<FeedUpdate>,
58-
relayer_senders: Vec<RelayerSender>,
61+
relayer_sender: broadcast::Sender<SignedLazerTransaction>,
5962
}
6063

6164
impl LazerPublisherTask {
@@ -108,7 +111,7 @@ impl LazerPublisherTask {
108111
}
109112

110113
let publisher_update = PublisherUpdate {
111-
updates: self.pending_updates.clone(),
114+
updates: self.pending_updates.drain(..).collect(),
112115
publisher_timestamp: MessageField::some(Timestamp::now()),
113116
special_fields: Default::default(),
114117
};
@@ -137,14 +140,13 @@ impl LazerPublisherTask {
137140
payload: Some(buf),
138141
special_fields: Default::default(),
139142
};
140-
futures::future::join_all(
141-
self.relayer_senders
142-
.iter_mut()
143-
.map(|relayer_sender| relayer_sender.sender.send(signed_lazer_transaction.clone())),
144-
)
145-
.await;
143+
match self.relayer_sender.send(signed_lazer_transaction.clone()) {
144+
Ok(_) => (),
145+
Err(e) => {
146+
tracing::error!("Error sending transaction to relayer receivers: {e}");
147+
}
148+
}
146149

147-
self.pending_updates.clear();
148150
Ok(())
149151
}
150152
}

pyth-lazer-agent/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mod websocket_utils;
1616
#[derive(Parser)]
1717
#[command(version)]
1818
struct Cli {
19-
#[clap(short, long, default_value = "config.toml")]
19+
#[clap(short, long, default_value = "config/config.toml")]
2020
config: String,
2121
}
2222

pyth-lazer-agent/src/relayer_session.rs

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::config::CHANNEL_CAPACITY;
21
use anyhow::{Result, bail};
32
use backoff::ExponentialBackoffBuilder;
43
use backoff::backoff::Backoff;
@@ -7,36 +6,17 @@ use futures_util::{SinkExt, StreamExt};
76
use http::HeaderValue;
87
use protobuf::Message;
98
use pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction;
10-
use std::time::Duration;
9+
use std::time::{Duration, Instant};
1110
use tokio::net::TcpStream;
12-
use tokio::{
13-
select,
14-
sync::mpsc::{self, Receiver, Sender},
15-
};
11+
use tokio::select;
12+
use tokio::sync::broadcast;
1613
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
1714
use tokio_tungstenite::{
1815
MaybeTlsStream, WebSocketStream, connect_async_with_config,
1916
tungstenite::Message as TungsteniteMessage,
2017
};
2118
use url::Url;
2219

23-
pub struct RelayerSender {
24-
pub(crate) sender: Sender<SignedLazerTransaction>,
25-
}
26-
27-
impl RelayerSender {
28-
pub async fn new(url: &Url, token: &str) -> Self {
29-
let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
30-
let mut task = RelayerSessionTask {
31-
url: url.clone(),
32-
token: token.to_owned(),
33-
receiver,
34-
};
35-
tokio::spawn(async move { task.run().await });
36-
Self { sender }
37-
}
38-
}
39-
4020
type RelayerWsSender = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>;
4121
type RelayerWsReceiver = SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
4222

@@ -78,11 +58,11 @@ impl RelayerWsSession {
7858
}
7959
}
8060

81-
struct RelayerSessionTask {
61+
pub struct RelayerSessionTask {
8262
// connection state
83-
url: Url,
84-
token: String,
85-
receiver: Receiver<SignedLazerTransaction>,
63+
pub url: Url,
64+
pub token: String,
65+
pub receiver: broadcast::Receiver<SignedLazerTransaction>,
8666
}
8767

8868
impl RelayerSessionTask {
@@ -95,6 +75,8 @@ impl RelayerSessionTask {
9575
.with_max_elapsed_time(None)
9676
.build();
9777

78+
const FAILURE_RESET_TIME: Duration = Duration::from_secs(300);
79+
let mut first_failure_time = Instant::now();
9880
let mut failure_count = 0;
9981

10082
loop {
@@ -104,6 +86,12 @@ impl RelayerSessionTask {
10486
return;
10587
}
10688
Err(e) => {
89+
if first_failure_time.elapsed() > FAILURE_RESET_TIME {
90+
failure_count = 0;
91+
first_failure_time = Instant::now();
92+
backoff.reset();
93+
}
94+
10795
failure_count += 1;
10896
let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
10997
tracing::error!(
@@ -129,11 +117,25 @@ impl RelayerSessionTask {
129117

130118
loop {
131119
select! {
132-
Some(transaction) = self.receiver.recv() => {
133-
if let Err(e) = relayer_ws_session.send_transaction(transaction).await
134-
{
135-
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
136-
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
120+
recv_result = self.receiver.recv() => {
121+
match recv_result {
122+
Ok(transaction) => {
123+
if let Err(e) = relayer_ws_session.send_transaction(transaction).await {
124+
tracing::error!("Error publishing transaction to Lazer relayer: {e:?}");
125+
bail!("Failed to publish transaction to Lazer relayer: {e:?}");
126+
}
127+
},
128+
Err(e) => {
129+
match e {
130+
broadcast::error::RecvError::Closed => {
131+
tracing::error!("transaction broadcast channel closed");
132+
bail!("transaction broadcast channel closed");
133+
}
134+
broadcast::error::RecvError::Lagged(skipped_count) => {
135+
tracing::warn!("transaction broadcast channel lagged by {skipped_count} messages");
136+
}
137+
}
138+
}
137139
}
138140
}
139141
// Handle messages from the relayers, such as errors if we send a bad update

0 commit comments

Comments
 (0)