
Commit 93e9c49

fix(hermes): handle price feed removal properly
1 parent 214bafa commit 93e9c49

5 files changed: +156 additions, -26 deletions

hermes/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

hermes/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [package]
 name = "hermes"
-version = "0.4.3"
+version = "0.4.4"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition = "2021"

hermes/src/aggregate.rs

Lines changed: 96 additions & 13 deletions
@@ -251,7 +251,15 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
 
     // Once the accumulator reaches a complete state for a specific slot
     // we can build the message states
-    build_message_states(state, accumulator_messages, wormhole_merkle_state).await?;
+    let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;
+
+    let message_state_keys = message_states
+        .iter()
+        .map(|message_state| message_state.key())
+        .collect::<HashSet<_>>();
+
+    tracing::info!(len = message_states.len(), "Storing Message States.");
+    state.store_message_states(message_states).await?;
 
     // Update the aggregate state
     let mut aggregate_state = state.aggregate_state.write().await;
@@ -266,6 +274,7 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
                 .await?;
         }
         Some(latest) if slot > latest => {
+            state.prune_removed_keys(message_state_keys).await;
             aggregate_state.latest_completed_slot.replace(slot);
             state
                 .api_update_tx
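
Taken together, the two hunks above change the flow of store_update: the message states built for a newly completed slot are stored as before, but their keys are also collected into a HashSet, and only when that slot supersedes latest_completed_slot does the state prune cached feeds whose keys are missing from the update. Below is a minimal, self-contained sketch of that gating logic using only std collections; Msg, Aggregate, and on_complete_slot are illustrative stand-ins, not the crate's MessageState, State, or cache types.

use std::collections::HashSet;

// Illustrative stand-ins; not the crate's MessageState / MessageStateKey types.
struct Msg {
    feed: [u8; 32],
}

struct Aggregate {
    latest_completed_slot: Option<u64>,
    cached_feeds: HashSet<[u8; 32]>,
}

impl Aggregate {
    /// Handle a newly completed slot: collect the keys present in the update and,
    /// only when the slot moves forward, prune feeds that are no longer published.
    fn on_complete_slot(&mut self, slot: u64, messages: &[Msg]) {
        // Mirrors `message_states.iter().map(|m| m.key()).collect::<HashSet<_>>()`.
        let current_keys: HashSet<[u8; 32]> = messages.iter().map(|m| m.feed).collect();

        if let Some(latest) = self.latest_completed_slot {
            if slot > latest {
                // Mirrors the new `state.prune_removed_keys(message_state_keys)` call.
                self.cached_feeds.retain(|key| current_keys.contains(key));
            }
        }

        self.cached_feeds.extend(current_keys);
        self.latest_completed_slot = Some(self.latest_completed_slot.map_or(slot, |l| l.max(slot)));
    }
}

fn main() {
    let mut agg = Aggregate {
        latest_completed_slot: None,
        cached_feeds: HashSet::new(),
    };

    // Slot 10 publishes feeds 1 and 2; slot 15 only publishes feed 1.
    agg.on_complete_slot(10, &[Msg { feed: [1; 32] }, Msg { feed: [2; 32] }]);
    agg.on_complete_slot(15, &[Msg { feed: [1; 32] }]);

    assert!(agg.cached_feeds.contains(&[1; 32]));
    assert!(!agg.cached_feeds.contains(&[2; 32])); // pruned when the slot advanced
}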
@@ -296,18 +305,17 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
     Ok(())
 }
 
-#[tracing::instrument(skip(state, accumulator_messages, wormhole_merkle_state))]
-async fn build_message_states(
-    state: &State,
+#[tracing::instrument(skip(accumulator_messages, wormhole_merkle_state))]
+fn build_message_states(
     accumulator_messages: AccumulatorMessages,
     wormhole_merkle_state: WormholeMerkleState,
-) -> Result<()> {
+) -> Result<Vec<MessageState>> {
     let wormhole_merkle_message_states_proofs =
         construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?;
 
     let current_time: UnixTimestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
 
-    let message_states = accumulator_messages
+    accumulator_messages
        .raw_messages
        .into_iter()
        .enumerate()

@@ -326,13 +334,7 @@ async fn build_message_states(
                current_time,
            ))
        })
-        .collect::<Result<Vec<_>>>()?;
-
-    tracing::info!(len = message_states.len(), "Storing Message States.");
-
-    state.store_message_states(message_states).await?;
-
-    Ok(())
+        .collect::<Result<Vec<_>>>()
 }
 
 async fn get_verified_price_feeds<S>(
@@ -677,6 +679,87 @@ mod test {
         }
     }
 
+    /// On this test we will initially have two price feeds. Then we will send an update with only
+    /// price feed 1 (without price feed 2) and make sure that price feed 2 is not stored anymore.
+    #[tokio::test]
+    pub async fn test_getting_price_ids_works_fine_after_price_removal() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        let price_feed_1 = create_dummy_price_feed_message(100, 10, 9);
+        let price_feed_2 = create_dummy_price_feed_message(200, 10, 9);
+
+        // Populate the state
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(
+                vec![
+                    Message::PriceFeedMessage(price_feed_1),
+                    Message::PriceFeedMessage(price_feed_2),
+                ],
+                10,
+                20,
+            ),
+        )
+        .await;
+
+        // Check that the update_rx channel has received a message
+        assert_eq!(
+            update_rx.recv().await,
+            Some(AggregationEvent::New { slot: 10 })
+        );
+
+        // Check the price ids are stored correctly
+        assert_eq!(
+            get_price_feed_ids(&*state).await,
+            vec![
+                PriceIdentifier::new([100; 32]),
+                PriceIdentifier::new([200; 32])
+            ]
+            .into_iter()
+            .collect()
+        );
+
+        // Check that price feed 2 exists
+        assert!(get_price_feeds_with_update_data(
+            &*state,
+            &[PriceIdentifier::new([200; 32])],
+            RequestTime::Latest,
+        )
+        .await
+        .is_ok());
+
+        // Now send an update with only price feed 1 (without price feed 2)
+        // and make sure that price feed 2 is not stored anymore.
+        let price_feed_1 = create_dummy_price_feed_message(100, 12, 10);
+
+        // Populate the state
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_1)], 15, 30),
+        )
+        .await;
+
+        // Check that the update_rx channel has received a message
+        assert_eq!(
+            update_rx.recv().await,
+            Some(AggregationEvent::New { slot: 15 })
+        );
+
+        // Check that price feed 2 does not exist anymore
+        assert_eq!(
+            get_price_feed_ids(&*state).await,
+            vec![PriceIdentifier::new([100; 32]),].into_iter().collect()
+        );
+
+        assert!(get_price_feeds_with_update_data(
+            &*state,
+            &[PriceIdentifier::new([200; 32])],
+            RequestTime::Latest,
+        )
+        .await
+        .is_err());
+    }
+
     #[tokio::test]
     pub async fn test_metadata_times_and_readiness_work() {
         // The receiver channel should stay open for the state to work
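
To run only this new test locally, a plain cargo test name filter should work from the hermes crate directory (the exact invocation depends on your checkout and workspace layout):

cargo test test_getting_price_ids_works_fine_after_price_removal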

hermes/src/api/ws.rs

Lines changed: 31 additions & 10 deletions
@@ -350,22 +350,43 @@ impl Subscriber {
             .keys()
             .cloned()
             .collect::<Vec<_>>();
-        for update in crate::aggregate::get_price_feeds_with_update_data(
+
+        let updates = match crate::aggregate::get_price_feeds_with_update_data(
             &*self.store,
             &price_feed_ids,
             RequestTime::AtSlot(event.slot()),
         )
         .await
-        .map_err(|e| {
-            tracing::warn!(
-                "Failed to get price feeds {:?} with update data: {:?}",
-                price_feed_ids,
-                e
-            );
-            e
-        })?
-        .price_feeds
         {
+            Ok(updates) => updates,
+            Err(_) => {
+                // The error can only happen when a price feed was available
+                // and is no longer there as we check the price feed ids upon
+                // subscription. In this case we just remove the non-existing
+                // price feed from the list and will keep sending updates for
+                // the rest.
+                let available_price_feed_ids =
+                    crate::aggregate::get_price_feed_ids(&*self.store).await;
+
+                self.price_feeds_with_config
+                    .retain(|price_feed_id, _| available_price_feed_ids.contains(price_feed_id));
+
+                let price_feed_ids = self
+                    .price_feeds_with_config
+                    .keys()
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                crate::aggregate::get_price_feeds_with_update_data(
+                    &*self.store,
+                    &price_feed_ids,
+                    RequestTime::AtSlot(event.slot()),
+                )
+                .await?
+            }
+        };
+
+        for update in updates.price_feeds {
             let config = self
                 .price_feeds_with_config
                 .get(&update.price_feed.id)
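
The WebSocket handler change above replaces the old fail-and-propagate ? with a one-shot recovery path: when fetching update data for the subscribed ids fails, the subscriber keeps only the ids that are still served and retries with the reduced set, as the inline comment explains. Below is a condensed, synchronous sketch of that retain-and-retry shape using plain std types; Subscription, Config, FeedId, and fetch are hypothetical stand-ins, not the Hermes Subscriber API.

use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins; not the Hermes types.
type FeedId = u64;
struct Config;

struct Subscription {
    price_feeds_with_config: HashMap<FeedId, Config>,
}

impl Subscription {
    /// Fetch updates for all subscribed feeds; if that fails (e.g. a subscribed
    /// feed no longer exists), keep only the feeds that are still available and
    /// retry once with the reduced id list.
    fn updates_for_event(
        &mut self,
        available: &HashSet<FeedId>,
        fetch: impl Fn(&[FeedId]) -> Result<Vec<String>, String>,
    ) -> Result<Vec<String>, String> {
        let ids: Vec<FeedId> = self.price_feeds_with_config.keys().cloned().collect();
        match fetch(ids.as_slice()) {
            Ok(updates) => Ok(updates),
            Err(_) => {
                self.price_feeds_with_config
                    .retain(|id, _| available.contains(id));
                let ids: Vec<FeedId> = self.price_feeds_with_config.keys().cloned().collect();
                fetch(ids.as_slice())
            }
        }
    }
}

// Simulated backend: only feed 1 is still served, so any request that still
// includes feed 2 fails, mimicking a "price feed not found" error.
fn fetch(ids: &[FeedId]) -> Result<Vec<String>, String> {
    if ids.iter().all(|id| *id == 1) {
        Ok(ids.iter().map(|id| format!("update for feed {id}")).collect())
    } else {
        Err("price feeds not found".to_string())
    }
}

fn main() {
    let mut sub = Subscription {
        price_feeds_with_config: HashMap::from([(1, Config), (2, Config)]),
    };
    let available: HashSet<FeedId> = HashSet::from([1]);

    // The first fetch fails because feed 2 is gone; the subscriber prunes it and retries.
    let updates = sub.updates_for_event(&available, fetch).unwrap();
    assert_eq!(updates.len(), 1);
}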

hermes/src/state/cache.rs

Lines changed: 27 additions & 1 deletion
@@ -22,6 +22,7 @@ use {
         collections::{
             BTreeMap,
             HashMap,
+            HashSet,
         },
         ops::Bound,
         sync::Arc,

@@ -169,6 +170,7 @@ impl Cache {
 pub trait AggregateCache {
     async fn message_state_keys(&self) -> Vec<MessageStateKey>;
     async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
+    async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
     async fn fetch_message_states(
         &self,
         ids: Vec<FeedId>,

@@ -206,17 +208,41 @@ impl AggregateCache for crate::state::State {
             let key = message_state.key();
             let time = message_state.time();
             let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);
-
             cache.insert(time, message_state);
 
             // Remove the earliest message states if the cache size is exceeded
             while cache.len() > self.cache.cache_size as usize {
                 cache.pop_first();
             }
         }
+
         Ok(())
     }
 
+    /// This method takes the current feed ids and prunes the cache for the keys
+    /// that are not present in the current feed ids.
+    ///
+    /// There is a side-effect of this: if a key gets removed, we will
+    /// lose the cache for that key and cannot retrieve it for historical
+    /// price queries.
+    async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>) {
+        let mut message_cache = self.cache.message_cache.write().await;
+
+        // Sometimes, some keys are removed from the accumulator. We track which keys are not
+        // present in the message states and remove them from the cache.
+        let keys_in_cache = message_cache
+            .iter()
+            .map(|(key, _)| key.clone())
+            .collect::<HashSet<_>>();
+
+        for key in keys_in_cache {
+            if !current_keys.contains(&key) {
+                tracing::info!("Feed {:?} seems to be removed. Removing it from cache", key);
+                message_cache.remove(&key);
+            }
+        }
+    }
+
     async fn fetch_message_states(
         &self,
         ids: Vec<FeedId>,
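
As the doc comment warns, pruning is lossy: once a key is evicted, its cached message states are gone and can no longer serve historical queries for that feed. For illustration only, the same pruning can be expressed as a single retain over a plain std map; MessageKey and MessageCache below are simplified stand-ins that omit the async RwLock, logging, and MessageState machinery of the real cache.

use std::collections::{BTreeMap, HashMap, HashSet};

// Simplified stand-ins for MessageStateKey and the per-key, time-indexed cache.
type MessageKey = (u64, [u8; 32]);
type MessageCache = HashMap<MessageKey, BTreeMap<i64, String>>;

/// Drop every cached key that is absent from the latest complete update.
fn prune_removed_keys(cache: &mut MessageCache, current_keys: &HashSet<MessageKey>) {
    cache.retain(|key, _| current_keys.contains(key));
}

fn main() {
    let mut cache: MessageCache = HashMap::new();
    cache.entry((0, [1; 32])).or_default().insert(100, "feed 1 state".into());
    cache.entry((0, [2; 32])).or_default().insert(100, "feed 2 state".into());

    // The latest complete update only contains feed 1.
    let current: HashSet<MessageKey> = [(0, [1; 32])].into_iter().collect();
    prune_removed_keys(&mut cache, &current);

    assert!(cache.contains_key(&(0, [1; 32])));
    assert!(!cache.contains_key(&(0, [2; 32]))); // feed 2's history is gone
}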
