Skip to content

Commit ca3c7a1

Browse files
committed
simplify the ws socket handling
1 parent e7b4a1b commit ca3c7a1

File tree

7 files changed

+502
-189
lines changed

7 files changed

+502
-189
lines changed
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
relayer_urls = ["ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction", "ws://relayer-0.pyth-lazer.dourolabs.app/v1/transaction"]
2-
publish_keypair_path = "/path/to/solana/id.json"
1+
relayer_urls = ["ws://localhost:10001"]
2+
publish_keypair_path = "/Users/bartplatak/workspace/pyth-crosschain/apps/pyth-lazer-agent/config/test_keypair.json"
33
listen_address = "0.0.0.0:8910"
44
publish_interval_duration = "25ms"
5+
authorization_token="token1"

apps/pyth-lazer-agent/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct Config {
1717
pub publish_keypair_path: PathBuf,
1818
#[serde(with = "humantime_serde", default = "default_publish_interval")]
1919
pub publish_interval_duration: Duration,
20+
pub history_service_url: Option<Url>
2021
}
2122

2223
fn default_publish_interval() -> Duration {
Lines changed: 22 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,23 @@
1-
use crate::jrpc_handle::jrpc_handler_inner;
2-
use crate::publisher_handle::publisher_inner_handler;
3-
use crate::websocket_utils::{handle_websocket_error, send_text};
1+
use crate::jrpc_handle::{JrpcConnectionContext, handle_jrpc};
2+
use crate::publisher_handle::handle_publisher;
43
use crate::{
5-
config::Config, lazer_publisher::LazerPublisher, publisher_handle::PublisherConnectionContext,
4+
config::Config, http_server, lazer_publisher::LazerPublisher,
5+
publisher_handle::PublisherConnectionContext,
66
};
7-
use anyhow::{Context, Result, bail};
8-
use futures_util::io::{BufReader, BufWriter};
7+
use anyhow::{Context, Result};
98
use hyper::body::Incoming;
109
use hyper::{Response, StatusCode, body::Bytes, server::conn::http1, service::service_fn};
1110
use hyper_util::rt::TokioIo;
12-
use pyth_lazer_protocol::publisher::{ServerResponse, UpdateDeserializationErrorResponse};
1311
use soketto::{
1412
BoxedError,
1513
handshake::http::{Server, is_upgrade_request},
1614
};
1715
use std::fmt::Debug;
18-
use std::pin::Pin;
1916
use std::{io, net::SocketAddr};
2017
use tokio::net::{TcpListener, TcpStream};
21-
use tokio::{pin, select};
22-
use tokio_util::compat::TokioAsyncReadCompatExt;
23-
use tracing::{debug, error, info, instrument, warn};
18+
use tracing::{debug, info, instrument, warn};
2419

2520
type FullBody = http_body_util::Full<Bytes>;
26-
pub type InnerHandlerResult = Pin<Box<dyn Future<Output = Result<Option<String>>> + Send>>;
2721

2822
#[derive(Debug, Copy, Clone)]
2923
pub enum PublisherRequest {
@@ -53,8 +47,10 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
5347
let stream_addr = listener.accept().await;
5448
let lazer_publisher_clone = lazer_publisher.clone();
5549
let config = config.clone();
56-
tokio::spawn(async {
57-
if let Err(err) = try_handle_connection(config, stream_addr, lazer_publisher_clone).await {
50+
tokio::spawn(async move {
51+
if let Err(err) =
52+
try_handle_connection(config, stream_addr, lazer_publisher_clone).await
53+
{
5854
warn!("error while handling connection: {err:?}");
5955
}
6056
});
@@ -74,7 +70,12 @@ async fn try_handle_connection(
7470
TokioIo::new(stream),
7571
service_fn(move |r| {
7672
let request = RelayerRequest(r);
77-
request_handler(config.clone(), request, remote_addr, lazer_publisher.clone())
73+
request_handler(
74+
config.clone(),
75+
request,
76+
remote_addr,
77+
lazer_publisher.clone(),
78+
)
7879
}),
7980
)
8081
.with_upgrades()
@@ -136,27 +137,23 @@ async fn request_handler(
136137
request_type: publisher_request_type,
137138
_remote_addr: remote_addr,
138139
};
139-
140-
tokio::spawn(handle_ws(
141-
config,
140+
tokio::spawn(handle_publisher(
142141
server,
143142
request.0,
144-
lazer_publisher,
145143
publisher_connection_context,
146-
publisher_inner_handler,
144+
lazer_publisher,
147145
));
148146
Ok(response.map(|()| FullBody::default()))
149147
}
150148
Request::JrpcV1 => {
151-
tokio::spawn(handle_ws(
152-
config,
149+
let publisher_connection_context = JrpcConnectionContext {};
150+
tokio::spawn(handle_jrpc(
151+
config.clone(),
153152
server,
154153
request.0,
154+
publisher_connection_context,
155155
lazer_publisher,
156-
(),
157-
jrpc_handler_inner,
158156
));
159-
160157
Ok(response.map(|()| FullBody::default()))
161158
}
162159
}
@@ -170,88 +167,3 @@ async fn request_handler(
170167
}
171168
}
172169
}
173-
174-
#[instrument(
175-
skip(server, request, lazer_publisher),
176-
fields(component = "publisher_ws")
177-
)]
178-
async fn handle_ws<T: Debug + Copy>(
179-
config: Config,
180-
server: Server,
181-
request: http::Request<Incoming>,
182-
lazer_publisher: LazerPublisher,
183-
context: T,
184-
inner_handler: fn(Config, Vec<u8>, LazerPublisher, T) -> InnerHandlerResult,
185-
) {
186-
if let Err(err) = try_handle_ws(config, server, request, lazer_publisher, context, inner_handler).await
187-
{
188-
handle_websocket_error(err);
189-
}
190-
}
191-
192-
#[instrument(
193-
skip(server, request, lazer_publisher),
194-
fields(component = "publisher_ws")
195-
)]
196-
async fn try_handle_ws<T: Debug + Copy>(
197-
config: Config,
198-
server: Server,
199-
request: http::Request<Incoming>,
200-
lazer_publisher: LazerPublisher,
201-
context: T,
202-
inner_handler: fn(Config, Vec<u8>, LazerPublisher, T) -> InnerHandlerResult,
203-
) -> Result<()> {
204-
let stream = hyper::upgrade::on(request).await?;
205-
let io = TokioIo::new(stream);
206-
let stream = BufReader::new(BufWriter::new(io.compat()));
207-
let (mut ws_sender, mut ws_receiver) = server.into_builder(stream).finish();
208-
209-
let mut receive_buf = Vec::new();
210-
211-
let mut error_count = 0u32;
212-
const MAX_ERROR_LOG: u32 = 10u32;
213-
const MAX_ERROR_DISCONNECT: u32 = 100u32;
214-
215-
loop {
216-
receive_buf.clear();
217-
{
218-
// soketto is not cancel-safe, so we need to store the future and poll it
219-
// in the inner loop.
220-
let receive = async { ws_receiver.receive(&mut receive_buf).await };
221-
pin!(receive);
222-
loop {
223-
select! {
224-
_result = &mut receive => {
225-
break
226-
}
227-
}
228-
}
229-
}
230-
231-
match inner_handler(config.clone(), receive_buf.clone(), lazer_publisher.clone(), context).await {
232-
Ok(response) => {
233-
if let Some(response) = response {
234-
send_text(&mut ws_sender, &response).await?;
235-
}
236-
}
237-
Err(err) => {
238-
error_count += 1;
239-
if error_count <= MAX_ERROR_LOG {
240-
warn!("Error decoding message error: {err}");
241-
}
242-
if error_count >= MAX_ERROR_DISCONNECT {
243-
error!("Error threshold reached; disconnecting");
244-
bail!("Error threshold reached");
245-
}
246-
let error_json = &serde_json::to_string::<ServerResponse>(
247-
&UpdateDeserializationErrorResponse {
248-
error: format!("failed to parse a binary message: {err}"),
249-
}
250-
.into(),
251-
)?;
252-
send_text(&mut ws_sender, error_json).await?;
253-
continue;
254-
}
255-
}
256-
}
257-
}

0 commit comments

Comments
 (0)