Skip to content

Commit a7bb916

Browse files
authored
feat(hermes): add additional sse features (#1443)
* add allow_unordered query param * add benchmarks_only query params * update docs * bump * address comments * address comments * address comments
1 parent 392a3df commit a7bb916

File tree

9 files changed

+116
-61
lines changed

9 files changed

+116
-61
lines changed

hermes/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.5.4"
3+
version = "0.5.5"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

hermes/src/api.rs

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use {
2323
mod doc_examples;
2424
mod metrics_middleware;
2525
mod rest;
26-
mod sse;
2726
pub mod types;
2827
mod ws;
2928

@@ -96,39 +95,40 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> {
9695

9796
#[derive(OpenApi)]
9897
#[openapi(
99-
paths(
100-
rest::get_price_feed,
101-
rest::get_vaa,
102-
rest::get_vaa_ccip,
103-
rest::latest_price_feeds,
104-
rest::latest_vaas,
105-
rest::price_feed_ids,
106-
rest::latest_price_updates,
107-
rest::timestamp_price_updates,
108-
rest::price_feeds_metadata,
109-
),
110-
components(
111-
schemas(
112-
rest::GetVaaCcipInput,
113-
rest::GetVaaCcipResponse,
114-
rest::GetVaaResponse,
115-
types::PriceIdInput,
116-
types::RpcPrice,
117-
types::RpcPriceFeed,
118-
types::RpcPriceFeedMetadata,
119-
types::RpcPriceIdentifier,
120-
types::EncodingType,
121-
types::PriceUpdate,
122-
types::BinaryPriceUpdate,
123-
types::ParsedPriceUpdate,
124-
types::RpcPriceFeedMetadataV2,
125-
types::PriceFeedMetadata,
126-
types::AssetType
127-
)
128-
),
129-
tags(
130-
(name = "hermes", description = "Pyth Real-Time Pricing API")
131-
)
98+
paths(
99+
rest::get_price_feed,
100+
rest::get_vaa,
101+
rest::get_vaa_ccip,
102+
rest::latest_price_feeds,
103+
rest::latest_vaas,
104+
rest::price_feed_ids,
105+
rest::latest_price_updates,
106+
rest::timestamp_price_updates,
107+
rest::price_feeds_metadata,
108+
rest::price_stream_sse_handler,
109+
),
110+
components(
111+
schemas(
112+
rest::GetVaaCcipInput,
113+
rest::GetVaaCcipResponse,
114+
rest::GetVaaResponse,
115+
types::PriceIdInput,
116+
types::RpcPrice,
117+
types::RpcPriceFeed,
118+
types::RpcPriceFeedMetadata,
119+
types::RpcPriceIdentifier,
120+
types::EncodingType,
121+
types::PriceUpdate,
122+
types::BinaryPriceUpdate,
123+
types::ParsedPriceUpdate,
124+
types::RpcPriceFeedMetadataV2,
125+
types::PriceFeedMetadata,
126+
types::AssetType
127+
)
128+
),
129+
tags(
130+
(name = "hermes", description = "Pyth Real-Time Pricing API")
131+
)
132132
)]
133133
struct ApiDoc;
134134

@@ -146,7 +146,7 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> {
146146
.route("/api/price_feed_ids", get(rest::price_feed_ids))
147147
.route(
148148
"/v2/updates/price/stream",
149-
get(sse::price_stream_sse_handler),
149+
get(rest::price_stream_sse_handler),
150150
)
151151
.route("/v2/updates/price/latest", get(rest::latest_price_updates))
152152
.route(

hermes/src/api/rest.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub use {
3535
v2::{
3636
latest_price_updates::*,
3737
price_feeds_metadata::*,
38+
sse::*,
3839
timestamp_price_updates::*,
3940
},
4041
};

hermes/src/api/rest/index.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub async fn index() -> impl IntoResponse {
1717
"/api/get_vaa?id=<price_feed_id>&publish_time=<publish_time_in_unix_timestamp>",
1818
"/api/get_vaa_ccip?data=<0x<price_feed_id_32_bytes>+<publish_time_unix_timestamp_be_8_bytes>>",
1919
"/v2/updates/price/latest?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
20+
"/v2/updates/price/stream?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)(&allow_unordered=false)(&benchmarks_only=false)",
2021
"/v2/updates/price/<timestamp>?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&encoding=hex|base64)(&parsed=false)",
2122
"/v2/price_feeds?(query=btc)(&asset_type=crypto|equity|fx|metal|rates)",
2223
])

hermes/src/api/rest/v2/latest_price_updates.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ pub struct LatestPriceUpdatesQueryParams {
4646
#[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")]
4747
ids: Vec<PriceIdInput>,
4848

49-
/// If true, include the parsed price update in the `parsed` field of each returned feed.
49+
/// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`.
5050
#[serde(default)]
5151
encoding: EncodingType,
5252

53-
/// If true, include the parsed price update in the `parsed` field of each returned feed.
53+
/// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`.
5454
#[serde(default = "default_true")]
5555
parsed: bool,
5656
}

hermes/src/api/rest/v2/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod latest_price_updates;
22
pub mod price_feeds_metadata;
3+
pub mod sse;
34
pub mod timestamp_price_updates;

hermes/src/api/sse.rs renamed to hermes/src/api/rest/v2/sse.rs

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use {
1515
ParsedPriceUpdate,
1616
PriceIdInput,
1717
PriceUpdate,
18+
RpcPriceIdentifier,
1819
},
1920
ApiState,
2021
},
@@ -56,13 +57,21 @@ pub struct StreamPriceUpdatesQueryParams {
5657
#[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")]
5758
ids: Vec<PriceIdInput>,
5859

59-
/// If true, include the parsed price update in the `parsed` field of each returned feed.
60+
/// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`.
6061
#[serde(default)]
6162
encoding: EncodingType,
6263

63-
/// If true, include the parsed price update in the `parsed` field of each returned feed.
64+
/// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`.
6465
#[serde(default = "default_true")]
6566
parsed: bool,
67+
68+
/// If true, allows unordered price updates to be included in the stream.
69+
#[serde(default)]
70+
allow_unordered: bool,
71+
72+
/// If true, only include benchmark prices that are the initial price updates at a given timestamp (i.e., prevPubTime != pubTime).
73+
#[serde(default)]
74+
benchmarks_only: bool,
6675
}
6776

6877
fn default_true() -> bool {
@@ -105,10 +114,15 @@ pub async fn price_stream_sse_handler(
105114
price_ids_clone,
106115
params.encoding,
107116
params.parsed,
117+
params.benchmarks_only,
118+
params.allow_unordered,
108119
)
109120
.await
110121
{
111-
Ok(price_update) => Ok(Event::default().json_data(price_update).unwrap()),
122+
Ok(Some(update)) => Ok(Event::default()
123+
.json_data(update)
124+
.unwrap_or_else(|e| error_event(e))),
125+
Ok(None) => Ok(Event::default().comment("No update available")),
112126
Err(e) => Ok(error_event(e)),
113127
}
114128
}
@@ -126,18 +140,64 @@ async fn handle_aggregation_event(
126140
mut price_ids: Vec<PriceIdentifier>,
127141
encoding: EncodingType,
128142
parsed: bool,
129-
) -> Result<PriceUpdate> {
143+
benchmarks_only: bool,
144+
allow_unordered: bool,
145+
) -> Result<Option<PriceUpdate>> {
146+
// Handle out-of-order events
147+
if let AggregationEvent::OutOfOrder { .. } = event {
148+
if !allow_unordered {
149+
return Ok(None);
150+
}
151+
}
152+
130153
// We check for available price feed ids to ensure that the price feed ids provided exists since price feeds can be removed.
131154
let available_price_feed_ids = crate::aggregate::get_price_feed_ids(&*state.state).await;
132155

133156
price_ids.retain(|price_feed_id| available_price_feed_ids.contains(price_feed_id));
134157

135-
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
158+
let mut price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
136159
&*state.state,
137160
&price_ids,
138161
RequestTime::AtSlot(event.slot()),
139162
)
140163
.await?;
164+
165+
let mut parsed_price_updates: Vec<ParsedPriceUpdate> = price_feeds_with_update_data
166+
.price_feeds
167+
.into_iter()
168+
.map(|price_feed| price_feed.into())
169+
.collect();
170+
171+
172+
if benchmarks_only {
173+
// Remove those with metadata.prev_publish_time != price.publish_time from parsed_price_updates
174+
parsed_price_updates.retain(|price_feed| {
175+
price_feed
176+
.metadata
177+
.prev_publish_time
178+
.map_or(false, |prev_time| {
179+
prev_time != price_feed.price.publish_time
180+
})
181+
});
182+
// Retain price id in price_ids that are in parsed_price_updates
183+
price_ids.retain(|price_id| {
184+
parsed_price_updates
185+
.iter()
186+
.any(|price_feed| price_feed.id == RpcPriceIdentifier::from(*price_id))
187+
});
188+
price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
189+
&*state.state,
190+
&price_ids,
191+
RequestTime::AtSlot(event.slot()),
192+
)
193+
.await?;
194+
}
195+
196+
// Check if price_ids is empty after filtering and return None if it is
197+
if price_ids.is_empty() {
198+
return Ok(None);
199+
}
200+
141201
let price_update_data = price_feeds_with_update_data.update_data;
142202
let encoded_data: Vec<String> = price_update_data
143203
.into_iter()
@@ -147,23 +207,15 @@ async fn handle_aggregation_event(
147207
encoding,
148208
data: encoded_data,
149209
};
150-
let parsed_price_updates: Option<Vec<ParsedPriceUpdate>> = if parsed {
151-
Some(
152-
price_feeds_with_update_data
153-
.price_feeds
154-
.into_iter()
155-
.map(|price_feed| price_feed.into())
156-
.collect(),
157-
)
158-
} else {
159-
None
160-
};
161210

162-
163-
Ok(PriceUpdate {
211+
Ok(Some(PriceUpdate {
164212
binary: binary_price_update,
165-
parsed: parsed_price_updates,
166-
})
213+
parsed: if parsed {
214+
Some(parsed_price_updates)
215+
} else {
216+
None
217+
},
218+
}))
167219
}
168220

169221
fn error_event<E: std::fmt::Debug>(e: E) -> Event {

hermes/src/api/rest/v2/timestamp_price_updates.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ pub struct TimestampPriceUpdatesQueryParams {
5858
#[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")]
5959
ids: Vec<PriceIdInput>,
6060

61-
/// If true, include the parsed price update in the `parsed` field of each returned feed.
61+
/// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `hex`.
6262
#[serde(default)]
6363
encoding: EncodingType,
6464

65-
/// If true, include the parsed price update in the `parsed` field of each returned feed.
65+
/// If true, include the parsed price update in the `parsed` field of each returned feed. Default is `true`.
6666
#[serde(default = "default_true")]
6767
parsed: bool,
6868
}

0 commit comments

Comments
 (0)