1
- use anyhow:: { Context , Result } ;
1
+ use crate :: jrpc_handle:: jrpc_handler_inner;
2
+ use crate :: publisher_handle:: publisher_inner_handler;
3
+ use crate :: websocket_utils:: { handle_websocket_error, send_text} ;
4
+ use crate :: {
5
+ config:: Config , lazer_publisher:: LazerPublisher , publisher_handle:: PublisherConnectionContext ,
6
+ } ;
7
+ use anyhow:: { Context , Result , bail} ;
8
+ use futures_util:: io:: { BufReader , BufWriter } ;
9
+ use hyper:: body:: Incoming ;
2
10
use hyper:: { Response , StatusCode , body:: Bytes , server:: conn:: http1, service:: service_fn} ;
3
11
use hyper_util:: rt:: TokioIo ;
12
+ use pyth_lazer_protocol:: publisher:: { ServerResponse , UpdateDeserializationErrorResponse } ;
4
13
use soketto:: {
5
14
BoxedError ,
6
15
handshake:: http:: { Server , is_upgrade_request} ,
7
16
} ;
17
+ use std:: fmt:: Debug ;
18
+ use std:: pin:: Pin ;
8
19
use std:: { io, net:: SocketAddr } ;
9
20
use tokio:: net:: { TcpListener , TcpStream } ;
10
- use tracing:: { debug, info, instrument, warn} ;
11
-
12
- use crate :: {
13
- config:: Config ,
14
- lazer_publisher:: LazerPublisher ,
15
- publisher_handle:: { PublisherConnectionContext , handle_publisher} ,
16
- } ;
21
+ use tokio:: { pin, select} ;
22
+ use tokio_util:: compat:: TokioAsyncReadCompatExt ;
23
+ use tracing:: { debug, error, info, instrument, warn} ;
17
24
18
25
type FullBody = http_body_util:: Full < Bytes > ;
26
+ pub type InnerHandlerResult = Pin < Box < dyn Future < Output = Result < Option < String > > > + Send > > ;
19
27
20
- #[ derive( Debug ) ]
21
- pub enum Request {
28
+ #[ derive( Debug , Copy , Clone ) ]
29
+ pub enum PublisherRequest {
22
30
PublisherV1 ,
23
31
PublisherV2 ,
24
32
}
25
33
26
- pub struct RelayerRequest ( pub http:: Request < hyper:: body:: Incoming > ) ;
34
+ pub enum Request {
35
+ PublisherRequest ( PublisherRequest ) ,
36
+ JrpcV1 ,
37
+ }
27
38
28
- const PUBLISHER_WS_URI : & str = "/v1/publisher" ;
39
+ pub struct RelayerRequest ( pub http:: Request < Incoming > ) ;
40
+
41
+ const PUBLISHER_WS_URI_V1 : & str = "/v1/publisher" ;
29
42
const PUBLISHER_WS_URI_V2 : & str = "/v2/publisher" ;
43
+ const JRPC_WS_URI_V1 : & str = "/v1/jprc" ;
30
44
31
45
const READINESS_PROBE_PATH : & str = "/ready" ;
32
46
const LIVENESS_PROBE_PATH : & str = "/live" ;
@@ -38,15 +52,17 @@ pub async fn run(config: Config, lazer_publisher: LazerPublisher) -> Result<()>
38
52
loop {
39
53
let stream_addr = listener. accept ( ) . await ;
40
54
let lazer_publisher_clone = lazer_publisher. clone ( ) ;
55
+ let config = config. clone ( ) ;
41
56
tokio:: spawn ( async {
42
- if let Err ( err) = try_handle_connection ( stream_addr, lazer_publisher_clone) . await {
57
+ if let Err ( err) = try_handle_connection ( config , stream_addr, lazer_publisher_clone) . await {
43
58
warn ! ( "error while handling connection: {err:?}" ) ;
44
59
}
45
60
} ) ;
46
61
}
47
62
}
48
63
49
64
async fn try_handle_connection (
65
+ config : Config ,
50
66
stream_addr : io:: Result < ( TcpStream , SocketAddr ) > ,
51
67
lazer_publisher : LazerPublisher ,
52
68
) -> Result < ( ) > {
@@ -58,7 +74,7 @@ async fn try_handle_connection(
58
74
TokioIo :: new ( stream) ,
59
75
service_fn ( move |r| {
60
76
let request = RelayerRequest ( r) ;
61
- request_handler ( request, remote_addr, lazer_publisher. clone ( ) )
77
+ request_handler ( config . clone ( ) , request, remote_addr, lazer_publisher. clone ( ) )
62
78
} ) ,
63
79
)
64
80
. with_upgrades ( )
@@ -68,15 +84,17 @@ async fn try_handle_connection(
68
84
69
85
#[ instrument( skip_all, fields( component = "http_server" , remote_addr = remote_addr. to_string( ) ) ) ]
70
86
async fn request_handler (
87
+ config : Config ,
71
88
request : RelayerRequest ,
72
89
remote_addr : SocketAddr ,
73
90
lazer_publisher : LazerPublisher ,
74
91
) -> Result < Response < FullBody > , BoxedError > {
75
92
let path = request. 0 . uri ( ) . path ( ) ;
76
93
77
94
let request_type = match path {
78
- PUBLISHER_WS_URI => Request :: PublisherV1 ,
79
- PUBLISHER_WS_URI_V2 => Request :: PublisherV2 ,
95
+ PUBLISHER_WS_URI_V1 => Request :: PublisherRequest ( PublisherRequest :: PublisherV1 ) ,
96
+ PUBLISHER_WS_URI_V2 => Request :: PublisherRequest ( PublisherRequest :: PublisherV2 ) ,
97
+ JRPC_WS_URI_V1 => Request :: JrpcV1 ,
80
98
LIVENESS_PROBE_PATH => {
81
99
let response = Response :: builder ( ) . status ( StatusCode :: OK ) ;
82
100
return Ok ( response. body ( FullBody :: default ( ) ) ?) ;
@@ -113,17 +131,32 @@ async fn request_handler(
113
131
Ok ( response) => {
114
132
info ! ( "accepted connection from publisher" ) ;
115
133
match request_type {
116
- Request :: PublisherV1 | Request :: PublisherV2 => {
134
+ Request :: PublisherRequest ( publisher_request_type ) => {
117
135
let publisher_connection_context = PublisherConnectionContext {
118
- request_type,
136
+ request_type : publisher_request_type ,
119
137
_remote_addr : remote_addr,
120
138
} ;
121
- tokio:: spawn ( handle_publisher (
139
+
140
+ tokio:: spawn ( handle_ws (
141
+ config,
122
142
server,
123
143
request. 0 ,
144
+ lazer_publisher,
124
145
publisher_connection_context,
146
+ publisher_inner_handler,
147
+ ) ) ;
148
+ Ok ( response. map ( |( ) | FullBody :: default ( ) ) )
149
+ }
150
+ Request :: JrpcV1 => {
151
+ tokio:: spawn ( handle_ws (
152
+ config,
153
+ server,
154
+ request. 0 ,
125
155
lazer_publisher,
156
+ ( ) ,
157
+ jrpc_handler_inner,
126
158
) ) ;
159
+
127
160
Ok ( response. map ( |( ) | FullBody :: default ( ) ) )
128
161
}
129
162
}
@@ -137,3 +170,88 @@ async fn request_handler(
137
170
}
138
171
}
139
172
}
173
+
174
+ #[ instrument(
175
+ skip( server, request, lazer_publisher) ,
176
+ fields( component = "publisher_ws" )
177
+ ) ]
178
+ async fn handle_ws < T : Debug + Copy > (
179
+ config : Config ,
180
+ server : Server ,
181
+ request : http:: Request < Incoming > ,
182
+ lazer_publisher : LazerPublisher ,
183
+ context : T ,
184
+ inner_handler : fn ( Config , Vec < u8 > , LazerPublisher , T ) -> InnerHandlerResult ,
185
+ ) {
186
+ if let Err ( err) = try_handle_ws ( config, server, request, lazer_publisher, context, inner_handler) . await
187
+ {
188
+ handle_websocket_error ( err) ;
189
+ }
190
+ }
191
+
192
+ #[ instrument(
193
+ skip( server, request, lazer_publisher) ,
194
+ fields( component = "publisher_ws" )
195
+ ) ]
196
+ async fn try_handle_ws < T : Debug + Copy > (
197
+ config : Config ,
198
+ server : Server ,
199
+ request : http:: Request < Incoming > ,
200
+ lazer_publisher : LazerPublisher ,
201
+ context : T ,
202
+ inner_handler : fn ( Config , Vec < u8 > , LazerPublisher , T ) -> InnerHandlerResult ,
203
+ ) -> Result < ( ) > {
204
+ let stream = hyper:: upgrade:: on ( request) . await ?;
205
+ let io = TokioIo :: new ( stream) ;
206
+ let stream = BufReader :: new ( BufWriter :: new ( io. compat ( ) ) ) ;
207
+ let ( mut ws_sender, mut ws_receiver) = server. into_builder ( stream) . finish ( ) ;
208
+
209
+ let mut receive_buf = Vec :: new ( ) ;
210
+
211
+ let mut error_count = 0u32 ;
212
+ const MAX_ERROR_LOG : u32 = 10u32 ;
213
+ const MAX_ERROR_DISCONNECT : u32 = 100u32 ;
214
+
215
+ loop {
216
+ receive_buf. clear ( ) ;
217
+ {
218
+ // soketto is not cancel-safe, so we need to store the future and poll it
219
+ // in the inner loop.
220
+ let receive = async { ws_receiver. receive ( & mut receive_buf) . await } ;
221
+ pin ! ( receive) ;
222
+ loop {
223
+ select ! {
224
+ _result = & mut receive => {
225
+ break
226
+ }
227
+ }
228
+ }
229
+ }
230
+
231
+ match inner_handler ( config. clone ( ) , receive_buf. clone ( ) , lazer_publisher. clone ( ) , context) . await {
232
+ Ok ( response) => {
233
+ if let Some ( response) = response {
234
+ send_text ( & mut ws_sender, & response) . await ?;
235
+ }
236
+ }
237
+ Err ( err) => {
238
+ error_count += 1 ;
239
+ if error_count <= MAX_ERROR_LOG {
240
+ warn ! ( "Error decoding message error: {err}" ) ;
241
+ }
242
+ if error_count >= MAX_ERROR_DISCONNECT {
243
+ error ! ( "Error threshold reached; disconnecting" ) ;
244
+ bail ! ( "Error threshold reached" ) ;
245
+ }
246
+ let error_json = & serde_json:: to_string :: < ServerResponse > (
247
+ & UpdateDeserializationErrorResponse {
248
+ error : format ! ( "failed to parse a binary message: {err}" ) ,
249
+ }
250
+ . into ( ) ,
251
+ ) ?;
252
+ send_text ( & mut ws_sender, error_json) . await ?;
253
+ continue ;
254
+ }
255
+ }
256
+ }
257
+ }
0 commit comments