Skip to content

Commit dac9d93

Browse files
committed
feat(pyth-lazer) Create schema for lazer agent JRPC endpoint
1 parent 3e24151 commit dac9d93

File tree

4 files changed

+309
-9
lines changed

4 files changed

+309
-9
lines changed

lazer/publisher_sdk/rust/src/lib.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use std::{collections::BTreeMap, time::Duration};
22

3+
use crate::publisher_update::feed_update::Update;
4+
use crate::publisher_update::{FeedUpdate, FundingRateUpdate, PriceUpdate};
35
use ::protobuf::MessageField;
46
use anyhow::{bail, ensure, Context};
57
use humantime::format_duration;
68
use protobuf::dynamic_value::{dynamic_value, DynamicValue};
9+
use pyth_lazer_protocol::jrpc::{FeedUpdateParams, UpdateParams};
710
use pyth_lazer_protocol::router::TimestampUs;
811

912
pub mod transaction_envelope {
@@ -164,3 +167,38 @@ impl TryFrom<DynamicValue> for serde_value::Value {
164167
}
165168
}
166169
}
170+
171+
impl From<FeedUpdateParams> for FeedUpdate {
172+
fn from(value: FeedUpdateParams) -> Self {
173+
FeedUpdate {
174+
feed_id: Some(value.feed_id.0),
175+
source_timestamp: value.source_timestamp.into(),
176+
update: Some(value.update.into()),
177+
special_fields: Default::default(),
178+
}
179+
}
180+
}
181+
182+
impl From<UpdateParams> for Update {
183+
fn from(value: UpdateParams) -> Self {
184+
match value {
185+
UpdateParams::PriceUpdate {
186+
price,
187+
best_bid_price,
188+
best_ask_price,
189+
} => Update::PriceUpdate(PriceUpdate {
190+
price: Some(price.0.into()),
191+
best_bid_price: Some(best_bid_price.0.into()),
192+
best_ask_price: Some(best_ask_price.0.into()),
193+
special_fields: Default::default(),
194+
}),
195+
UpdateParams::FundingRateUpdate { price, rate } => {
196+
Update::FundingRateUpdate(FundingRateUpdate {
197+
price: Some(price.0.into()),
198+
rate: Some(rate.0.into()),
199+
special_fields: Default::default(),
200+
})
201+
}
202+
}
203+
}
204+
}

lazer/sdk/rust/protocol/src/jrpc.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
use std::time::Duration;
2+
use crate::router::{Price, PriceFeedId, Rate, TimestampUs};
3+
use serde::{Deserialize, Serialize};
4+
5+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
6+
pub struct PythLazerAgentJrpcV1 {
7+
pub jsonrpc: JsonRpcVersion,
8+
#[serde(flatten)]
9+
pub params: JrpcParams,
10+
pub id: i64,
11+
}
12+
13+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
14+
#[serde(tag = "method", content = "params")]
15+
pub enum JrpcParams {
16+
#[serde(rename = "send_updates")]
17+
SendUpdates(FeedUpdateParams),
18+
#[serde(rename = "get_symbols")]
19+
GetMetadata(GetMetadataParams),
20+
}
21+
22+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
23+
pub struct FeedUpdateParams {
24+
pub feed_id: PriceFeedId,
25+
pub source_timestamp: TimestampUs,
26+
pub update: UpdateParams,
27+
}
28+
29+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
30+
#[serde(tag = "type")]
31+
pub enum UpdateParams {
32+
#[serde(rename = "price")]
33+
PriceUpdate {
34+
price: Price,
35+
best_bid_price: Price,
36+
best_ask_price: Price,
37+
},
38+
#[serde(rename = "funding_rate")]
39+
FundingRateUpdate { price: Price, rate: Rate },
40+
}
41+
42+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
43+
pub struct Filter {
44+
name: Option<String>,
45+
asset_type: Option<String>,
46+
}
47+
48+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
49+
pub struct GetMetadataParams {
50+
filters: Option<Vec<Filter>>,
51+
}
52+
53+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
54+
pub enum JsonRpcVersion {
55+
#[serde(rename = "2.0")]
56+
V2,
57+
}
58+
59+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
60+
pub struct JrpcResponse<T> {
61+
pub jsonrpc: JsonRpcVersion,
62+
pub result: T,
63+
pub id: i64,
64+
}
65+
66+
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
67+
pub struct ErrorResponse {
68+
pub message: String
69+
}
70+
71+
#[derive(Serialize, Deserialize)]
72+
struct SymbolMetadata {
73+
pub asset_type: String,
74+
pub cmc_id: i64,
75+
pub description: String,
76+
pub exponent: i64,
77+
pub hermes_id: String,
78+
#[serde(default, with = "humantime_serde", alias = "interval")]
79+
pub interval: Option<Duration>,
80+
pub min_channel: String,
81+
pub min_publishers: i64,
82+
pub name: String,
83+
pub pyth_lazer_id: i64,
84+
pub schedule: String,
85+
pub state: String,
86+
pub symbol: String,
87+
}
88+
89+
#[cfg(test)]
90+
mod tests {
91+
use crate::jrpc::JrpcParams::{GetMetadata, SendUpdates};
92+
use crate::jrpc::{
93+
FeedUpdateParams, Filter, GetMetadataParams, JsonRpcVersion, PythLazerAgentJrpcV1,
94+
UpdateParams,
95+
};
96+
use crate::router::{Price, PriceFeedId, Rate, TimestampUs};
97+
98+
#[test]
99+
fn test_send_updates_price() {
100+
let json = r#"
101+
{
102+
"jsonrpc": "2.0",
103+
"method": "send_updates",
104+
"params": {
105+
"feed_id": 1,
106+
"source_timestamp": 124214124124,
107+
108+
"update": {
109+
"type": "price",
110+
"price": 1234567890,
111+
"best_bid_price": 1234567891,
112+
"best_ask_price": 1234567892
113+
}
114+
},
115+
"id": 1
116+
}
117+
"#;
118+
119+
let expected = PythLazerAgentJrpcV1 {
120+
jsonrpc: JsonRpcVersion::V2,
121+
params: SendUpdates(FeedUpdateParams {
122+
feed_id: PriceFeedId(1),
123+
source_timestamp: TimestampUs(124214124124),
124+
update: UpdateParams::PriceUpdate {
125+
price: Price::from_integer(1234567890, 0).unwrap(),
126+
best_bid_price: Price::from_integer(1234567891, 0).unwrap(),
127+
best_ask_price: Price::from_integer(1234567892, 0).unwrap(),
128+
},
129+
}),
130+
id: 1,
131+
};
132+
133+
assert_eq!(
134+
serde_json::from_str::<PythLazerAgentJrpcV1>(json).unwrap(),
135+
expected
136+
);
137+
}
138+
139+
#[test]
140+
fn test_send_updates_funding_rate() {
141+
let json = r#"
142+
{
143+
"jsonrpc": "2.0",
144+
"method": "send_updates",
145+
"params": {
146+
"feed_id": 1,
147+
"source_timestamp": 124214124124,
148+
149+
"update": {
150+
"type": "funding_rate",
151+
"price": 1234567890,
152+
"rate": 1234567891
153+
}
154+
},
155+
"id": 1
156+
}
157+
"#;
158+
159+
let expected = PythLazerAgentJrpcV1 {
160+
jsonrpc: JsonRpcVersion::V2,
161+
params: SendUpdates(FeedUpdateParams {
162+
feed_id: PriceFeedId(1),
163+
source_timestamp: TimestampUs(124214124124),
164+
update: UpdateParams::FundingRateUpdate {
165+
price: Price::from_integer(1234567890, 0).unwrap(),
166+
rate: Rate::from_integer(1234567891, 0).unwrap(),
167+
},
168+
}),
169+
id: 1,
170+
};
171+
172+
assert_eq!(
173+
serde_json::from_str::<PythLazerAgentJrpcV1>(json).unwrap(),
174+
expected
175+
);
176+
}
177+
#[test]
178+
fn test_send_get_symbols() {
179+
let json = r#"
180+
{
181+
"jsonrpc": "2.0",
182+
"method": "get_symbols",
183+
"params": {
184+
"filters": [
185+
{"name": "BTC/USD"},
186+
{"asset_type": "crypto"}
187+
]
188+
},
189+
"id": 1
190+
}
191+
"#;
192+
193+
let expected = PythLazerAgentJrpcV1 {
194+
jsonrpc: JsonRpcVersion::V2,
195+
params: GetMetadata(GetMetadataParams {
196+
filters: Some(vec![
197+
Filter {
198+
name: Some("BTC/USD".to_string()),
199+
asset_type: None,
200+
},
201+
Filter {
202+
name: None,
203+
asset_type: Some("crypto".to_string()),
204+
},
205+
]),
206+
}),
207+
id: 1,
208+
};
209+
210+
assert_eq!(
211+
serde_json::from_str::<PythLazerAgentJrpcV1>(json).unwrap(),
212+
expected
213+
);
214+
}
215+
216+
#[test]
217+
fn test_get_symbols_without_filters() {
218+
let json = r#"
219+
{
220+
"jsonrpc": "2.0",
221+
"method": "get_symbols",
222+
"params": {},
223+
"id": 1
224+
}
225+
"#;
226+
227+
let expected = PythLazerAgentJrpcV1 {
228+
jsonrpc: JsonRpcVersion::V2,
229+
params: GetMetadata(GetMetadataParams { filters: None }),
230+
id: 1,
231+
};
232+
233+
assert_eq!(
234+
serde_json::from_str::<PythLazerAgentJrpcV1>(json).unwrap(),
235+
expected
236+
);
237+
}
238+
}

lazer/sdk/rust/protocol/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
pub mod api;
44
pub mod binary_update;
5+
pub mod jrpc;
56
pub mod message;
67
pub mod payload;
78
pub mod publisher;
@@ -23,7 +24,7 @@ fn magics_in_big_endian() {
2324
};
2425

2526
// The values listed in this test can be used when reading the magic headers in BE format
26-
// (e.g. on EVM).
27+
// (e.g., on EVM).
2728

2829
assert_eq!(u32::swap_bytes(BINARY_UPDATE_FORMAT_MAGIC), 1937213467);
2930
assert_eq!(u32::swap_bytes(PAYLOAD_FORMAT_MAGIC), 1976813459);
@@ -44,6 +45,6 @@ fn magics_in_big_endian() {
4445
LE_UNSIGNED_FORMAT_MAGIC,
4546
] {
4647
// Required to distinguish between byte orders.
47-
assert!(u32::swap_bytes(magic) != magic);
48+
assert_ne!(u32::swap_bytes(magic), magic);
4849
}
4950
}

lazer/sdk/rust/protocol/src/router.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
//! WebSocket JSON protocol types for API the router provides to consumers and publishers.
1+
//! WebSocket JSON protocol types for the API the router provides to consumers and publishers.
22
3+
use protobuf::MessageField;
34
use {
45
crate::payload::AggregatedPriceFeedData,
56
anyhow::{bail, Context},
@@ -37,6 +38,26 @@ impl TryFrom<&Timestamp> for TimestampUs {
3738
}
3839
}
3940

41+
impl Into<Timestamp> for TimestampUs {
42+
fn into(self) -> Timestamp {
43+
Timestamp {
44+
#[allow(
45+
clippy::cast_possible_wrap,
46+
reason = "u64 to i64 after this division can never overflow because the value cannot be too big"
47+
)]
48+
seconds: (self.0 / 1_000_000) as i64,
49+
nanos: (self.0 % 1_000_000) as i32 * 1000,
50+
special_fields: Default::default(),
51+
}
52+
}
53+
}
54+
55+
impl Into<MessageField<Timestamp>> for TimestampUs {
56+
fn into(self) -> MessageField<Timestamp> {
57+
MessageField::some(self.into())
58+
}
59+
}
60+
4061
impl TimestampUs {
4162
pub fn now() -> Self {
4263
let value = SystemTime::now()
@@ -304,7 +325,7 @@ impl<'de> Deserialize<'de> for Channel {
304325
D: serde::Deserializer<'de>,
305326
{
306327
let value = <String>::deserialize(deserializer)?;
307-
parse_channel(&value).ok_or_else(|| D::Error::custom("unknown channel"))
328+
parse_channel(&value).ok_or_else(|| Error::custom("unknown channel"))
308329
}
309330
}
310331

@@ -341,12 +362,14 @@ fn fixed_rate_values() {
341362
"values must be unique and sorted"
342363
);
343364
for value in FixedRate::ALL {
344-
assert!(
345-
1000 % value.ms == 0,
365+
assert_eq!(
366+
1000 % value.ms,
367+
0,
346368
"1 s must contain whole number of intervals"
347369
);
348-
assert!(
349-
value.value_us() % FixedRate::MIN.value_us() == 0,
370+
assert_eq!(
371+
value.value_us() % FixedRate::MIN.value_us(),
372+
0,
350373
"the interval's borders must be a subset of the minimal interval's borders"
351374
);
352375
}
@@ -383,7 +406,7 @@ impl<'de> Deserialize<'de> for SubscriptionParams {
383406
D: serde::Deserializer<'de>,
384407
{
385408
let value = SubscriptionParamsRepr::deserialize(deserializer)?;
386-
Self::new(value).map_err(D::Error::custom)
409+
Self::new(value).map_err(Error::custom)
387410
}
388411
}
389412

0 commit comments

Comments
 (0)