Skip to content

Commit cef03e2

Browse files
authored
Hermes: Add error log when receiving invalid quorum VAA (#2810)
1 parent bce96b5 commit cef03e2

File tree

7 files changed

+51
-33
lines changed

7 files changed

+51
-33
lines changed

apps/hermes/server/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.10.0-alpha"
3+
version = "0.10.1-alpha"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/api/rest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ mod tests {
182182
unimplemented!("Not needed for this test")
183183
}
184184

185-
async fn store_update(&self, _update: Update) -> Result<()> {
185+
async fn store_update(&self, _update: Update) -> Result<bool> {
186186
unimplemented!("Not needed for this test")
187187
}
188188

apps/hermes/server/src/network/pythnet.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,10 @@ where
511511
tokio::spawn({
512512
let state = state.clone();
513513
async move {
514-
if let Err(e) = state.process_message(vaa_bytes).await {
515-
tracing::debug!(error = ?e, "Skipped VAA.");
514+
// We always want to verify the VAA, even if it has been seen before.
515+
// This ensures that VAAs from the quorum are valid, and allows us to alert and log an error if they are not.
516+
if let Err(e) = state.process_message(vaa_bytes, true).await {
517+
tracing::error!(error = ?e, "Received an invalid VAA from PythNet quorum.");
516518
}
517519
}
518520
});

apps/hermes/server/src/network/wormhole.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ where
160160
while let Some(Ok(message)) = stream.next().await {
161161
let state = state.clone();
162162
tokio::spawn(async move {
163-
if let Err(e) = state.process_message(message.vaa_bytes).await {
163+
// We do not want to verify the VAA if it has already been seen.
164+
// This improves performance, since the beacon may send the same body
165+
// multiple times with different signatures.
166+
if let Err(e) = state.process_message(message.vaa_bytes, false).await {
164167
tracing::debug!(error = ?e, "Skipped VAA.");
165168
}
166169
});

apps/hermes/server/src/state/aggregate.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ where
233233
{
234234
fn subscribe(&self) -> Receiver<AggregationEvent>;
235235
async fn is_ready(&self) -> (bool, ReadinessMetadata);
236-
async fn store_update(&self, update: Update) -> Result<()>;
236+
async fn store_update(&self, update: Update) -> Result<bool>;
237237
async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier>;
238238
async fn get_price_feeds_with_update_data(
239239
&self,
@@ -274,7 +274,7 @@ where
274274

275275
/// Stores the update data in the store
276276
#[tracing::instrument(skip(self, update))]
277-
async fn store_update(&self, update: Update) -> Result<()> {
277+
async fn store_update(&self, update: Update) -> Result<bool> {
278278
// The slot that the update is originating from. It should be available
279279
// in all the updates.
280280
let slot = match update {
@@ -300,7 +300,7 @@ where
300300
slot = proof.slot,
301301
"VAA Merkle Proof already stored, skipping."
302302
);
303-
return Ok(());
303+
return Ok(false);
304304
}
305305

306306
self.into()
@@ -331,7 +331,7 @@ where
331331
slot = slot,
332332
"Accumulator Messages already stored, skipping."
333333
);
334-
return Ok(());
334+
return Ok(false);
335335
}
336336

337337
self.into()
@@ -358,7 +358,7 @@ where
358358
(Some(accumulator_messages), Some(wormhole_merkle_state)) => {
359359
(accumulator_messages, wormhole_merkle_state)
360360
}
361-
_ => return Ok(()),
361+
_ => return Ok(true),
362362
};
363363

364364
tracing::info!(slot = wormhole_merkle_state.root.slot, "Completed Update.");
@@ -408,7 +408,7 @@ where
408408
.metrics
409409
.observe(slot, metrics::Event::CompletedUpdate);
410410

411-
Ok(())
411+
Ok(true)
412412
}
413413

414414
async fn get_twaps_with_update_data(

apps/hermes/server/src/state/wormhole.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,13 @@ impl<'a> From<&'a State> for &'a WormholeState {
6060

6161
#[async_trait::async_trait]
6262
pub trait Wormhole: Aggregates {
63-
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>);
64-
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()>;
63+
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) -> bool;
64+
/// Process a Wormhole message, extracting the VAA and storing it in the state.
65+
/// If `always_verify` is false, it will check if the VAA has been seen before verifying it.
66+
/// If true, it will verify the VAA even if it has been seen before.
67+
/// Returns true if the message was processed successfully, false if it was already seen.
68+
/// Throws an error if the VAA is invalid or cannot be processed.
69+
async fn process_message(&self, vaa_bytes: Vec<u8>, always_verify: bool) -> Result<bool>;
6570
async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet);
6671
}
6772

@@ -80,27 +85,35 @@ where
8085
}
8186

8287
#[tracing::instrument(skip(self, vaa_bytes))]
83-
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) {
88+
async fn store_vaa(&self, sequence: u64, vaa_bytes: Vec<u8>) -> bool {
8489
// Check VAA hasn't already been seen, this may have been checked previously
8590
// but due to async nature it's possible other threads have mutated the state
8691
// since this VAA started processing.
8792
let mut observed_vaa_seqs = self.into().observed_vaa_seqs.write().await;
8893
if observed_vaa_seqs.contains(&sequence) {
89-
return;
94+
return false;
9095
}
9196

9297
// Clear old cached VAA sequences.
9398
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
9499
observed_vaa_seqs.pop_first();
95100
}
96101

102+
observed_vaa_seqs.insert(sequence);
103+
// Drop the lock to allow other threads to access the state.
104+
drop(observed_vaa_seqs);
105+
97106
// Hand the VAA to the aggregate store.
98-
if let Err(e) = Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
99-
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
107+
match Aggregates::store_update(self, Update::Vaa(vaa_bytes)).await {
108+
Ok(is_stored) => is_stored,
109+
Err(e) => {
110+
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
111+
false
112+
}
100113
}
101114
}
102115

103-
async fn process_message(&self, vaa_bytes: Vec<u8>) -> Result<()> {
116+
async fn process_message(&self, vaa_bytes: Vec<u8>, always_verify: bool) -> Result<bool> {
104117
let vaa = serde_wormhole::from_slice::<Vaa<&RawMessage>>(&vaa_bytes)?;
105118

106119
// Log VAA Processing.
@@ -114,17 +127,19 @@ where
114127
};
115128
tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
116129

117-
// Check VAA hasn't already been seen.
118-
ensure!(
119-
!self
120-
.into()
121-
.observed_vaa_seqs
122-
.read()
123-
.await
124-
.contains(&vaa.sequence),
125-
"Previously observed VAA: {}",
126-
vaa.sequence
127-
);
130+
if !always_verify {
131+
// Check VAA hasn't already been seen.
132+
ensure!(
133+
!self
134+
.into()
135+
.observed_vaa_seqs
136+
.read()
137+
.await
138+
.contains(&vaa.sequence),
139+
"Previously observed VAA: {}",
140+
vaa.sequence
141+
);
142+
}
128143

129144
// Check VAA source is valid, we don't want to process other protocols VAAs.
130145
validate_vaa_source(&vaa)?;
@@ -140,9 +155,7 @@ where
140155
vaa,
141156
)?;
142157

143-
// Finally, store the resulting VAA in Hermes.
144-
self.store_vaa(vaa.sequence, vaa_bytes).await;
145-
Ok(())
158+
Ok(self.store_vaa(vaa.sequence, vaa_bytes).await)
146159
}
147160
}
148161
// Rejects VAAs from invalid sources.

0 commit comments

Comments
 (0)